Add server side transport driver

This commit is contained in:
Kelven Yang 2012-12-12 11:44:53 -08:00
parent f52950689b
commit 11e9baca37
4 changed files with 86 additions and 6 deletions

View File

@ -24,18 +24,25 @@ import java.util.List;
import java.util.Map;
public class TransportEndpointSite {
private TransportProvider _provider;
private TransportEndpoint _endpoint;
private TransportAddress _address;
private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>();
private Map<String, TransportMultiplexier> _multiplexierMap = new HashMap<String, TransportMultiplexier>();
public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) {
private int _outstandingSignalRequests;
public TransportEndpointSite(TransportProvider provider, TransportEndpoint endpoint, TransportAddress address) {
assert(provider != null);
assert(endpoint != null);
assert(address != null);
_provider = provider;
_endpoint = endpoint;
_address = address;
_outstandingSignalRequests = 0;
}
public TransportEndpoint getEndpoint() {
@ -68,7 +75,7 @@ public class TransportEndpointSite {
_outputQueue.add(pdu);
}
processOutput();
signalOutputProcessRequest();
}
public TransportPdu getNextOutputPdu() {
@ -80,7 +87,7 @@ public class TransportEndpointSite {
return null;
}
private void processOutput() {
public void processOutput() {
TransportPdu pdu;
TransportEndpoint endpoint = getEndpoint();
@ -104,4 +111,24 @@ public class TransportEndpointSite {
return multiplexier;
}
private void signalOutputProcessRequest() {
boolean proceed = false;
synchronized(this) {
if(_outstandingSignalRequests == 0) {
_outstandingSignalRequests++;
proceed = true;
}
}
if(proceed)
_provider.requestSiteOutput(this);
}
public void ackOutputProcessSignal() {
synchronized(this) {
assert(_outstandingSignalRequests == 1);
_outstandingSignalRequests--;
}
}
}

View File

@ -22,6 +22,8 @@ public interface TransportProvider {
TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress);
boolean detach(TransportEndpoint endpoint);
void requestSiteOutput(TransportEndpointSite site);
void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
String multiplexier, String message);
}

View File

@ -37,6 +37,11 @@ public class ClientTransportProvider implements TransportProvider {
return false;
}
@Override
public void requestSiteOutput(TransportEndpointSite site) {
// ???
}
@Override
public void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
String multiplexier, String message) {

View File

@ -21,6 +21,8 @@ package org.apache.cloudstack.framework.messaging.server;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.cloudstack.framework.messaging.TransportAddress;
import org.apache.cloudstack.framework.messaging.TransportDataPdu;
@ -28,20 +30,48 @@ import org.apache.cloudstack.framework.messaging.TransportEndpoint;
import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
import org.apache.cloudstack.framework.messaging.TransportPdu;
import org.apache.cloudstack.framework.messaging.TransportProvider;
import org.apache.log4j.Logger;
import com.cloud.utils.concurrency.NamedThreadFactory;
public class ServerTransportProvider implements TransportProvider {
private static final Logger s_logger = Logger.getLogger(ServerTransportProvider.class);
public static final int DEFAULT_WORKER_POOL_SIZE = 5;
private String _nodeId;
private Map<String, TransportEndpointSite> _endpointMap = new HashMap<String, TransportEndpointSite>();
private int _poolSize = DEFAULT_WORKER_POOL_SIZE;
private ExecutorService _executor;
private int _nextEndpointId = new Random().nextInt();
public ServerTransportProvider() {
}
public String getNodeId() { return _nodeId; }
public void setNodeId(String nodeId) {
public String getNodeId() {
return _nodeId;
}
public ServerTransportProvider setNodeId(String nodeId) {
_nodeId = nodeId;
return this;
}
public int getWorkerPoolSize() {
return _poolSize;
}
public ServerTransportProvider setWorkerPoolSize(int poolSize) {
assert(poolSize > 0);
_poolSize = poolSize;
return this;
}
public void initialize() {
_executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
}
@Override
@ -64,7 +94,7 @@ public class ServerTransportProvider implements TransportProvider {
// already attached
return endpointSite;
}
endpointSite = new TransportEndpointSite(endpoint, transportAddress);
endpointSite = new TransportEndpointSite(this, endpoint, transportAddress);
_endpointMap.put(endpointId, endpointSite);
}
@ -86,6 +116,22 @@ public class ServerTransportProvider implements TransportProvider {
return false;
}
@Override
public void requestSiteOutput(final TransportEndpointSite site) {
_executor.execute(new Runnable() {
@Override
public void run() {
try {
site.processOutput();
site.ackOutputProcessSignal();
} catch(Throwable e) {
s_logger.error("Unhandled exception", e);
}
}
});
}
@Override
public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress,
String multiplexier, String message) {