bug 14301: Decouple synchronized crosss management server call with clustering transport. Reviewed-by: Kelven

This commit is contained in:
Kelven Yang 2012-03-26 14:46:33 -07:00
parent 30ebab72ba
commit f23e4cab71
12 changed files with 492 additions and 605 deletions

View File

@ -1,80 +0,0 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.cluster;
import com.cloud.agent.Listener;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.StartupCommand;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
public class ClusterAsyncExectuionListener implements Listener {
private final ClusterManager clusterMgr;
private final String callingPeer;
private boolean recurring = false;
public ClusterAsyncExectuionListener(ClusterManager clusterMgr, String callingPeer) {
this.clusterMgr = clusterMgr;
this.callingPeer = callingPeer;
}
@Override
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
recurring = clusterMgr.forwardAnswer(callingPeer, agentId, seq, answers);
return true;
}
@Override
public boolean processCommands(long agentId, long seq, Command[] commands) {
return false;
}
@Override
public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
return null;
}
@Override
public void processConnect(HostVO agent, StartupCommand cmd, boolean forRebalance) {
}
@Override
public boolean processDisconnect(long agentId, Status state) {
return false;
}
@Override
public boolean isRecurring() {
return recurring;
}
@Override
public boolean processTimeout(long agentId, long seq) {
return true;
}
@Override
public int getTimeout() {
return -1;
}
}

View File

@ -18,7 +18,6 @@
package com.cloud.cluster;
import com.cloud.agent.Listener;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.exception.AgentUnavailableException;
@ -31,14 +30,11 @@ public interface ClusterManager extends Manager {
public static final int DEFAULT_HEARTBEAT_INTERVAL = 1500;
public static final int DEFAULT_HEARTBEAT_THRESHOLD = 150000;
public static final String ALERT_SUBJECT = "cluster-alert";
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu);
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener);
public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers);
public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers);
public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException;
public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException;
public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException;
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException;
public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException;
public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException;

View File

@ -33,8 +33,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -45,13 +47,13 @@ import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.Listener;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.ChangeAgentAnswer;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.PropagateResourceEventCommand;
import com.cloud.agent.api.TransferAgentCommand;
import com.cloud.agent.manager.Commands;
import com.cloud.cluster.ManagementServerHost.State;
import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.cluster.dao.ManagementServerHostPeerDao;
@ -91,17 +93,14 @@ import com.google.gson.Gson;
public class ClusterManagerImpl implements ClusterManager {
private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class);
private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second
private final List<ClusterManagerListener> _listeners = new ArrayList<ClusterManagerListener>();
private final Map<Long, ManagementServerHostVO> _activePeers = new HashMap<Long, ManagementServerHostVO>();
private int _heartbeatInterval = ClusterManager.DEFAULT_HEARTBEAT_INTERVAL;
private int _heartbeatThreshold = ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD;
private final Map<String, ClusterService> _clusterPeers;
private final Map<String, Listener> _asyncCalls;
private final Gson _gson;
@Inject
@ -142,10 +141,12 @@ public class ClusterManagerImpl implements ClusterManager {
private double _connectedAgentsThreshold = 0.7;
private static boolean _agentLbHappened = false;
private BlockingQueue<ClusterServicePdu> _clusterPduOutgoingQueue = new LinkedBlockingQueue<ClusterServicePdu>();
private BlockingQueue<ClusterServicePdu> _clusterPduIncomingQueue = new LinkedBlockingQueue<ClusterServicePdu>();
private Map<Long, ClusterServiceRequestPdu> _outgoingPdusWaitingForAck = new HashMap<Long, ClusterServiceRequestPdu>();
public ClusterManagerImpl() {
_clusterPeers = new HashMap<String, ClusterService>();
_asyncCalls = new HashMap<String, Listener>();
_gson = GsonHelper.getGson();
@ -154,7 +155,242 @@ public class ClusterManagerImpl implements ClusterManager {
//
_executor = Executors.newCachedThreadPool(new NamedThreadFactory("Cluster-Worker"));
}
private void registerRequestPdu(ClusterServiceRequestPdu pdu) {
synchronized(_outgoingPdusWaitingForAck) {
_outgoingPdusWaitingForAck.put(pdu.getSequenceId(), pdu);
}
}
private ClusterServiceRequestPdu popRequestPdu(long ackSequenceId) {
synchronized(_outgoingPdusWaitingForAck) {
if(_outgoingPdusWaitingForAck.get(ackSequenceId) != null) {
ClusterServiceRequestPdu pdu = _outgoingPdusWaitingForAck.get(ackSequenceId);
_outgoingPdusWaitingForAck.remove(ackSequenceId);
return pdu;
}
}
return null;
}
private void cancelClusterRequestToPeer(String strPeer) {
List<ClusterServiceRequestPdu> candidates = new ArrayList<ClusterServiceRequestPdu>();
synchronized(_outgoingPdusWaitingForAck) {
for(Map.Entry<Long, ClusterServiceRequestPdu> entry : _outgoingPdusWaitingForAck.entrySet()) {
if(entry.getValue().getDestPeer().equalsIgnoreCase(strPeer))
candidates.add(entry.getValue());
}
for(ClusterServiceRequestPdu pdu : candidates) {
_outgoingPdusWaitingForAck.remove(pdu.getSequenceId());
}
}
for(ClusterServiceRequestPdu pdu : candidates) {
s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + _gson.toJson(pdu));
synchronized(pdu) {
pdu.notifyAll();
}
}
}
private Runnable getClusterPduSendingTask() {
return new Runnable() {
public void run() {
onSendingClusterPdu();
}
};
}
private Runnable getClusterPduNotificationTask() {
return new Runnable() {
public void run() {
onNotifyingClusterPdu();
}
};
}
private void onSendingClusterPdu() {
while(true) {
try {
ClusterServicePdu pdu = _clusterPduOutgoingQueue.take();
ClusterService peerService = null;
for(int i = 0; i < 2; i++) {
try {
peerService = getPeerService(pdu.getDestPeer());
} catch (RemoteException e) {
s_logger.error("Unable to get cluster service on peer : " + pdu.getDestPeer());
}
if(peerService != null) {
try {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + ". agent: " + pdu.getAgentId()
+ ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
}
long startTick = System.currentTimeMillis();
String strResult = peerService.execute(pdu);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + " completed. time: " +
(System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId()
+ ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
}
if("true".equals(strResult))
break;
} catch (RemoteException e) {
invalidatePeerService(pdu.getDestPeer());
if(s_logger.isInfoEnabled()) {
s_logger.info("Exception on remote execution, peer: " + pdu.getDestPeer() + ", iteration: "
+ i + ", exception message :" + e.getMessage());
}
}
}
}
} catch(InterruptedException e) {
} catch(Throwable e) {
s_logger.error("Unexcpeted exception: ", e);
}
}
}
private void onNotifyingClusterPdu() {
while(true) {
try {
ClusterServicePdu pdu = _clusterPduOutgoingQueue.take();
if(pdu.isRequest()) {
String result = dispatchClusterServicePdu(pdu);
if(result == null)
result = "";
ClusterServicePdu responsePdu = new ClusterServicePdu();
responsePdu.setSourcePeer(pdu.getDestPeer());
responsePdu.setDestPeer(pdu.getSourcePeer());
responsePdu.setAckSequenceId(pdu.getSequenceId());
responsePdu.setJsonPackage(result);
_clusterPduOutgoingQueue.put(responsePdu);
} else {
ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
if(requestPdu != null) {
requestPdu.setResponseResult(pdu.getJsonPackage());
synchronized(requestPdu) {
requestPdu.notifyAll();
}
} else {
s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu));
}
}
} catch(InterruptedException e) {
} catch(Throwable e) {
s_logger.error("Unexcpeted exception: ", e);
}
}
}
private String dispatchClusterServicePdu(ClusterServicePdu pdu) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
}
Command [] cmds = null;
try {
cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class);
} catch(Throwable e) {
assert(false);
s_logger.error("Excection in gson decoding : ", e);
}
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
}
boolean result = false;
try {
result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new ChangeAgentAnswer(cmd, result);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
}
boolean result = false;
try {
result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
} catch (OperationTimedoutException e) {
s_logger.warn("Operation timed out", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
}
try {
long startTick = System.currentTimeMillis();
if(s_logger.isDebugEnabled()) {
s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
}
Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError());
if(answers != null) {
String jsonReturn = _gson.toJson(answers);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
" in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
}
return jsonReturn;
} else {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
" in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
}
}
} catch(AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
} catch (OperationTimedoutException e) {
s_logger.warn("Timed Out", e);
}
return null;
}
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
try {
_clusterPduIncomingQueue.put(pdu);
} catch (InterruptedException e) {
s_logger.warn("InterruptedException. pdu: " + _gson.toJson(pdu) + " is dropped");
}
}
@Override
public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue);
@ -164,15 +400,6 @@ public class ClusterManagerImpl implements ClusterManager {
return _agentMgr.send(hostId, commands);
}
@Override
public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue);
for (Command cmd : cmds) {
commands.addCommand(cmd);
}
return _agentMgr.send(hostId, commands, listener);
}
@Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
return _agentMgr.executeUserRequest(agentId, event);
@ -229,7 +456,7 @@ public class ClusterManagerImpl implements ClusterManager {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid());
}
Answer[] answers = execute(peerName, agentId, cmds, true);
execute(peerName, agentId, cmds, true);
} catch (Exception e) {
s_logger.warn("Caught exception while talkign to " + peer.getMsid());
}
@ -238,208 +465,44 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
ClusterService peerService = null;
if(s_logger.isDebugEnabled()) {
s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
_gson.toJson(cmds, Command[].class));
}
for(int i = 0; i < 2; i++) {
ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu();
pdu.setSourcePeer(getSelfPeerName());
pdu.setDestPeer(strPeer);
pdu.setAgentId(agentId);
pdu.setJsonPackage(_gson.toJson(cmds, Command[].class));
pdu.setStopOnError(stopOnError);
pdu.setRequest(true);
registerRequestPdu(pdu);
_clusterPduOutgoingQueue.add(pdu);
synchronized(pdu) {
try {
peerService = getPeerService(strPeer);
} catch (RemoteException e) {
s_logger.error("Unable to get cluster service on peer : " + strPeer);
pdu.wait();
} catch (InterruptedException e) {
}
}
if(peerService != null) {
try {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Send " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote");
}
long startTick = System.currentTimeMillis();
String strResult = peerService.execute(getSelfPeerName(), agentId, _gson.toJson(cmds, Command[].class), stopOnError);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " +
(System.currentTimeMillis() - startTick) + " ms, result: " + strResult);
}
if(strResult != null) {
try {
return _gson.fromJson(strResult, Answer[].class);
} catch(Throwable e) {
s_logger.error("Exception on parsing gson package from remote call to " + strPeer);
}
}
return null;
} catch (RemoteException e) {
invalidatePeerService(strPeer);
if(s_logger.isInfoEnabled()) {
s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: "
+ i + ", exception message :" + e.getMessage());
}
}
if(s_logger.isDebugEnabled()) {
s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " completed. result: " +
pdu.getResponseResult());
}
if(pdu.getResponseResult() != null && pdu.getResponseResult().length() > 0) {
try {
return _gson.fromJson(pdu.getResponseResult(), Answer[].class);
} catch(Throwable e) {
s_logger.error("Exception on parsing gson package from remote call to " + strPeer);
}
}
return null;
}
@Override
public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
ClusterService peerService = null;
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
_gson.toJson(cmds, Command[].class));
}
for(int i = 0; i < 2; i++) {
try {
peerService = getPeerService(strPeer);
} catch (RemoteException e) {
s_logger.error("Unable to get cluster service on peer : " + strPeer);
}
if(peerService != null) {
try {
long seq = 0;
synchronized(String.valueOf(agentId).intern()) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Send Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote");
}
long startTick = System.currentTimeMillis();
seq = peerService.executeAsync(getSelfPeerName(), agentId, _gson.toJson(cmds, Command[].class), stopOnError);
if(seq > 0) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId
+ " in " + (System.currentTimeMillis() - startTick) + " ms"
+ ", register local listener " + strPeer + "/" + seq);
}
registerAsyncCall(strPeer, seq, listener);
} else {
s_logger.warn("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId
+ " in " + (System.currentTimeMillis() - startTick) + " ms, return indicates failure, seq: " + seq);
}
}
return seq;
} catch (RemoteException e) {
invalidatePeerService(strPeer);
if(s_logger.isInfoEnabled()) {
s_logger.info("Exception on remote execution -> " + strPeer + ", iteration : " + i);
}
}
}
}
return 0L;
}
@Override
public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" +
agentId + "-" + seq + "} answers: " + (answers != null ? _gson.toJson(answers, Answer[].class): "null"));
}
Listener listener = null;
synchronized(String.valueOf(agentId).intern()) {
// need to synchronize it with executeAsync() to make sure listener have been registered
// before this callback reaches back
listener = getAsyncCallListener(executingPeer, seq);
}
if(listener != null) {
long startTick = System.currentTimeMillis();
if(s_logger.isDebugEnabled()) {
s_logger.debug("Processing answer {" + agentId + "-" + seq + "} from remote peer " + executingPeer);
}
listener.processAnswers(agentId, seq, answers);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Answer {" + agentId + "-" + seq + "} is processed in " +
(System.currentTimeMillis() - startTick) + " ms");
}
if(!listener.isRecurring()) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Listener is not recurring after async-result callback {" +
agentId + "-" + seq + "}, unregister it");
}
unregisterAsyncCall(executingPeer, seq);
} else {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Listener is recurring after async-result callback {" + agentId
+"-" + seq + "}, will keep it");
}
return true;
}
} else {
if(s_logger.isInfoEnabled()) {
s_logger.info("Async-call Listener has not been registered yet for {" + agentId
+"-" + seq + "}");
}
}
return false;
}
@Override
public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq +
"} " + (answers != null? _gson.toJson(answers, Answer[].class):""));
}
final String targetPeerF = targetPeer;
final Answer[] answersF = answers;
final long agentIdF = agentId;
final long seqF = seq;
ClusterService peerService = null;
for(int i = 0; i < 2; i++) {
try {
peerService = getPeerService(targetPeerF);
} catch (RemoteException e) {
s_logger.error("cluster service for peer " + targetPeerF + " no longer exists");
}
if(peerService != null) {
try {
boolean result = false;
long startTick = System.currentTimeMillis();
if(s_logger.isDebugEnabled()) {
s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote");
}
result = peerService.onAsyncResult(getSelfPeerName(), agentIdF, seqF, _gson.toJson(answersF, Answer[].class));
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " +
(System.currentTimeMillis() - startTick) + " ms, return result: " + result);
}
return result;
} catch (RemoteException e) {
s_logger.warn("Exception in performing remote call, ", e);
invalidatePeerService(targetPeerF);
}
} else {
s_logger.warn("Remote peer " + targetPeer + " no longer exists to process answer {" + agentId + "-"
+ seq + "}");
}
}
return false;
}
@Override
public String getPeerName(long agentHostId) {
@ -511,10 +574,12 @@ public class ClusterManagerImpl implements ClusterManager {
public void notifyNodeLeft(List<ManagementServerHostVO> nodeList) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node left to listeners.");
for(ManagementServerHostVO mshost : nodeList) {
}
for(ManagementServerHostVO mshost : nodeList) {
if(s_logger.isDebugEnabled())
s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
}
cancelClusterRequestToPeer(String.valueOf(mshost.getMsid()));
}
synchronized(_listeners) {
@ -568,38 +633,6 @@ public class ClusterManagerImpl implements ClusterManager {
}
}
private void registerAsyncCall(String strPeer, long seq, Listener listener) {
String key = strPeer + "/" + seq;
synchronized(_asyncCalls) {
if(!_asyncCalls.containsKey(key)) {
_asyncCalls.put(key, listener);
}
}
}
private Listener getAsyncCallListener(String strPeer, long seq) {
String key = strPeer + "/" + seq;
synchronized(_asyncCalls) {
if(_asyncCalls.containsKey(key)) {
return _asyncCalls.get(key);
}
}
return null;
}
private void unregisterAsyncCall(String strPeer, long seq) {
String key = strPeer + "/" + seq;
synchronized(_asyncCalls) {
if(_asyncCalls.containsKey(key)) {
_asyncCalls.remove(key);
}
}
}
private Runnable getHeartbeatTask() {
return new Runnable() {
@Override
@ -677,12 +710,12 @@ public class ClusterManagerImpl implements ClusterManager {
s_logger.error("Runtime DB exception ", e.getCause());
if(e.getCause() instanceof ClusterInvalidSessionException) {
s_logger.error("Invalid cluster session found");
s_logger.error("Invalid cluster session found, fence it");
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
}
if(isRootCauseConnectionRelated(e.getCause())) {
s_logger.error("DB communication problem detected");
s_logger.error("DB communication problem detected, fence it");
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
}
@ -690,12 +723,13 @@ public class ClusterManagerImpl implements ClusterManager {
} catch(ActiveFencingException e) {
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
} catch (Throwable e) {
s_logger.error("Unexpected exception in cluster heartbeat", e);
if(isRootCauseConnectionRelated(e.getCause())) {
s_logger.error("DB communication problem detected");
s_logger.error("DB communication problem detected, fence it");
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
}
s_logger.error("Problem with the cluster heartbeat!", e);
invalidHeartbeatConnection();
} finally {
txn.close("ClusterHeartBeat");
}
@ -1205,7 +1239,6 @@ public class ClusterManagerImpl implements ClusterManager {
throw new ConfigurationException("Unable to set current cluster service adapter");
}
_agentLBEnabled = Boolean.valueOf(configDao.getValue(Config.AgentLbEnable.key()));
String connectedAgentsThreshold = configs.get("agent.load.threshold");
@ -1218,6 +1251,13 @@ public class ClusterManagerImpl implements ClusterManager {
checkConflicts();
_executor.execute(getClusterPduSendingTask());
// TODO, make it configurable
for(int i = 0; i < 5; i++)
_executor.execute(getClusterPduNotificationTask());
if(s_logger.isInfoEnabled()) {
s_logger.info("Cluster manager is configured.");
}

View File

@ -21,9 +21,7 @@ package com.cloud.cluster;
import java.rmi.Remote;
import java.rmi.RemoteException;
public interface ClusterService extends Remote {
String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException;
long executeAsync(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException;
boolean onAsyncResult(String executingPeer, long agentId, long seq, String gsonPackage) throws RemoteException;
boolean ping(String callingPeer) throws RemoteException;
public interface ClusterService extends Remote {
String execute(ClusterServicePdu pdu) throws RemoteException;
boolean ping(String callingPeer) throws RemoteException;
}

View File

@ -0,0 +1,93 @@
package com.cloud.cluster;
public class ClusterServicePdu {
private long sequenceId;
private long ackSequenceId;
private String sourcePeer;
private String destPeer;
private long agentId;
private boolean stopOnError;
private String jsonPackage;
private boolean request = false;
private static long s_nextPduSequenceId = 1;
public ClusterServicePdu() {
sequenceId = getNextPduSequenceId();
ackSequenceId = 0;
agentId = 0;
stopOnError = false;
}
public synchronized long getNextPduSequenceId() {
return s_nextPduSequenceId++;
}
public long getSequenceId() {
return sequenceId;
}
public void setSequenceId(long sequenceId) {
this.sequenceId = sequenceId;
}
public long getAckSequenceId() {
return ackSequenceId;
}
public void setAckSequenceId(long ackSequenceId) {
this.ackSequenceId = ackSequenceId;
}
public String getSourcePeer() {
return sourcePeer;
}
public void setSourcePeer(String sourcePeer) {
this.sourcePeer = sourcePeer;
}
public String getDestPeer() {
return destPeer;
}
public void setDestPeer(String destPeer) {
this.destPeer = destPeer;
}
public long getAgentId() {
return agentId;
}
public void setAgentId(long agentId) {
this.agentId = agentId;
}
public boolean isStopOnError() {
return stopOnError;
}
public void setStopOnError(boolean stopOnError) {
this.stopOnError = stopOnError;
}
public String getJsonPackage() {
return jsonPackage;
}
public void setJsonPackage(String jsonPackage) {
this.jsonPackage = jsonPackage;
}
public boolean isRequest() {
return request;
}
public void setRequest(boolean value) {
this.request = value;
}
}

View File

@ -0,0 +1,37 @@
package com.cloud.cluster;
public class ClusterServiceRequestPdu extends ClusterServicePdu {
private String responseResult;
private long startTick;
private long timeout;
public ClusterServiceRequestPdu() {
startTick = System.currentTimeMillis();
timeout = -1;
}
public String getResponseResult() {
return responseResult;
}
public void setResponseResult(String responseResult) {
this.responseResult = responseResult;
}
public long getStartTick() {
return startTick;
}
public void setStartTick(long startTick) {
this.startTick = startTick;
}
public long getTimeout() {
return timeout;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
}

View File

@ -33,28 +33,13 @@ import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import com.cloud.agent.Listener;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.ChangeAgentAnswer;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.PropagateResourceEventCommand;
import com.cloud.agent.api.TransferAgentCommand;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.serializer.GsonHelper;
import com.google.gson.Gson;
public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
private static final Logger s_logger = Logger.getLogger(ClusterServiceServletHttpHandler.class);
private final Gson gson;
private final ClusterManager manager;
public ClusterServiceServletHttpHandler(ClusterManager manager) {
this.manager = manager;
gson = GsonHelper.getGson();
}
@Override
@ -141,17 +126,9 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
}
switch(nMethod) {
case RemoteMethodConstants.METHOD_EXECUTE :
responseContent = handleExecuteMethodCall(req);
break;
case RemoteMethodConstants.METHOD_EXECUTE_ASYNC :
responseContent = handleExecuteAsyncMethodCall(req);
break;
case RemoteMethodConstants.METHOD_ASYNC_RESULT :
responseContent = handleAsyncResultMethodCall(req);
break;
case RemoteMethodConstants.METHOD_DELIVER_PDU :
responseContent = handleDeliverPduMethodCall(req);
break;
case RemoteMethodConstants.METHOD_PING :
responseContent = handlePingMethodCall(req);
@ -179,200 +156,31 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
writeResponse(response, HttpStatus.SC_BAD_REQUEST, null);
}
}
private String handleExecuteMethodCall(HttpRequest req) {
String agentId = (String)req.getParams().getParameter("agentId");
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
String stopOnError = (String)req.getParams().getParameter("stopOnError");
if(s_logger.isDebugEnabled()) {
s_logger.debug("|->" + agentId + " " + gsonPackage);
}
Command [] cmds = null;
try {
cmds = gson.fromJson(gsonPackage, Command[].class);
} catch(Throwable e) {
assert(false);
s_logger.error("Excection in gson decoding : ", e);
}
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
}
boolean result = false;
try {
result = manager.executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new ChangeAgentAnswer(cmd, result);
return gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
}
boolean result = false;
try {
result = manager.rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}
private String handleDeliverPduMethodCall(HttpRequest req) {
String pduSeq = (String)req.getParams().getParameter("pduSeq");
String pduAckSeq = (String)req.getParams().getParameter("pduAckSeq");
String sourcePeer = (String)req.getParams().getParameter("sourcePeer");
String destPeer = (String)req.getParams().getParameter("destPeer");
String agentId = (String)req.getParams().getParameter("agentId");
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
String stopOnError = (String)req.getParams().getParameter("stopOnError");
String requestAck = (String)req.getParams().getParameter("requestAck");
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
} catch (OperationTimedoutException e) {
s_logger.warn("Operation timed out", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, result, null);
return gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for resource event: host " + cmd.getHostId() + " event: " + cmd.getEvent());
}
boolean result = false;
try {
result = manager.executeResourceUserRequest(cmd.getHostId(), cmd.getEvent());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, result, null);
return gson.toJson(answers);
}
try {
long startTick = System.currentTimeMillis();
if(s_logger.isDebugEnabled()) {
s_logger.debug("Send |-> " + agentId + " " + gsonPackage + " to agent manager");
}
Answer[] answers = manager.sendToAgent(Long.parseLong(agentId), cmds,
Integer.parseInt(stopOnError) != 0 ? true : false);
if(answers != null) {
String jsonReturn = gson.toJson(answers);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed |-> " + agentId + " " + gsonPackage +
" in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
}
return jsonReturn;
} else {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed |-> " + agentId + " " + gsonPackage +
" in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
}
}
} catch(AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
} catch (OperationTimedoutException e) {
s_logger.warn("Timed Out", e);
}
return null;
}
private String handleExecuteAsyncMethodCall(HttpRequest req) {
String agentId = (String)req.getParams().getParameter("agentId");
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
String stopOnError = (String)req.getParams().getParameter("stopOnError");
String callingPeer = (String)req.getParams().getParameter("caller");
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async " + callingPeer + " |-> " + agentId + " " + gsonPackage);
}
Command [] cmds = null;
try {
cmds = gson.fromJson(gsonPackage, Command[].class);
} catch(Throwable e) {
assert(false);
s_logger.error("Excection in gson decoding : ", e);
}
Listener listener = new ClusterAsyncExectuionListener(manager, callingPeer);
long seq = -1;
try {
long startTick = System.currentTimeMillis();
if(s_logger.isDebugEnabled()) {
s_logger.debug("Send Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " to agent manager");
}
seq = manager.sendToAgent(Long.parseLong(agentId), cmds,
Integer.parseInt(stopOnError) != 0 ? true : false, listener);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Complated Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " in " +
+ (System.currentTimeMillis() - startTick) + " ms, returned seq: " + seq);
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
seq = -1;
}
return gson.toJson(seq);
}
private String handleAsyncResultMethodCall(HttpRequest req) {
String agentId = (String)req.getParams().getParameter("agentId");
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
String seq = (String)req.getParams().getParameter("seq");
String executingPeer = (String)req.getParams().getParameter("executingPeer");
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async callback " + executingPeer + "." + agentId + " |-> " + gsonPackage);
}
Answer[] answers = null;
try {
answers = gson.fromJson(gsonPackage, Answer[].class);
} catch(Throwable e) {
assert(false);
s_logger.error("Excection in gson decoding : ", e);
}
long startTick = System.currentTimeMillis();
if(manager.onAsyncResult(executingPeer, Long.parseLong(agentId), Long.parseLong(seq), answers)) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) +
" ms, return recurring=true, let async listener contine on");
}
return "recurring=true";
}
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) +
" ms, return recurring=false, indicate to tear down async listener");
}
return "recurring=false";
}
ClusterServicePdu pdu = new ClusterServicePdu();
pdu.setSourcePeer(sourcePeer);
pdu.setDestPeer(destPeer);
pdu.setAgentId(Long.parseLong(agentId));
pdu.setSequenceId(Long.parseLong(pduSeq));
pdu.setAckSequenceId(Long.parseLong(pduAckSeq));
pdu.setJsonPackage(gsonPackage);
pdu.setStopOnError("1".equals(stopOnError));
pdu.setRequest("1".equals(requestAck));
manager.OnReceiveClusterServicePdu(pdu);
return "true";
}
private String handlePingMethodCall(HttpRequest req) {
String callingPeer = (String)req.getParams().getParameter("callingPeer");

View File

@ -51,8 +51,28 @@ public class ClusterServiceServletImpl implements ClusterService {
this._requestTimeoutSeconds = requestTimeoutSeconds;
_gson = GsonHelper.getGson();
}
}
@Override
public String execute(ClusterServicePdu pdu) throws RemoteException {
HttpClient client = getHttpClient();
PostMethod method = new PostMethod(_serviceUrl);
method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_DELIVER_PDU));
method.addParameter("sourcePeer", pdu.getSourcePeer());
method.addParameter("destPeer", pdu.getDestPeer());
method.addParameter("pduSeq", Long.toString(pdu.getSequenceId()));
method.addParameter("pduAckSeq", Long.toString(pdu.getAckSequenceId()));
method.addParameter("agentId", Long.toString(pdu.getAgentId()));
method.addParameter("gsonPackage", pdu.getJsonPackage());
method.addParameter("stopOnError", pdu.isStopOnError() ? "1" : "0");
method.addParameter("requestAck", pdu.isRequest() ? "1" : "0");
return executePostMethod(client, method);
}
/*
@Override
public String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException {
if(s_logger.isDebugEnabled()) {
@ -129,7 +149,7 @@ public class ClusterServiceServletImpl implements ClusterService {
}
return false;
}
*/
@Override
public boolean ping(String callingPeer) throws RemoteException {
if(s_logger.isDebugEnabled()) {
@ -187,11 +207,13 @@ public class ClusterServiceServletImpl implements ClusterService {
// for test purpose only
public static void main(String[] args) {
/*
ClusterServiceServletImpl service = new ClusterServiceServletImpl("http://localhost:9090/clusterservice", 300);
try {
String result = service.execute("test", 1, "{ p1:v1, p2:v2 }", true);
System.out.println(result);
} catch (RemoteException e) {
}
*/
}
}

View File

@ -43,38 +43,29 @@ public class DummyClusterManagerImpl implements ClusterManager {
private String _name;
private final String _clusterNodeIP = "127.0.0.1";
@Override
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
throw new CloudRuntimeException("Unsupported feature");
}
}
@Override
public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError)
throws AgentUnavailableException, OperationTimedoutException {
throw new CloudRuntimeException("Unsupported feature");
}
/*
@Override
public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");
}
*/
@Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");

View File

@ -19,9 +19,7 @@
package com.cloud.cluster;
public interface RemoteMethodConstants {
public static final int METHOD_UNKNOWN = 0;
public static final int METHOD_EXECUTE = 1;
public static final int METHOD_EXECUTE_ASYNC = 2;
public static final int METHOD_ASYNC_RESULT = 3;
public static final int METHOD_UNKNOWN = 0;
public static final int METHOD_PING = 4;
public static final int METHOD_DELIVER_PDU = 5;
}

View File

@ -258,36 +258,21 @@ public class CheckPointManagerTest extends TestCase {
return _name;
}
@Override
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
throw new CloudRuntimeException("Not implemented");
}
@Override
public Answer[] execute(String strPeer, long agentId, Command[] cmds, boolean stopOnError) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
throw new UnsupportedOperationException("Not implemented");

View File

@ -850,5 +850,4 @@ public class MockNetworkManagerImpl implements NetworkManager, Manager, NetworkS
// TODO Auto-generated method stub
return null;
}
}