bug 14301:

1) Support HTTP keep-alive in clustering communication channel
2) Increase concurrency level for clustering message delivery

Reviewed-By: Kelven (with unit test)
This commit is contained in:
Kelven Yang 2012-03-26 23:09:59 -07:00
parent 7d87f48ef4
commit 5602f3b550
3 changed files with 18 additions and 12 deletions

View File

@ -92,6 +92,7 @@ public class ClusterManagerImpl implements ClusterManager {
private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class);
private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second
private static final int DEFAULT_OUTGOING_WORKERS = 5;
private final List<ClusterManagerListener> _listeners = new ArrayList<ClusterManagerListener>();
private final Map<Long, ManagementServerHostVO> _activePeers = new HashMap<Long, ManagementServerHostVO>();
@ -1285,8 +1286,11 @@ public class ClusterManagerImpl implements ClusterManager {
if(!NetUtils.isLocalAddress(_clusterNodeIP)) {
throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration");
}
for(int i = 0; i < DEFAULT_OUTGOING_WORKERS; i++)
_executor.execute(getClusterPduSendingTask());
_executor.execute(getClusterPduSendingTask());
// notification task itself in turn works as a task dispatcher
_executor.execute(getClusterPduNotificationTask());
Adapters<ClusterServiceAdapter> adapters = locator.getAdapters(ClusterServiceAdapter.class);

View File

@ -127,15 +127,16 @@ public class ClusterServiceServletContainer {
_executor.execute(new Runnable() {
public void run() {
HttpContext context = new BasicHttpContext(null);
try {
if(s_logger.isTraceEnabled())
s_logger.trace("dispatching cluster request from " + conn.getRemoteAddress().toString());
_httpService.handleRequest(conn, context);
if(s_logger.isTraceEnabled())
s_logger.trace("Cluster request from " + conn.getRemoteAddress().toString() + " is processed");
try {
while(!Thread.interrupted() && conn.isOpen()) {
if(s_logger.isTraceEnabled())
s_logger.trace("dispatching cluster request from " + conn.getRemoteAddress().toString());
_httpService.handleRequest(conn, context);
if(s_logger.isTraceEnabled())
s_logger.trace("Cluster request from " + conn.getRemoteAddress().toString() + " is processed");
}
} catch (ConnectionClosedException ex) {
s_logger.error("Client closed connection", ex);
} catch (IOException ex) {
@ -144,7 +145,7 @@ public class ClusterServiceServletContainer {
s_logger.error("Unrecoverable HTTP protocol violation", ex);
} finally {
try {
conn.close();
conn.shutdown();
} catch (IOException ignore) {
s_logger.error("unexpected exception", ignore);
}

View File

@ -126,7 +126,7 @@ public class ClusterServiceServletImpl implements ClusterService {
if(s_client == null) {
MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
mgr.getParams().setDefaultMaxConnectionsPerHost(1);
mgr.getParams().setDefaultMaxConnectionsPerHost(4);
// TODO make it configurable
mgr.getParams().setMaxTotalConnections(1000);
@ -134,6 +134,7 @@ public class ClusterServiceServletImpl implements ClusterService {
s_client = new HttpClient(mgr);
HttpClientParams clientParams = new HttpClientParams();
clientParams.setSoTimeout(_requestTimeoutSeconds * 1000);
s_client.setParams(clientParams);
}
return s_client;