diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index 2d00b9873fb..6a42fce107b 100755 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -47,6 +47,7 @@ import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.AgentManager.OnError; +import com.cloud.agent.api.AgentControlCommand; import com.cloud.agent.api.Answer; import com.cloud.agent.api.ChangeAgentAnswer; import com.cloud.agent.api.ChangeAgentCommand; @@ -141,8 +142,8 @@ public class ClusterManagerImpl implements ClusterManager { private double _connectedAgentsThreshold = 0.7; private static boolean _agentLbHappened = false; - private BlockingQueue _clusterPduOutgoingQueue = new LinkedBlockingQueue(); - private BlockingQueue _clusterPduIncomingQueue = new LinkedBlockingQueue(); + private List _clusterPduOutgoingQueue = new ArrayList(); + private List _clusterPduIncomingQueue = new ArrayList(); private Map _outgoingPdusWaitingForAck = new HashMap(); public ClusterManagerImpl() { @@ -195,6 +196,52 @@ public class ClusterManagerImpl implements ClusterManager { } } + private void addOutgoingClusterPdu(ClusterServicePdu pdu) { + synchronized(_clusterPduOutgoingQueue) { + _clusterPduOutgoingQueue.add(pdu); + _clusterPduOutgoingQueue.notifyAll(); + } + } + + private ClusterServicePdu popOutgoingClusterPdu(long timeoutMs) { + synchronized(_clusterPduOutgoingQueue) { + try { + _clusterPduOutgoingQueue.wait(timeoutMs); + } catch (InterruptedException e) { + } + + if(_clusterPduOutgoingQueue.size() > 0) { + ClusterServicePdu pdu = _clusterPduOutgoingQueue.get(0); + _clusterPduOutgoingQueue.remove(0); + return pdu; + } + } + return null; + } + + private void addIncomingClusterPdu(ClusterServicePdu pdu) { + synchronized(_clusterPduIncomingQueue) { + _clusterPduIncomingQueue.add(pdu); + _clusterPduIncomingQueue.notifyAll(); + } + } + + private ClusterServicePdu popIncomingClusterPdu(long timeoutMs) { + synchronized(_clusterPduIncomingQueue) { + try { + _clusterPduIncomingQueue.wait(timeoutMs); + } catch (InterruptedException e) { + } + + if(_clusterPduIncomingQueue.size() > 0) { + ClusterServicePdu pdu = _clusterPduIncomingQueue.get(0); + _clusterPduIncomingQueue.remove(0); + return pdu; + } + } + return null; + } + private Runnable getClusterPduSendingTask() { return new Runnable() { public void run() { @@ -214,8 +261,10 @@ public class ClusterManagerImpl implements ClusterManager { private void onSendingClusterPdu() { while(true) { try { - ClusterServicePdu pdu = _clusterPduOutgoingQueue.take(); - + ClusterServicePdu pdu = popOutgoingClusterPdu(1000); + if(pdu == null) + continue; + ClusterService peerService = null; for(int i = 0; i < 2; i++) { try { @@ -251,7 +300,6 @@ public class ClusterManagerImpl implements ClusterManager { } } } - } catch(InterruptedException e) { } catch(Throwable e) { s_logger.error("Unexcpeted exception: ", e); } @@ -261,7 +309,10 @@ public class ClusterManagerImpl implements ClusterManager { private void onNotifyingClusterPdu() { while(true) { try { - ClusterServicePdu pdu = _clusterPduOutgoingQueue.take(); + ClusterServicePdu pdu = popIncomingClusterPdu(1000); + if(pdu == null) + continue; + if(pdu.isRequest()) { String result = dispatchClusterServicePdu(pdu); if(result == null) @@ -273,7 +324,7 @@ public class ClusterManagerImpl implements ClusterManager { responsePdu.setAckSequenceId(pdu.getSequenceId()); responsePdu.setJsonPackage(result); - _clusterPduOutgoingQueue.put(responsePdu); + addOutgoingClusterPdu(responsePdu); } else { ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId()); if(requestPdu != null) { @@ -285,7 +336,6 @@ public class ClusterManagerImpl implements ClusterManager { s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu)); } } - } catch(InterruptedException e) { } catch(Throwable e) { s_logger.error("Unexcpeted exception: ", e); } @@ -305,7 +355,7 @@ public class ClusterManagerImpl implements ClusterManager { assert(false); s_logger.error("Excection in gson decoding : ", e); } - + if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; @@ -384,11 +434,7 @@ public class ClusterManagerImpl implements ClusterManager { } public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) { - try { - _clusterPduIncomingQueue.put(pdu); - } catch (InterruptedException e) { - s_logger.warn("InterruptedException. pdu: " + _gson.toJson(pdu) + " is dropped"); - } + addIncomingClusterPdu(pdu); } @Override @@ -665,7 +711,7 @@ public class ClusterManagerImpl implements ClusterManager { _peerScanInited = true; initPeerScan(); } - + peerScan(); profilerPeerScan.stop(); @@ -1225,6 +1271,12 @@ public class ClusterManagerImpl implements ClusterManager { if(!NetUtils.isLocalAddress(_clusterNodeIP)) { throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration"); } + + _executor.execute(getClusterPduSendingTask()); + + // TODO, make it configurable + for(int i = 0; i < 5; i++) + _executor.execute(getClusterPduNotificationTask()); Adapters adapters = locator.getAdapters(ClusterServiceAdapter.class); if (adapters == null || !adapters.isSet()) { @@ -1251,13 +1303,6 @@ public class ClusterManagerImpl implements ClusterManager { checkConflicts(); - _executor.execute(getClusterPduSendingTask()); - - // TODO, make it configurable - for(int i = 0; i < 5; i++) - _executor.execute(getClusterPduNotificationTask()); - - if(s_logger.isInfoEnabled()) { s_logger.info("Cluster manager is configured."); } diff --git a/server/src/com/cloud/cluster/ClusterServiceServletContainer.java b/server/src/com/cloud/cluster/ClusterServiceServletContainer.java index a85bf72f55b..5c312df70ee 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletContainer.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletContainer.java @@ -144,11 +144,12 @@ public class ClusterServiceServletContainer { s_logger.error("Unrecoverable HTTP protocol violation", ex); } finally { try { - conn.shutdown(); - conn.close(); - } catch (IOException ignore) {} + conn.close(); + } catch (IOException ignore) { + s_logger.error("unexpected exception", ignore); + } } - } + } }); } catch (InterruptedIOException ex) { diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java index 6f21e929b64..f61301d20cd 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java @@ -24,24 +24,21 @@ import java.rmi.RemoteException; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpException; import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.params.HttpClientParams; import org.apache.log4j.Logger; -import com.cloud.serializer.GsonHelper; -import com.google.gson.Gson; - public class ClusterServiceServletImpl implements ClusterService { private static final long serialVersionUID = 4574025200012566153L; private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class); private String _serviceUrl; - private final Gson _gson; - private int _requestTimeoutSeconds; - + private int _requestTimeoutSeconds; + protected static HttpClient s_client = null; + public ClusterServiceServletImpl() { - _gson = GsonHelper.getGson(); } public ClusterServiceServletImpl(String serviceUrl, int requestTimeoutSeconds) { @@ -49,8 +46,6 @@ public class ClusterServiceServletImpl implements ClusterService { this._serviceUrl = serviceUrl; this._requestTimeoutSeconds = requestTimeoutSeconds; - - _gson = GsonHelper.getGson(); } @Override @@ -69,7 +64,11 @@ public class ClusterServiceServletImpl implements ClusterService { method.addParameter("stopOnError", pdu.isStopOnError() ? "1" : "0"); method.addParameter("requestAck", pdu.isRequest() ? "1" : "0"); - return executePostMethod(client, method); + try { + return executePostMethod(client, method); + } finally { + method.releaseConnection(); + } } @Override @@ -82,12 +81,17 @@ public class ClusterServiceServletImpl implements ClusterService { PostMethod method = new PostMethod(_serviceUrl); method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_PING)); - method.addParameter("callingPeer", callingPeer); - String returnVal = executePostMethod(client, method); - if("true".equalsIgnoreCase(returnVal)) { - return true; + method.addParameter("callingPeer", callingPeer); + + try { + String returnVal = executePostMethod(client, method); + if("true".equalsIgnoreCase(returnVal)) { + return true; + } + return false; + } finally { + method.releaseConnection(); } - return false; } private String executePostMethod(HttpClient client, PostMethod method) { @@ -119,12 +123,20 @@ public class ClusterServiceServletImpl implements ClusterService { } private HttpClient getHttpClient() { - HttpClient client = new HttpClient(); - HttpClientParams clientParams = new HttpClientParams(); - clientParams.setSoTimeout(this._requestTimeoutSeconds * 1000); - client.setParams(clientParams); - - return client; + + if(s_client == null) { + MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager(); + mgr.getParams().setDefaultMaxConnectionsPerHost(1); + + // TODO make it configurable + mgr.getParams().setMaxTotalConnections(1000); + + s_client = new HttpClient(mgr); + HttpClientParams clientParams = new HttpClientParams(); + clientParams.setSoTimeout(_requestTimeoutSeconds * 1000); + s_client.setParams(clientParams); + } + return s_client; } // for test purpose only