diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index f5ba9eb2c16..54e2b5c7010 100755 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -92,6 +92,7 @@ public class ClusterManagerImpl implements ClusterManager { private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class); private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second + private static final int DEFAULT_OUTGOING_WORKERS = 5; private final List _listeners = new ArrayList(); private final Map _activePeers = new HashMap(); @@ -1285,8 +1286,11 @@ public class ClusterManagerImpl implements ClusterManager { if(!NetUtils.isLocalAddress(_clusterNodeIP)) { throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration"); } + + for(int i = 0; i < DEFAULT_OUTGOING_WORKERS; i++) + _executor.execute(getClusterPduSendingTask()); - _executor.execute(getClusterPduSendingTask()); + // notification task itself in turn works as a task dispatcher _executor.execute(getClusterPduNotificationTask()); Adapters adapters = locator.getAdapters(ClusterServiceAdapter.class); diff --git a/server/src/com/cloud/cluster/ClusterServiceServletContainer.java b/server/src/com/cloud/cluster/ClusterServiceServletContainer.java index 5c312df70ee..db02d74fb76 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletContainer.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletContainer.java @@ -127,15 +127,16 @@ public class ClusterServiceServletContainer { _executor.execute(new Runnable() { public void run() { HttpContext context = new BasicHttpContext(null); - try { - if(s_logger.isTraceEnabled()) - s_logger.trace("dispatching cluster request from " + conn.getRemoteAddress().toString()); - - _httpService.handleRequest(conn, context); - - if(s_logger.isTraceEnabled()) - s_logger.trace("Cluster request from " + conn.getRemoteAddress().toString() + " is processed"); - + try { + while(!Thread.interrupted() && conn.isOpen()) { + if(s_logger.isTraceEnabled()) + s_logger.trace("dispatching cluster request from " + conn.getRemoteAddress().toString()); + + _httpService.handleRequest(conn, context); + + if(s_logger.isTraceEnabled()) + s_logger.trace("Cluster request from " + conn.getRemoteAddress().toString() + " is processed"); + } } catch (ConnectionClosedException ex) { s_logger.error("Client closed connection", ex); } catch (IOException ex) { @@ -144,7 +145,7 @@ public class ClusterServiceServletContainer { s_logger.error("Unrecoverable HTTP protocol violation", ex); } finally { try { - conn.close(); + conn.shutdown(); } catch (IOException ignore) { s_logger.error("unexpected exception", ignore); } diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java index ca1ec761618..b124811851b 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java @@ -126,7 +126,7 @@ public class ClusterServiceServletImpl implements ClusterService { if(s_client == null) { MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager(); - mgr.getParams().setDefaultMaxConnectionsPerHost(1); + mgr.getParams().setDefaultMaxConnectionsPerHost(4); // TODO make it configurable mgr.getParams().setMaxTotalConnections(1000); @@ -134,6 +134,7 @@ public class ClusterServiceServletImpl implements ClusterService { s_client = new HttpClient(mgr); HttpClientParams clientParams = new HttpClientParams(); clientParams.setSoTimeout(_requestTimeoutSeconds * 1000); + s_client.setParams(clientParams); } return s_client;