diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java index ca6155be39c..82ed9f5f2ad 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java @@ -24,18 +24,25 @@ import java.util.List; import java.util.Map; public class TransportEndpointSite { + private TransportProvider _provider; private TransportEndpoint _endpoint; private TransportAddress _address; private List _outputQueue = new ArrayList(); private Map _multiplexierMap = new HashMap(); - public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) { + private int _outstandingSignalRequests; + + public TransportEndpointSite(TransportProvider provider, TransportEndpoint endpoint, TransportAddress address) { + assert(provider != null); assert(endpoint != null); assert(address != null); + _provider = provider; _endpoint = endpoint; _address = address; + + _outstandingSignalRequests = 0; } public TransportEndpoint getEndpoint() { @@ -68,7 +75,7 @@ public class TransportEndpointSite { _outputQueue.add(pdu); } - processOutput(); + signalOutputProcessRequest(); } public TransportPdu getNextOutputPdu() { @@ -80,7 +87,7 @@ public class TransportEndpointSite { return null; } - private void processOutput() { + public void processOutput() { TransportPdu pdu; TransportEndpoint endpoint = getEndpoint(); @@ -104,4 +111,24 @@ public class TransportEndpointSite { return multiplexier; } + + private void signalOutputProcessRequest() { + boolean proceed = false; + synchronized(this) { + if(_outstandingSignalRequests == 0) { + _outstandingSignalRequests++; + proceed = true; + } + } + + if(proceed) + _provider.requestSiteOutput(this); + } + + public void ackOutputProcessSignal() { + synchronized(this) { + assert(_outstandingSignalRequests == 1); + _outstandingSignalRequests--; + } + } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java index bdbdd179317..e25407fbe9a 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java @@ -22,6 +22,8 @@ public interface TransportProvider { TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress); boolean detach(TransportEndpoint endpoint); + void requestSiteOutput(TransportEndpointSite site); + void sendMessage(String soureEndpointAddress, String targetEndpointAddress, String multiplexier, String message); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java index 60c07c3f1b5..c2bbef71e1f 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java @@ -37,6 +37,11 @@ public class ClientTransportProvider implements TransportProvider { return false; } + @Override + public void requestSiteOutput(TransportEndpointSite site) { + // ??? + } + @Override public void sendMessage(String soureEndpointAddress, String targetEndpointAddress, String multiplexier, String message) { diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java index 3372b75c361..014c8fe6207 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java @@ -21,6 +21,8 @@ package org.apache.cloudstack.framework.messaging.server; import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.cloudstack.framework.messaging.TransportAddress; import org.apache.cloudstack.framework.messaging.TransportDataPdu; @@ -28,20 +30,48 @@ import org.apache.cloudstack.framework.messaging.TransportEndpoint; import org.apache.cloudstack.framework.messaging.TransportEndpointSite; import org.apache.cloudstack.framework.messaging.TransportPdu; import org.apache.cloudstack.framework.messaging.TransportProvider; +import org.apache.log4j.Logger; + +import com.cloud.utils.concurrency.NamedThreadFactory; public class ServerTransportProvider implements TransportProvider { + private static final Logger s_logger = Logger.getLogger(ServerTransportProvider.class); + + public static final int DEFAULT_WORKER_POOL_SIZE = 5; + private String _nodeId; private Map _endpointMap = new HashMap(); + private int _poolSize = DEFAULT_WORKER_POOL_SIZE; + private ExecutorService _executor; private int _nextEndpointId = new Random().nextInt(); public ServerTransportProvider() { } - public String getNodeId() { return _nodeId; } - public void setNodeId(String nodeId) { + public String getNodeId() { + return _nodeId; + } + + public ServerTransportProvider setNodeId(String nodeId) { _nodeId = nodeId; + return this; + } + + public int getWorkerPoolSize() { + return _poolSize; + } + + public ServerTransportProvider setWorkerPoolSize(int poolSize) { + assert(poolSize > 0); + + _poolSize = poolSize; + return this; + } + + public void initialize() { + _executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker")); } @Override @@ -64,7 +94,7 @@ public class ServerTransportProvider implements TransportProvider { // already attached return endpointSite; } - endpointSite = new TransportEndpointSite(endpoint, transportAddress); + endpointSite = new TransportEndpointSite(this, endpoint, transportAddress); _endpointMap.put(endpointId, endpointSite); } @@ -86,6 +116,22 @@ public class ServerTransportProvider implements TransportProvider { return false; } + @Override + public void requestSiteOutput(final TransportEndpointSite site) { + _executor.execute(new Runnable() { + + @Override + public void run() { + try { + site.processOutput(); + site.ackOutputProcessSignal(); + } catch(Throwable e) { + s_logger.error("Unhandled exception", e); + } + } + }); + } + @Override public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress, String multiplexier, String message) {