diff --git a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportConnection.java b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportConnection.java new file mode 100644 index 00000000000..34cd5efeb15 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportConnection.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.client; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cloudstack.framework.transport.TransportAddress; +import org.apache.cloudstack.framework.transport.TransportAttachResponsePdu; +import org.apache.cloudstack.framework.transport.TransportConnectResponsePdu; +import org.apache.cloudstack.framework.transport.TransportPdu; + +public class ClientTransportConnection { + enum State { + Idle, + Connecting, + Open, + Closing + } + + private ClientTransportProvider _provider; + + // TODO, use state machine + private State _state = State.Idle; + + private TransportAddress _connectionTpAddress; + private List _outputQueue = new ArrayList(); + + public ClientTransportConnection(ClientTransportProvider provider) { + _provider = provider; + } + + public void connect(String serverAddress, int serverPort) { + boolean doConnect = false; + synchronized(this) { + if(_state == State.Idle) { + setState(State.Connecting); + doConnect = true; + } + } + + if(doConnect) { + // ??? + } + } + + public void handleConnectResponsePdu(TransportConnectResponsePdu pdu) { + // TODO assume it is always succeeds + _connectionTpAddress = TransportAddress.fromAddressString(pdu.getDestAddress()); + + // ??? + } + + public void handleAttachResponsePdu(TransportAttachResponsePdu pdu) { + // ??? + } + + private void setState(State state) { + synchronized(this) { + if(_state != state) { + _state = state; + } + } + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportEndpointSite.java b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportEndpointSite.java new file mode 100644 index 00000000000..d75c5b6636a --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportEndpointSite.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.client; + +import org.apache.cloudstack.framework.transport.TransportEndpoint; +import org.apache.cloudstack.framework.transport.TransportEndpointSite; +import org.apache.cloudstack.framework.transport.TransportProvider; + +public class ClientTransportEndpointSite extends TransportEndpointSite { + private String _predefinedAddress; + private int _providerKey; + + public ClientTransportEndpointSite(TransportProvider provider, TransportEndpoint endpoint, String predefinedAddress, int providerKey) { + super(provider, endpoint); + + _predefinedAddress = predefinedAddress; + _providerKey = providerKey; + } + + public String getPredefinedAddress() { + return _predefinedAddress; + } + + public int getProviderKey() { + return _providerKey; + } + + public void setProviderKey(int providerKey) { + _providerKey = providerKey; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportProvider.java index 3d76e3b41d0..bd93824ea85 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportProvider.java @@ -18,19 +18,78 @@ */ package org.apache.cloudstack.framework.client; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import org.apache.cloudstack.framework.serializer.MessageSerializer; import org.apache.cloudstack.framework.transport.TransportEndpoint; import org.apache.cloudstack.framework.transport.TransportEndpointSite; import org.apache.cloudstack.framework.transport.TransportProvider; +import com.cloud.utils.concurrency.NamedThreadFactory; + public class ClientTransportProvider implements TransportProvider { + public static final int DEFAULT_WORKER_POOL_SIZE = 5; + private Map _endpointSites = new HashMap(); + private Map _attachedMap = new HashMap(); + private MessageSerializer _messageSerializer; + + private ClientTransportConnection _connection; + private String _serverAddress; + private int _serverPort; + + private int _poolSize = DEFAULT_WORKER_POOL_SIZE; + private ExecutorService _executor; + + private int _nextProviderKey = 1; + + public ClientTransportProvider() { + } + + public ClientTransportProvider setPoolSize(int poolSize) { + _poolSize = poolSize; + return this; + } + + public void initialize(String serverAddress, int serverPort) { + _serverAddress = serverAddress; + _serverPort = serverPort; + + _executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker")); + _connection = new ClientTransportConnection(this); + + _executor.execute(new Runnable() { + @Override + public void run() { + try { + _connection.connect(_serverAddress, _serverPort); + } catch(Throwable e) { + } + } + }); + } + @Override public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) { - // TODO Auto-generated method stub - return null; + + ClientTransportEndpointSite endpointSite; + synchronized(this) { + endpointSite = getEndpointSite(endpoint); + if(endpointSite != null) { + // already attached + return endpointSite; + } + + endpointSite = new ClientTransportEndpointSite(this, endpoint, predefinedAddress, getNextProviderKey()); + _endpointSites.put(endpointSite.getProviderKey(), endpointSite); + } + + return endpointSite; } @Override @@ -61,4 +120,21 @@ public class ClientTransportProvider implements TransportProvider { String multiplexier, String message) { // TODO } + + private ClientTransportEndpointSite getEndpointSite(TransportEndpoint endpoint) { + synchronized(this) { + for(ClientTransportEndpointSite endpointSite : _endpointSites.values()) { + if(endpointSite.getEndpoint() == endpoint) + return endpointSite; + } + } + + return null; + } + + public int getNextProviderKey() { + synchronized(this) { + return _nextProviderKey++; + } + } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java index 7f73e605b1f..a68a65e91ea 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java @@ -167,7 +167,7 @@ public class RpcProviderImpl implements RpcProvider { RpcServerCall call = new RpcServerCallImpl(this, sourceAddress, targetAddress, pdu); // TODO, we are trying to avoid locking when calling into callbacks - // this can be optimized later + // this should be optimized later List endpoints = new ArrayList(); synchronized(_serviceEndpoints) { endpoints.addAll(_serviceEndpoints); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java index d5dae2ed20d..b19a7c9265f 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java @@ -95,10 +95,10 @@ public class ServerTransportProvider implements TransportProvider { String endpointId; if(predefinedAddress != null && !predefinedAddress.isEmpty()) { endpointId = predefinedAddress; - transportAddress = new TransportAddress(_nodeId, endpointId, 0); + transportAddress = new TransportAddress(_nodeId, TransportAddress.LOCAL_SERVICE_CONNECTION, endpointId, 0); } else { endpointId = String.valueOf(getNextEndpointId()); - transportAddress = new TransportAddress(_nodeId, endpointId); + transportAddress = new TransportAddress(_nodeId, TransportAddress.LOCAL_SERVICE_CONNECTION, endpointId); } TransportEndpointSite endpointSite; diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java index 4a5ad794089..e3cf9684e19 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java @@ -23,29 +23,33 @@ import java.util.Random; public class TransportAddress { public final static String LOCAL_SERVICE_NODE = ""; + public final static int LOCAL_SERVICE_CONNECTION = 0; private String _nodeId = LOCAL_SERVICE_NODE; + private int _connectionId = LOCAL_SERVICE_CONNECTION; private String _endpointId; private int _magic; - public TransportAddress(String nodeId, String endpointId) { + public TransportAddress(String nodeId, int connectionId, String endpointId) { assert(nodeId != null); assert(endpointId != null); assert(nodeId.indexOf(".") < 0); assert(endpointId.indexOf(".") < 0); _nodeId = nodeId; + _connectionId = connectionId; _endpointId = endpointId; _magic = new Random().nextInt(); } - public TransportAddress(String nodeId, String endpointId, int magic) { + public TransportAddress(String nodeId, int connectionId, String endpointId, int magic) { assert(nodeId != null); assert(endpointId != null); assert(nodeId.indexOf(".") < 0); assert(endpointId.indexOf(".") < 0); _nodeId = nodeId; + _connectionId = connectionId; _endpointId = endpointId; _magic = magic; } @@ -59,6 +63,14 @@ public class TransportAddress { return this; } + public int getConnectionId() { + return _connectionId; + } + + public void setConnectionId(int connectionId) { + _connectionId = connectionId; + } + public String getEndpointId() { return _endpointId; } @@ -73,20 +85,21 @@ public class TransportAddress { return null; String tokens[] = addressString.split("\\."); - if(tokens.length != 3) + if(tokens.length != 4) return null; - return new TransportAddress(tokens[0], tokens[1], Integer.parseInt(tokens[2])); + return new TransportAddress(tokens[0], Integer.parseInt(tokens[1]), tokens[2], Integer.parseInt(tokens[3])); } public static TransportAddress getLocalPredefinedTransportAddress(String predefinedIdentifier) { - return new TransportAddress(LOCAL_SERVICE_NODE, predefinedIdentifier, 0); + return new TransportAddress(LOCAL_SERVICE_NODE, LOCAL_SERVICE_CONNECTION, predefinedIdentifier, 0); } @Override public int hashCode() { int hashCode = _magic; hashCode = (hashCode << 3) ^ _nodeId.hashCode(); + hashCode = (hashCode << 3) ^ _connectionId; hashCode = (hashCode << 3) ^ _endpointId.hashCode(); return hashCode; @@ -103,7 +116,8 @@ public class TransportAddress { if(this == other) return true; - return _nodeId.equals(((TransportAddress)other)._nodeId) && + return _nodeId.equals(((TransportAddress)other)._nodeId) && + _connectionId == (((TransportAddress)other)._connectionId) && _endpointId.equals(((TransportAddress)other)._endpointId) && _magic == ((TransportAddress)other)._magic; } @@ -114,6 +128,8 @@ public class TransportAddress { if(_nodeId != null) sb.append(_nodeId); sb.append("."); + sb.append(_connectionId); + sb.append("."); sb.append(_endpointId); sb.append("."); sb.append(_magic); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAttachRequestPdu.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAttachRequestPdu.java new file mode 100644 index 00000000000..736ae298756 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAttachRequestPdu.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.transport; + +public class TransportAttachRequestPdu extends TransportPdu { + private int _endpointProviderKey; + + public TransportAttachRequestPdu() { + } + + public int getEndpointProviderKey() { + return _endpointProviderKey; + } + + public void setEndpointProviderKey(int endpointProviderKey) { + _endpointProviderKey = endpointProviderKey; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAttachResponsePdu.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAttachResponsePdu.java new file mode 100644 index 00000000000..b2d15c6a2d7 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAttachResponsePdu.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.transport; + +public class TransportAttachResponsePdu extends TransportPdu { + private int _statusCode; + private int _endpointProviderKey; + + public TransportAttachResponsePdu() { + } + + public int getStatusCode() { + return _statusCode; + } + + public void setStatusCode(int statusCode) { + _statusCode = statusCode; + } + + public int getEndpointProviderKey() { + return _endpointProviderKey; + } + + public void setEndpointProviderKey(int endpointProviderKey) { + _endpointProviderKey = endpointProviderKey; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportConnectRequestPdu.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportConnectRequestPdu.java new file mode 100644 index 00000000000..5b50e2487fa --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportConnectRequestPdu.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.transport; + +import org.apache.cloudstack.framework.serializer.OnwireName; + +@OnwireName(name="TransportConnectRequestPdu") +public class TransportConnectRequestPdu extends TransportPdu { + String _authIdentity; + String _authCredential; + + public TransportConnectRequestPdu() { + } + + public String getAuthIdentity() { + return _authIdentity; + } + + public void setAuthIdentity(String authIdentity) { + _authIdentity = authIdentity; + } + + public String getAuthCredential() { + return _authCredential; + } + + public void setAuthCredential(String authCredential) { + _authCredential = authCredential; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportConnectResponsePdu.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportConnectResponsePdu.java new file mode 100644 index 00000000000..8015ad92dd1 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportConnectResponsePdu.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.transport; + +import org.apache.cloudstack.framework.serializer.OnwireName; + +@OnwireName(name="TransportConnectRequestPdu") +public class TransportConnectResponsePdu extends TransportPdu { + private int _statusCode; + + public TransportConnectResponsePdu() { + } + + public int getStatusCode() { + return _statusCode; + } + + public void setStatusCode(int statusCode) { + _statusCode = statusCode; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpointSite.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpointSite.java index e82d702a411..eb55190c1ff 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpointSite.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpointSite.java @@ -44,6 +44,16 @@ public class TransportEndpointSite { _outstandingSignalRequests = 0; } + + public TransportEndpointSite(TransportProvider provider, TransportEndpoint endpoint) { + assert(provider != null); + assert(endpoint != null); + + _provider = provider; + _endpoint = endpoint; + + _outstandingSignalRequests = 0; + } public TransportEndpoint getEndpoint() { return _endpoint;