From 5602f3b5505844459bb9d3dff02c6d795998aea1 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Mon, 26 Mar 2012 23:09:59 -0700 Subject: [PATCH] bug 14301: 1) Support HTTP keep-alive in clustering communication channel 2) Increase concurrency level for clustering message delivery Reviewed-By: Kelven (with unit test) --- .../com/cloud/cluster/ClusterManagerImpl.java | 6 +++++- .../ClusterServiceServletContainer.java | 21 ++++++++++--------- .../cluster/ClusterServiceServletImpl.java | 3 ++- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index f5ba9eb2c16..54e2b5c7010 100755 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -92,6 +92,7 @@ public class ClusterManagerImpl implements ClusterManager { private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class); private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second + private static final int DEFAULT_OUTGOING_WORKERS = 5; private final List _listeners = new ArrayList(); private final Map _activePeers = new HashMap(); @@ -1285,8 +1286,11 @@ public class ClusterManagerImpl implements ClusterManager { if(!NetUtils.isLocalAddress(_clusterNodeIP)) { throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration"); } + + for(int i = 0; i < DEFAULT_OUTGOING_WORKERS; i++) + _executor.execute(getClusterPduSendingTask()); - _executor.execute(getClusterPduSendingTask()); + // notification task itself in turn works as a task dispatcher _executor.execute(getClusterPduNotificationTask()); Adapters adapters = locator.getAdapters(ClusterServiceAdapter.class); diff --git a/server/src/com/cloud/cluster/ClusterServiceServletContainer.java b/server/src/com/cloud/cluster/ClusterServiceServletContainer.java index 5c312df70ee..db02d74fb76 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletContainer.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletContainer.java @@ -127,15 +127,16 @@ public class ClusterServiceServletContainer { _executor.execute(new Runnable() { public void run() { HttpContext context = new BasicHttpContext(null); - try { - if(s_logger.isTraceEnabled()) - s_logger.trace("dispatching cluster request from " + conn.getRemoteAddress().toString()); - - _httpService.handleRequest(conn, context); - - if(s_logger.isTraceEnabled()) - s_logger.trace("Cluster request from " + conn.getRemoteAddress().toString() + " is processed"); - + try { + while(!Thread.interrupted() && conn.isOpen()) { + if(s_logger.isTraceEnabled()) + s_logger.trace("dispatching cluster request from " + conn.getRemoteAddress().toString()); + + _httpService.handleRequest(conn, context); + + if(s_logger.isTraceEnabled()) + s_logger.trace("Cluster request from " + conn.getRemoteAddress().toString() + " is processed"); + } } catch (ConnectionClosedException ex) { s_logger.error("Client closed connection", ex); } catch (IOException ex) { @@ -144,7 +145,7 @@ public class ClusterServiceServletContainer { s_logger.error("Unrecoverable HTTP protocol violation", ex); } finally { try { - conn.close(); + conn.shutdown(); } catch (IOException ignore) { s_logger.error("unexpected exception", ignore); } diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java index ca1ec761618..b124811851b 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java @@ -126,7 +126,7 @@ public class ClusterServiceServletImpl implements ClusterService { if(s_client == null) { MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager(); - mgr.getParams().setDefaultMaxConnectionsPerHost(1); + mgr.getParams().setDefaultMaxConnectionsPerHost(4); // TODO make it configurable mgr.getParams().setMaxTotalConnections(1000); @@ -134,6 +134,7 @@ public class ClusterServiceServletImpl implements ClusterService { s_client = new HttpClient(mgr); HttpClientParams clientParams = new HttpClientParams(); clientParams.setSoTimeout(_requestTimeoutSeconds * 1000); + s_client.setParams(clientParams); } return s_client;