diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java index 73d55b21081..3b766e80e81 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java @@ -24,16 +24,16 @@ public interface RpcProvider extends TransportMultiplexier { void setMessageSerializer(MessageSerializer messageSerializer); MessageSerializer getMessageSerializer(); - void registerRpcServiceEndpoint(String serviceAddress, RpcServiceEndpoint rpcEndpoint); - void unregisteRpcServiceEndpoint(RpcAddressable serviceAddress, RpcServiceEndpoint rpcEndpoint); + void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); + void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); RpcClientCall newCall(String targetAddress); - RpcClientCall newCall(RpcAddressable targetAddress); + RpcClientCall newCall(TransportAddressMapper targetAddress); // // low-level public API // - RpcClientCall newCall(TransportEndpoint sourceEndpoint, RpcAddressable targetAddress); + RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress); RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress); void registerCall(RpcClientCall call); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java index cae85ad05cd..d4355fa246a 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java @@ -69,14 +69,14 @@ public class RpcProviderImpl implements RpcProvider { } @Override - public void registerRpcServiceEndpoint(String serviceAddress, RpcServiceEndpoint rpcEndpoint) { + public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) { synchronized(_serviceEndpoints) { _serviceEndpoints.add(rpcEndpoint); } } @Override - public void unregisteRpcServiceEndpoint(RpcAddressable serviceAddress, RpcServiceEndpoint rpcEndpoint) { + public void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) { synchronized(_serviceEndpoints) { _serviceEndpoints.remove(rpcEndpoint); } @@ -94,7 +94,7 @@ public class RpcProviderImpl implements RpcProvider { } @Override - public RpcClientCall newCall(TransportEndpoint sourceEndpoint, RpcAddressable targetAddress) { + public RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress) { long callTag = getNextCallTag(); RpcClientCallImpl call = new RpcClientCallImpl(this); call.setSourceAddress(sourceEndpoint.getEndpointAddress()); @@ -112,7 +112,7 @@ public class RpcProviderImpl implements RpcProvider { } @Override - public RpcClientCall newCall(RpcAddressable targetAddress) { + public RpcClientCall newCall(TransportAddressMapper targetAddress) { return newCall(targetAddress.getAddress()); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportAddress.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportAddress.java new file mode 100644 index 00000000000..49a45156962 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportAddress.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.messaging; + +import java.util.Random; + +public class TransportAddress { + public final static String LOCAL_SERVICE_NODE = ""; + + private String _nodeId = LOCAL_SERVICE_NODE; + private String _endpointId; + private int _magic; + + public TransportAddress(String nodeId, String endpointId) { + assert(nodeId != null); + assert(endpointId != null); + assert(nodeId.indexOf(".") < 0); + assert(endpointId.indexOf(".") < 0); + + _nodeId = nodeId; + _endpointId = endpointId; + _magic = new Random().nextInt(); + } + + public TransportAddress(String nodeId, String endpointId, int magic) { + assert(nodeId != null); + assert(endpointId != null); + assert(nodeId.indexOf(".") < 0); + assert(endpointId.indexOf(".") < 0); + + _nodeId = nodeId; + _endpointId = endpointId; + _magic = magic; + } + + public String getNodeId() { + return _nodeId; + } + + public TransportAddress setNodeId(String nodeId) { + _nodeId = nodeId; + return this; + } + + public String getEndpointId() { + return _endpointId; + } + + public TransportAddress setEndpointId(String endpointId) { + _endpointId = endpointId; + return this; + } + + public static TransportAddress fromAddressString(String addressString) { + if(addressString == null || addressString.isEmpty()) + return null; + + String tokens[] = addressString.split("\\."); + if(tokens.length != 3) + return null; + + return new TransportAddress(tokens[0], tokens[1], Integer.parseInt(tokens[2])); + } + + public static TransportAddress getLocalPredefinedTransportAddress(String predefinedIdentifier) { + return new TransportAddress(LOCAL_SERVICE_NODE, predefinedIdentifier, 0); + } + + @Override + public int hashCode() { + int hashCode = _magic; + hashCode = (hashCode << 3) ^ _nodeId.hashCode(); + hashCode = (hashCode << 3) ^ _endpointId.hashCode(); + + return hashCode; + } + + @Override + public boolean equals(Object other) { + if(other == null) + return false; + + if(!(other instanceof TransportAddress)) + return false; + + if(this == other) + return true; + + return _nodeId.equals(((TransportAddress)other)._nodeId) && + _endpointId.equals(((TransportAddress)other)._endpointId) && + _magic == ((TransportAddress)other)._magic; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + if(_nodeId != null) + sb.append(_nodeId); + sb.append("."); + sb.append(_endpointId); + sb.append("."); + sb.append(_magic); + + return sb.toString(); + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcAddressable.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportAddressMapper.java similarity index 95% rename from framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcAddressable.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportAddressMapper.java index 0bf62cf76a4..11b43660d14 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcAddressable.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportAddressMapper.java @@ -18,6 +18,6 @@ */ package org.apache.cloudstack.framework.messaging; -public interface RpcAddressable { +public interface TransportAddressMapper { String getAddress(); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportDataPdu.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportDataPdu.java new file mode 100644 index 00000000000..1e6b323766f --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportDataPdu.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +@OnwireName(name="TransportDataPdu") +public class TransportDataPdu extends TransportPdu { + + private String _multiplexier; + private String _content; + + public TransportDataPdu() { + } + + public String getMultiplexier() { + return _multiplexier; + } + + public void setMultiplexier(String multiplexier) { + _multiplexier = multiplexier; + } + + public String getContent() { + return _content; + } + + public void setContent(String content) { + _content = content; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java new file mode 100644 index 00000000000..4af777271f3 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +import java.util.ArrayList; +import java.util.List; + +public class TransportEndpointSite { + private TransportEndpoint _endpoint; + private TransportAddress _address; + + private List _outputQueue = new ArrayList(); + + public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) { + assert(endpoint != null); + assert(address != null); + + _endpoint = endpoint; + _address = address; + } + + public TransportEndpoint getEndpoint() { + return _endpoint; + } + + public TransportAddress getAddress() { + return _address; + } + + public void setAddress(TransportAddress address) { + _address = address; + } + + public void addOutputPdu(TransportPdu pdu) { + synchronized(this) { + _outputQueue.add(pdu); + } + + processOutput(); + } + + public TransportPdu getNextOutputPdu() { + synchronized(this) { + if(_outputQueue.size() > 0) + return _outputQueue.remove(0); + } + + return null; + } + + private void processOutput() { + TransportPdu pdu; + while((pdu = getNextOutputPdu()) != null) { + // ??? + } + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportPdu.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportPdu.java new file mode 100644 index 00000000000..1bfb367bf0a --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportPdu.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +public class TransportPdu { + protected String _sourceAddress; + protected String _destAddress; + + public TransportPdu() { + } + + public String getSourceAddress() { return _sourceAddress; } + public void setSourceAddress(String sourceAddress) { + _sourceAddress = sourceAddress; + } + + public String getDestAddress() { + return _destAddress; + } + + public void setDestAddress(String destAddress) { + _destAddress = destAddress; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java index c843b061157..132aa9a76da 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java @@ -19,8 +19,8 @@ package org.apache.cloudstack.framework.messaging; public interface TransportProvider { - void attach(TransportEndpoint endpoint, String predefinedAddress); - void detach(TransportEndpoint endpoint); + boolean attach(TransportEndpoint endpoint, String predefinedAddress); + boolean detach(TransportEndpoint endpoint); void sendMessage(String soureEndpointAddress, String targetEndpointAddress, String multiplexier, String message); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java index 551838eabf0..2189c7ab165 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java @@ -24,13 +24,16 @@ import org.apache.cloudstack.framework.messaging.TransportProvider; public class ClientTransportProvider implements TransportProvider { @Override - public void attach(TransportEndpoint endpoint, String predefinedAddress) { + public boolean attach(TransportEndpoint endpoint, String predefinedAddress) { // TODO Auto-generated method stub + return false; } @Override - public void detach(TransportEndpoint endpoint) { + public boolean detach(TransportEndpoint endpoint) { // TODO Auto-generated method stub + + return false; } @Override @@ -38,5 +41,4 @@ public class ClientTransportProvider implements TransportProvider { String multiplexier, String message) { // TODO } - } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java index 332e788c3f6..02674cbf7b4 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java @@ -18,26 +18,121 @@ */ package org.apache.cloudstack.framework.messaging.server; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.cloudstack.framework.messaging.TransportAddress; +import org.apache.cloudstack.framework.messaging.TransportDataPdu; import org.apache.cloudstack.framework.messaging.TransportEndpoint; +import org.apache.cloudstack.framework.messaging.TransportEndpointSite; +import org.apache.cloudstack.framework.messaging.TransportPdu; import org.apache.cloudstack.framework.messaging.TransportProvider; public class ServerTransportProvider implements TransportProvider { + private String _nodeId; - @Override - public void attach(TransportEndpoint endpoint, String predefinedAddress) { - // TODO Auto-generated method stub - + private Map _endpointMap = new HashMap(); + + private int _nextEndpointId = new Random().nextInt(); + + public ServerTransportProvider() { } - - @Override - public void detach(TransportEndpoint endpoint) { - // TODO Auto-generated method stub - + + public String getNodeId() { return _nodeId; } + public void setNodeId(String nodeId) { + _nodeId = nodeId; } @Override - public void sendMessage(String soureEndpointAddress, String targetEndpointAddress, + public boolean attach(TransportEndpoint endpoint, String predefinedAddress) { + + TransportAddress transportAddress; + String endpointId; + if(predefinedAddress != null && !predefinedAddress.isEmpty()) { + endpointId = predefinedAddress; + transportAddress = new TransportAddress(_nodeId, endpointId, 0); + } else { + endpointId = String.valueOf(getNextEndpointId()); + transportAddress = new TransportAddress(_nodeId, endpointId); + } + + TransportEndpointSite endpointSite; + synchronized(this) { + endpointSite = _endpointMap.get(endpointId); + if(endpointSite != null) { + // already attached + return false; + } + endpointSite = new TransportEndpointSite(endpoint, transportAddress); + _endpointMap.put(endpointId, endpointSite); + } + + endpoint.onAttachConfirm(true, transportAddress.toString()); + return true; + } + + @Override + public boolean detach(TransportEndpoint endpoint) { + TransportAddress transportAddress = TransportAddress.fromAddressString(endpoint.getEndpointAddress()); + if(transportAddress == null) + return false; + + boolean found = false; + synchronized(this) { + TransportEndpointSite endpointSite = _endpointMap.get(transportAddress.getEndpointId()); + if(endpointSite.getAddress().equals(transportAddress)) { + found = true; + _endpointMap.remove(transportAddress.getEndpointId()); + } + } + + if(found) { + endpoint.onDetachIndication(endpoint.getEndpointAddress()); + return true; + } + + return false; + } + + @Override + public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress, String multiplexier, String message) { - // TODO + + TransportDataPdu pdu = new TransportDataPdu(); + pdu.setSourceAddress(sourceEndpointAddress); + pdu.setDestAddress(targetEndpointAddress); + pdu.setMultiplexier(multiplexier); + pdu.setContent(message); + + dispatchPdu(pdu); + } + + private void dispatchPdu(TransportPdu pdu) { + + TransportAddress transportAddress = TransportAddress.fromAddressString(pdu.getDestAddress()); + + if(isLocalAddress(transportAddress)) { + TransportEndpointSite endpointSite = null; + synchronized(this) { + endpointSite = _endpointMap.get(transportAddress.getEndpointId()); + } + + if(endpointSite != null) + endpointSite.addOutputPdu(pdu); + } else { + // do cross-node forwarding + } + } + + private boolean isLocalAddress(TransportAddress address) { + if(address.getNodeId().equals(_nodeId) || address.getNodeId().equals(TransportAddress.LOCAL_SERVICE_NODE)) + return true; + + return false; + } + + private synchronized int getNextEndpointId() { + return _nextEndpointId++; } } diff --git a/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleEventDrivenStyleCaller.java b/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleEventDrivenStyleCaller.java index 849f28c1fa3..c0818dbfe94 100644 --- a/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleEventDrivenStyleCaller.java +++ b/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleEventDrivenStyleCaller.java @@ -19,7 +19,7 @@ package org.apache.cloudstack.framework.messaging; public class AsyncSampleEventDrivenStyleCaller { - AsyncSampleCallee _ds; + AsyncSampleCallee _ds = new AsyncSampleCallee(); AsyncCallbackDriver _callbackDriver; public void MethodThatWillCallAsyncMethod() { @@ -28,7 +28,7 @@ public class AsyncSampleEventDrivenStyleCaller { new AsyncCallbackDispatcher(this) .setOperationName("volume.create") .setContextParam("origVolume", vol) - .attachDriver(_callbackDriver)); + ); } @AsyncCallbackHandler(operationName="volume.create") @@ -37,4 +37,9 @@ public class AsyncSampleEventDrivenStyleCaller { TestVolume resultVol = callback.getResult(); } + + public static void main(String[] args) { + AsyncSampleEventDrivenStyleCaller caller = new AsyncSampleEventDrivenStyleCaller(); + caller.MethodThatWillCallAsyncMethod(); + } } diff --git a/framework/ipc/test/org/apache/cloudstack/framework/messaging/SampleComponent.java b/framework/ipc/test/org/apache/cloudstack/framework/messaging/SampleComponent.java index db1f74923d4..25d3452e6be 100644 --- a/framework/ipc/test/org/apache/cloudstack/framework/messaging/SampleComponent.java +++ b/framework/ipc/test/org/apache/cloudstack/framework/messaging/SampleComponent.java @@ -28,7 +28,7 @@ public class SampleComponent { public void init() { - _rpcProvider.registerRpcServiceEndpoint("AgentManager", + _rpcProvider.registerRpcServiceEndpoint( RpcServiceDispatcher.getDispatcher(this)); // subscribe to all network events (for example)