bug 14301: fix regressions caused by new clustering transport. Correct usage of HttpClient to avoid socket staying in CLOSE_WAIT state for too long. Reviewed-By: Kelven

This commit is contained in:
Kelven Yang 2012-03-27 16:08:57 -07:00
parent b0e8f08a3e
commit 880188466c
3 changed files with 105 additions and 47 deletions

View File

@ -47,6 +47,7 @@ import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.ChangeAgentAnswer;
import com.cloud.agent.api.ChangeAgentCommand;
@ -141,8 +142,8 @@ public class ClusterManagerImpl implements ClusterManager {
private double _connectedAgentsThreshold = 0.7;
private static boolean _agentLbHappened = false;
private BlockingQueue<ClusterServicePdu> _clusterPduOutgoingQueue = new LinkedBlockingQueue<ClusterServicePdu>();
private BlockingQueue<ClusterServicePdu> _clusterPduIncomingQueue = new LinkedBlockingQueue<ClusterServicePdu>();
private List<ClusterServicePdu> _clusterPduOutgoingQueue = new ArrayList<ClusterServicePdu>();
private List<ClusterServicePdu> _clusterPduIncomingQueue = new ArrayList<ClusterServicePdu>();
private Map<Long, ClusterServiceRequestPdu> _outgoingPdusWaitingForAck = new HashMap<Long, ClusterServiceRequestPdu>();
public ClusterManagerImpl() {
@ -195,6 +196,52 @@ public class ClusterManagerImpl implements ClusterManager {
}
}
private void addOutgoingClusterPdu(ClusterServicePdu pdu) {
synchronized(_clusterPduOutgoingQueue) {
_clusterPduOutgoingQueue.add(pdu);
_clusterPduOutgoingQueue.notifyAll();
}
}
private ClusterServicePdu popOutgoingClusterPdu(long timeoutMs) {
synchronized(_clusterPduOutgoingQueue) {
try {
_clusterPduOutgoingQueue.wait(timeoutMs);
} catch (InterruptedException e) {
}
if(_clusterPduOutgoingQueue.size() > 0) {
ClusterServicePdu pdu = _clusterPduOutgoingQueue.get(0);
_clusterPduOutgoingQueue.remove(0);
return pdu;
}
}
return null;
}
private void addIncomingClusterPdu(ClusterServicePdu pdu) {
synchronized(_clusterPduIncomingQueue) {
_clusterPduIncomingQueue.add(pdu);
_clusterPduIncomingQueue.notifyAll();
}
}
private ClusterServicePdu popIncomingClusterPdu(long timeoutMs) {
synchronized(_clusterPduIncomingQueue) {
try {
_clusterPduIncomingQueue.wait(timeoutMs);
} catch (InterruptedException e) {
}
if(_clusterPduIncomingQueue.size() > 0) {
ClusterServicePdu pdu = _clusterPduIncomingQueue.get(0);
_clusterPduIncomingQueue.remove(0);
return pdu;
}
}
return null;
}
private Runnable getClusterPduSendingTask() {
return new Runnable() {
public void run() {
@ -214,8 +261,10 @@ public class ClusterManagerImpl implements ClusterManager {
private void onSendingClusterPdu() {
while(true) {
try {
ClusterServicePdu pdu = _clusterPduOutgoingQueue.take();
ClusterServicePdu pdu = popOutgoingClusterPdu(1000);
if(pdu == null)
continue;
ClusterService peerService = null;
for(int i = 0; i < 2; i++) {
try {
@ -251,7 +300,6 @@ public class ClusterManagerImpl implements ClusterManager {
}
}
}
} catch(InterruptedException e) {
} catch(Throwable e) {
s_logger.error("Unexcpeted exception: ", e);
}
@ -261,7 +309,10 @@ public class ClusterManagerImpl implements ClusterManager {
private void onNotifyingClusterPdu() {
while(true) {
try {
ClusterServicePdu pdu = _clusterPduOutgoingQueue.take();
ClusterServicePdu pdu = popIncomingClusterPdu(1000);
if(pdu == null)
continue;
if(pdu.isRequest()) {
String result = dispatchClusterServicePdu(pdu);
if(result == null)
@ -273,7 +324,7 @@ public class ClusterManagerImpl implements ClusterManager {
responsePdu.setAckSequenceId(pdu.getSequenceId());
responsePdu.setJsonPackage(result);
_clusterPduOutgoingQueue.put(responsePdu);
addOutgoingClusterPdu(responsePdu);
} else {
ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
if(requestPdu != null) {
@ -285,7 +336,6 @@ public class ClusterManagerImpl implements ClusterManager {
s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu));
}
}
} catch(InterruptedException e) {
} catch(Throwable e) {
s_logger.error("Unexcpeted exception: ", e);
}
@ -305,7 +355,7 @@ public class ClusterManagerImpl implements ClusterManager {
assert(false);
s_logger.error("Excection in gson decoding : ", e);
}
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
@ -384,11 +434,7 @@ public class ClusterManagerImpl implements ClusterManager {
}
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
try {
_clusterPduIncomingQueue.put(pdu);
} catch (InterruptedException e) {
s_logger.warn("InterruptedException. pdu: " + _gson.toJson(pdu) + " is dropped");
}
addIncomingClusterPdu(pdu);
}
@Override
@ -665,7 +711,7 @@ public class ClusterManagerImpl implements ClusterManager {
_peerScanInited = true;
initPeerScan();
}
peerScan();
profilerPeerScan.stop();
@ -1225,6 +1271,12 @@ public class ClusterManagerImpl implements ClusterManager {
if(!NetUtils.isLocalAddress(_clusterNodeIP)) {
throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration");
}
_executor.execute(getClusterPduSendingTask());
// TODO, make it configurable
for(int i = 0; i < 5; i++)
_executor.execute(getClusterPduNotificationTask());
Adapters<ClusterServiceAdapter> adapters = locator.getAdapters(ClusterServiceAdapter.class);
if (adapters == null || !adapters.isSet()) {
@ -1251,13 +1303,6 @@ public class ClusterManagerImpl implements ClusterManager {
checkConflicts();
_executor.execute(getClusterPduSendingTask());
// TODO, make it configurable
for(int i = 0; i < 5; i++)
_executor.execute(getClusterPduNotificationTask());
if(s_logger.isInfoEnabled()) {
s_logger.info("Cluster manager is configured.");
}

View File

@ -144,11 +144,12 @@ public class ClusterServiceServletContainer {
s_logger.error("Unrecoverable HTTP protocol violation", ex);
} finally {
try {
conn.shutdown();
conn.close();
} catch (IOException ignore) {}
conn.close();
} catch (IOException ignore) {
s_logger.error("unexpected exception", ignore);
}
}
}
}
});
} catch (InterruptedIOException ex) {

View File

@ -24,24 +24,21 @@ import java.rmi.RemoteException;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.params.HttpClientParams;
import org.apache.log4j.Logger;
import com.cloud.serializer.GsonHelper;
import com.google.gson.Gson;
public class ClusterServiceServletImpl implements ClusterService {
private static final long serialVersionUID = 4574025200012566153L;
private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class);
private String _serviceUrl;
private final Gson _gson;
private int _requestTimeoutSeconds;
private int _requestTimeoutSeconds;
protected static HttpClient s_client = null;
public ClusterServiceServletImpl() {
_gson = GsonHelper.getGson();
}
public ClusterServiceServletImpl(String serviceUrl, int requestTimeoutSeconds) {
@ -49,8 +46,6 @@ public class ClusterServiceServletImpl implements ClusterService {
this._serviceUrl = serviceUrl;
this._requestTimeoutSeconds = requestTimeoutSeconds;
_gson = GsonHelper.getGson();
}
@Override
@ -69,7 +64,11 @@ public class ClusterServiceServletImpl implements ClusterService {
method.addParameter("stopOnError", pdu.isStopOnError() ? "1" : "0");
method.addParameter("requestAck", pdu.isRequest() ? "1" : "0");
return executePostMethod(client, method);
try {
return executePostMethod(client, method);
} finally {
method.releaseConnection();
}
}
@Override
@ -82,12 +81,17 @@ public class ClusterServiceServletImpl implements ClusterService {
PostMethod method = new PostMethod(_serviceUrl);
method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_PING));
method.addParameter("callingPeer", callingPeer);
String returnVal = executePostMethod(client, method);
if("true".equalsIgnoreCase(returnVal)) {
return true;
method.addParameter("callingPeer", callingPeer);
try {
String returnVal = executePostMethod(client, method);
if("true".equalsIgnoreCase(returnVal)) {
return true;
}
return false;
} finally {
method.releaseConnection();
}
return false;
}
private String executePostMethod(HttpClient client, PostMethod method) {
@ -119,12 +123,20 @@ public class ClusterServiceServletImpl implements ClusterService {
}
private HttpClient getHttpClient() {
HttpClient client = new HttpClient();
HttpClientParams clientParams = new HttpClientParams();
clientParams.setSoTimeout(this._requestTimeoutSeconds * 1000);
client.setParams(clientParams);
return client;
if(s_client == null) {
MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
mgr.getParams().setDefaultMaxConnectionsPerHost(1);
// TODO make it configurable
mgr.getParams().setMaxTotalConnections(1000);
s_client = new HttpClient(mgr);
HttpClientParams clientParams = new HttpClientParams();
clientParams.setSoTimeout(_requestTimeoutSeconds * 1000);
s_client.setParams(clientParams);
}
return s_client;
}
// for test purpose only