diff --git a/server/src/com/cloud/cluster/ClusterAsyncExectuionListener.java b/server/src/com/cloud/cluster/ClusterAsyncExectuionListener.java
deleted file mode 100755
index 6a8aa81ced4..00000000000
--- a/server/src/com/cloud/cluster/ClusterAsyncExectuionListener.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
- *
- * This software is licensed under the GNU General Public License v3 or later.
- *
- * It is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or any later version.
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- *
- */
-
-package com.cloud.cluster;
-
-import com.cloud.agent.Listener;
-import com.cloud.agent.api.AgentControlAnswer;
-import com.cloud.agent.api.AgentControlCommand;
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.StartupCommand;
-import com.cloud.host.HostVO;
-import com.cloud.host.Status;
-
-public class ClusterAsyncExectuionListener implements Listener {
- private final ClusterManager clusterMgr;
- private final String callingPeer;
- private boolean recurring = false;
-
- public ClusterAsyncExectuionListener(ClusterManager clusterMgr, String callingPeer) {
- this.clusterMgr = clusterMgr;
- this.callingPeer = callingPeer;
- }
-
- @Override
- public boolean processAnswers(long agentId, long seq, Answer[] answers) {
- recurring = clusterMgr.forwardAnswer(callingPeer, agentId, seq, answers);
- return true;
- }
-
- @Override
- public boolean processCommands(long agentId, long seq, Command[] commands) {
- return false;
- }
-
- @Override
- public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
- return null;
- }
-
- @Override
- public void processConnect(HostVO agent, StartupCommand cmd, boolean forRebalance) {
- }
-
- @Override
- public boolean processDisconnect(long agentId, Status state) {
- return false;
- }
-
- @Override
- public boolean isRecurring() {
- return recurring;
- }
-
- @Override
- public boolean processTimeout(long agentId, long seq) {
- return true;
- }
-
- @Override
- public int getTimeout() {
- return -1;
- }
-}
-
diff --git a/server/src/com/cloud/cluster/ClusterManager.java b/server/src/com/cloud/cluster/ClusterManager.java
index cec5ea7b17c..46fdff3eacf 100755
--- a/server/src/com/cloud/cluster/ClusterManager.java
+++ b/server/src/com/cloud/cluster/ClusterManager.java
@@ -18,7 +18,6 @@
package com.cloud.cluster;
-import com.cloud.agent.Listener;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.exception.AgentUnavailableException;
@@ -31,14 +30,11 @@ public interface ClusterManager extends Manager {
public static final int DEFAULT_HEARTBEAT_INTERVAL = 1500;
public static final int DEFAULT_HEARTBEAT_THRESHOLD = 150000;
public static final String ALERT_SUBJECT = "cluster-alert";
-
+
+ public void OnReceiveClusterServicePdu(ClusterServicePdu pdu);
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
- public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener);
- public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers);
- public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers);
-
- public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException;
- public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException;
+
+ public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException;
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException;
public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException;
public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException;
diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java
index 0982cc7afaa..2d00b9873fb 100755
--- a/server/src/com/cloud/cluster/ClusterManagerImpl.java
+++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java
@@ -33,8 +33,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -45,13 +47,13 @@ import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError;
-import com.cloud.agent.Listener;
import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.ChangeAgentAnswer;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.PropagateResourceEventCommand;
+import com.cloud.agent.api.TransferAgentCommand;
import com.cloud.agent.manager.Commands;
-import com.cloud.cluster.ManagementServerHost.State;
import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.cluster.dao.ManagementServerHostPeerDao;
@@ -91,17 +93,14 @@ import com.google.gson.Gson;
public class ClusterManagerImpl implements ClusterManager {
private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class);
-
private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second
-
private final List _listeners = new ArrayList();
private final Map _activePeers = new HashMap();
private int _heartbeatInterval = ClusterManager.DEFAULT_HEARTBEAT_INTERVAL;
private int _heartbeatThreshold = ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD;
private final Map _clusterPeers;
- private final Map _asyncCalls;
private final Gson _gson;
@Inject
@@ -142,10 +141,12 @@ public class ClusterManagerImpl implements ClusterManager {
private double _connectedAgentsThreshold = 0.7;
private static boolean _agentLbHappened = false;
+ private BlockingQueue _clusterPduOutgoingQueue = new LinkedBlockingQueue();
+ private BlockingQueue _clusterPduIncomingQueue = new LinkedBlockingQueue();
+ private Map _outgoingPdusWaitingForAck = new HashMap();
public ClusterManagerImpl() {
_clusterPeers = new HashMap();
- _asyncCalls = new HashMap();
_gson = GsonHelper.getGson();
@@ -154,7 +155,242 @@ public class ClusterManagerImpl implements ClusterManager {
//
_executor = Executors.newCachedThreadPool(new NamedThreadFactory("Cluster-Worker"));
}
+
+ private void registerRequestPdu(ClusterServiceRequestPdu pdu) {
+ synchronized(_outgoingPdusWaitingForAck) {
+ _outgoingPdusWaitingForAck.put(pdu.getSequenceId(), pdu);
+ }
+ }
+
+ private ClusterServiceRequestPdu popRequestPdu(long ackSequenceId) {
+ synchronized(_outgoingPdusWaitingForAck) {
+ if(_outgoingPdusWaitingForAck.get(ackSequenceId) != null) {
+ ClusterServiceRequestPdu pdu = _outgoingPdusWaitingForAck.get(ackSequenceId);
+ _outgoingPdusWaitingForAck.remove(ackSequenceId);
+ return pdu;
+ }
+ }
+
+ return null;
+ }
+
+ private void cancelClusterRequestToPeer(String strPeer) {
+ List candidates = new ArrayList();
+ synchronized(_outgoingPdusWaitingForAck) {
+ for(Map.Entry entry : _outgoingPdusWaitingForAck.entrySet()) {
+ if(entry.getValue().getDestPeer().equalsIgnoreCase(strPeer))
+ candidates.add(entry.getValue());
+ }
+ for(ClusterServiceRequestPdu pdu : candidates) {
+ _outgoingPdusWaitingForAck.remove(pdu.getSequenceId());
+ }
+ }
+
+ for(ClusterServiceRequestPdu pdu : candidates) {
+ s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + _gson.toJson(pdu));
+ synchronized(pdu) {
+ pdu.notifyAll();
+ }
+ }
+ }
+
+ private Runnable getClusterPduSendingTask() {
+ return new Runnable() {
+ public void run() {
+ onSendingClusterPdu();
+ }
+ };
+ }
+
+ private Runnable getClusterPduNotificationTask() {
+ return new Runnable() {
+ public void run() {
+ onNotifyingClusterPdu();
+ }
+ };
+ }
+
+ private void onSendingClusterPdu() {
+ while(true) {
+ try {
+ ClusterServicePdu pdu = _clusterPduOutgoingQueue.take();
+
+ ClusterService peerService = null;
+ for(int i = 0; i < 2; i++) {
+ try {
+ peerService = getPeerService(pdu.getDestPeer());
+ } catch (RemoteException e) {
+ s_logger.error("Unable to get cluster service on peer : " + pdu.getDestPeer());
+ }
+
+ if(peerService != null) {
+ try {
+ if(s_logger.isDebugEnabled()) {
+ s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + ". agent: " + pdu.getAgentId()
+ + ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
+ }
+
+ long startTick = System.currentTimeMillis();
+ String strResult = peerService.execute(pdu);
+ if(s_logger.isDebugEnabled()) {
+ s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + " completed. time: " +
+ (System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId()
+ + ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
+ }
+
+ if("true".equals(strResult))
+ break;
+
+ } catch (RemoteException e) {
+ invalidatePeerService(pdu.getDestPeer());
+ if(s_logger.isInfoEnabled()) {
+ s_logger.info("Exception on remote execution, peer: " + pdu.getDestPeer() + ", iteration: "
+ + i + ", exception message :" + e.getMessage());
+ }
+ }
+ }
+ }
+ } catch(InterruptedException e) {
+ } catch(Throwable e) {
+ s_logger.error("Unexcpeted exception: ", e);
+ }
+ }
+ }
+
+ private void onNotifyingClusterPdu() {
+ while(true) {
+ try {
+ ClusterServicePdu pdu = _clusterPduOutgoingQueue.take();
+ if(pdu.isRequest()) {
+ String result = dispatchClusterServicePdu(pdu);
+ if(result == null)
+ result = "";
+
+ ClusterServicePdu responsePdu = new ClusterServicePdu();
+ responsePdu.setSourcePeer(pdu.getDestPeer());
+ responsePdu.setDestPeer(pdu.getSourcePeer());
+ responsePdu.setAckSequenceId(pdu.getSequenceId());
+ responsePdu.setJsonPackage(result);
+
+ _clusterPduOutgoingQueue.put(responsePdu);
+ } else {
+ ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
+ if(requestPdu != null) {
+ requestPdu.setResponseResult(pdu.getJsonPackage());
+ synchronized(requestPdu) {
+ requestPdu.notifyAll();
+ }
+ } else {
+ s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu));
+ }
+ }
+ } catch(InterruptedException e) {
+ } catch(Throwable e) {
+ s_logger.error("Unexcpeted exception: ", e);
+ }
+ }
+ }
+
+ private String dispatchClusterServicePdu(ClusterServicePdu pdu) {
+
+ if(s_logger.isDebugEnabled()) {
+ s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
+ }
+
+ Command [] cmds = null;
+ try {
+ cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class);
+ } catch(Throwable e) {
+ assert(false);
+ s_logger.error("Excection in gson decoding : ", e);
+ }
+
+ if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
+ ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
+ }
+ boolean result = false;
+ try {
+ result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Result is " + result);
+ }
+
+ } catch (AgentUnavailableException e) {
+ s_logger.warn("Agent is unavailable", e);
+ return null;
+ }
+
+ Answer[] answers = new Answer[1];
+ answers[0] = new ChangeAgentAnswer(cmd, result);
+ return _gson.toJson(answers);
+ } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
+ TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
+ }
+ boolean result = false;
+ try {
+ result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Result is " + result);
+ }
+
+ } catch (AgentUnavailableException e) {
+ s_logger.warn("Agent is unavailable", e);
+ return null;
+ } catch (OperationTimedoutException e) {
+ s_logger.warn("Operation timed out", e);
+ return null;
+ }
+ Answer[] answers = new Answer[1];
+ answers[0] = new Answer(cmd, result, null);
+ return _gson.toJson(answers);
+ }
+
+ try {
+ long startTick = System.currentTimeMillis();
+ if(s_logger.isDebugEnabled()) {
+ s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
+ }
+
+ Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError());
+ if(answers != null) {
+ String jsonReturn = _gson.toJson(answers);
+
+ if(s_logger.isDebugEnabled()) {
+ s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
+ " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
+ }
+
+ return jsonReturn;
+ } else {
+ if(s_logger.isDebugEnabled()) {
+ s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
+ " in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
+ }
+ }
+ } catch(AgentUnavailableException e) {
+ s_logger.warn("Agent is unavailable", e);
+ } catch (OperationTimedoutException e) {
+ s_logger.warn("Timed Out", e);
+ }
+
+ return null;
+ }
+
+ public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
+ try {
+ _clusterPduIncomingQueue.put(pdu);
+ } catch (InterruptedException e) {
+ s_logger.warn("InterruptedException. pdu: " + _gson.toJson(pdu) + " is dropped");
+ }
+ }
+
@Override
public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue);
@@ -164,15 +400,6 @@ public class ClusterManagerImpl implements ClusterManager {
return _agentMgr.send(hostId, commands);
}
- @Override
- public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
- Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue);
- for (Command cmd : cmds) {
- commands.addCommand(cmd);
- }
- return _agentMgr.send(hostId, commands, listener);
- }
-
@Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
return _agentMgr.executeUserRequest(agentId, event);
@@ -229,7 +456,7 @@ public class ClusterManagerImpl implements ClusterManager {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid());
}
- Answer[] answers = execute(peerName, agentId, cmds, true);
+ execute(peerName, agentId, cmds, true);
} catch (Exception e) {
s_logger.warn("Caught exception while talkign to " + peer.getMsid());
}
@@ -238,208 +465,44 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
- ClusterService peerService = null;
-
if(s_logger.isDebugEnabled()) {
s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
_gson.toJson(cmds, Command[].class));
}
-
- for(int i = 0; i < 2; i++) {
+
+ ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu();
+ pdu.setSourcePeer(getSelfPeerName());
+ pdu.setDestPeer(strPeer);
+ pdu.setAgentId(agentId);
+ pdu.setJsonPackage(_gson.toJson(cmds, Command[].class));
+ pdu.setStopOnError(stopOnError);
+ pdu.setRequest(true);
+ registerRequestPdu(pdu);
+ _clusterPduOutgoingQueue.add(pdu);
+
+ synchronized(pdu) {
try {
- peerService = getPeerService(strPeer);
- } catch (RemoteException e) {
- s_logger.error("Unable to get cluster service on peer : " + strPeer);
+ pdu.wait();
+ } catch (InterruptedException e) {
}
+ }
- if(peerService != null) {
- try {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Send " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote");
- }
-
- long startTick = System.currentTimeMillis();
- String strResult = peerService.execute(getSelfPeerName(), agentId, _gson.toJson(cmds, Command[].class), stopOnError);
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " +
- (System.currentTimeMillis() - startTick) + " ms, result: " + strResult);
- }
-
- if(strResult != null) {
- try {
- return _gson.fromJson(strResult, Answer[].class);
- } catch(Throwable e) {
- s_logger.error("Exception on parsing gson package from remote call to " + strPeer);
- }
- }
- return null;
- } catch (RemoteException e) {
- invalidatePeerService(strPeer);
- if(s_logger.isInfoEnabled()) {
- s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: "
- + i + ", exception message :" + e.getMessage());
- }
- }
+ if(s_logger.isDebugEnabled()) {
+ s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " completed. result: " +
+ pdu.getResponseResult());
+ }
+
+ if(pdu.getResponseResult() != null && pdu.getResponseResult().length() > 0) {
+ try {
+ return _gson.fromJson(pdu.getResponseResult(), Answer[].class);
+ } catch(Throwable e) {
+ s_logger.error("Exception on parsing gson package from remote call to " + strPeer);
}
}
return null;
}
-
- @Override
- public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
- ClusterService peerService = null;
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
- _gson.toJson(cmds, Command[].class));
- }
-
- for(int i = 0; i < 2; i++) {
- try {
- peerService = getPeerService(strPeer);
- } catch (RemoteException e) {
- s_logger.error("Unable to get cluster service on peer : " + strPeer);
- }
- if(peerService != null) {
- try {
- long seq = 0;
- synchronized(String.valueOf(agentId).intern()) {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Send Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote");
- }
-
- long startTick = System.currentTimeMillis();
- seq = peerService.executeAsync(getSelfPeerName(), agentId, _gson.toJson(cmds, Command[].class), stopOnError);
- if(seq > 0) {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId
- + " in " + (System.currentTimeMillis() - startTick) + " ms"
- + ", register local listener " + strPeer + "/" + seq);
- }
-
- registerAsyncCall(strPeer, seq, listener);
- } else {
- s_logger.warn("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId
- + " in " + (System.currentTimeMillis() - startTick) + " ms, return indicates failure, seq: " + seq);
- }
- }
- return seq;
- } catch (RemoteException e) {
- invalidatePeerService(strPeer);
-
- if(s_logger.isInfoEnabled()) {
- s_logger.info("Exception on remote execution -> " + strPeer + ", iteration : " + i);
- }
- }
- }
- }
-
- return 0L;
- }
-
- @Override
- public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" +
- agentId + "-" + seq + "} answers: " + (answers != null ? _gson.toJson(answers, Answer[].class): "null"));
- }
-
- Listener listener = null;
- synchronized(String.valueOf(agentId).intern()) {
- // need to synchronize it with executeAsync() to make sure listener have been registered
- // before this callback reaches back
- listener = getAsyncCallListener(executingPeer, seq);
- }
-
- if(listener != null) {
- long startTick = System.currentTimeMillis();
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Processing answer {" + agentId + "-" + seq + "} from remote peer " + executingPeer);
- }
-
- listener.processAnswers(agentId, seq, answers);
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Answer {" + agentId + "-" + seq + "} is processed in " +
- (System.currentTimeMillis() - startTick) + " ms");
- }
-
- if(!listener.isRecurring()) {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Listener is not recurring after async-result callback {" +
- agentId + "-" + seq + "}, unregister it");
- }
-
- unregisterAsyncCall(executingPeer, seq);
- } else {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Listener is recurring after async-result callback {" + agentId
- +"-" + seq + "}, will keep it");
- }
- return true;
- }
- } else {
- if(s_logger.isInfoEnabled()) {
- s_logger.info("Async-call Listener has not been registered yet for {" + agentId
- +"-" + seq + "}");
- }
- }
- return false;
- }
-
- @Override
- public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq +
- "} " + (answers != null? _gson.toJson(answers, Answer[].class):""));
- }
-
- final String targetPeerF = targetPeer;
- final Answer[] answersF = answers;
- final long agentIdF = agentId;
- final long seqF = seq;
-
- ClusterService peerService = null;
-
- for(int i = 0; i < 2; i++) {
- try {
- peerService = getPeerService(targetPeerF);
- } catch (RemoteException e) {
- s_logger.error("cluster service for peer " + targetPeerF + " no longer exists");
- }
-
- if(peerService != null) {
- try {
- boolean result = false;
-
- long startTick = System.currentTimeMillis();
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote");
- }
-
- result = peerService.onAsyncResult(getSelfPeerName(), agentIdF, seqF, _gson.toJson(answersF, Answer[].class));
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " +
- (System.currentTimeMillis() - startTick) + " ms, return result: " + result);
- }
-
- return result;
- } catch (RemoteException e) {
- s_logger.warn("Exception in performing remote call, ", e);
- invalidatePeerService(targetPeerF);
- }
- } else {
- s_logger.warn("Remote peer " + targetPeer + " no longer exists to process answer {" + agentId + "-"
- + seq + "}");
- }
- }
-
- return false;
- }
-
+
@Override
public String getPeerName(long agentHostId) {
@@ -511,10 +574,12 @@ public class ClusterManagerImpl implements ClusterManager {
public void notifyNodeLeft(List nodeList) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node left to listeners.");
-
- for(ManagementServerHostVO mshost : nodeList) {
+ }
+
+ for(ManagementServerHostVO mshost : nodeList) {
+ if(s_logger.isDebugEnabled())
s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
- }
+ cancelClusterRequestToPeer(String.valueOf(mshost.getMsid()));
}
synchronized(_listeners) {
@@ -568,38 +633,6 @@ public class ClusterManagerImpl implements ClusterManager {
}
}
- private void registerAsyncCall(String strPeer, long seq, Listener listener) {
- String key = strPeer + "/" + seq;
-
- synchronized(_asyncCalls) {
- if(!_asyncCalls.containsKey(key)) {
- _asyncCalls.put(key, listener);
- }
- }
- }
-
- private Listener getAsyncCallListener(String strPeer, long seq) {
- String key = strPeer + "/" + seq;
-
- synchronized(_asyncCalls) {
- if(_asyncCalls.containsKey(key)) {
- return _asyncCalls.get(key);
- }
- }
-
- return null;
- }
-
- private void unregisterAsyncCall(String strPeer, long seq) {
- String key = strPeer + "/" + seq;
-
- synchronized(_asyncCalls) {
- if(_asyncCalls.containsKey(key)) {
- _asyncCalls.remove(key);
- }
- }
- }
-
private Runnable getHeartbeatTask() {
return new Runnable() {
@Override
@@ -677,12 +710,12 @@ public class ClusterManagerImpl implements ClusterManager {
s_logger.error("Runtime DB exception ", e.getCause());
if(e.getCause() instanceof ClusterInvalidSessionException) {
- s_logger.error("Invalid cluster session found");
+ s_logger.error("Invalid cluster session found, fence it");
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
}
if(isRootCauseConnectionRelated(e.getCause())) {
- s_logger.error("DB communication problem detected");
+ s_logger.error("DB communication problem detected, fence it");
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
}
@@ -690,12 +723,13 @@ public class ClusterManagerImpl implements ClusterManager {
} catch(ActiveFencingException e) {
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
} catch (Throwable e) {
+ s_logger.error("Unexpected exception in cluster heartbeat", e);
if(isRootCauseConnectionRelated(e.getCause())) {
- s_logger.error("DB communication problem detected");
+ s_logger.error("DB communication problem detected, fence it");
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
}
- s_logger.error("Problem with the cluster heartbeat!", e);
+ invalidHeartbeatConnection();
} finally {
txn.close("ClusterHeartBeat");
}
@@ -1205,7 +1239,6 @@ public class ClusterManagerImpl implements ClusterManager {
throw new ConfigurationException("Unable to set current cluster service adapter");
}
-
_agentLBEnabled = Boolean.valueOf(configDao.getValue(Config.AgentLbEnable.key()));
String connectedAgentsThreshold = configs.get("agent.load.threshold");
@@ -1218,6 +1251,13 @@ public class ClusterManagerImpl implements ClusterManager {
checkConflicts();
+ _executor.execute(getClusterPduSendingTask());
+
+ // TODO, make it configurable
+ for(int i = 0; i < 5; i++)
+ _executor.execute(getClusterPduNotificationTask());
+
+
if(s_logger.isInfoEnabled()) {
s_logger.info("Cluster manager is configured.");
}
diff --git a/server/src/com/cloud/cluster/ClusterService.java b/server/src/com/cloud/cluster/ClusterService.java
index dd8081b46ce..408eb15c292 100644
--- a/server/src/com/cloud/cluster/ClusterService.java
+++ b/server/src/com/cloud/cluster/ClusterService.java
@@ -21,9 +21,7 @@ package com.cloud.cluster;
import java.rmi.Remote;
import java.rmi.RemoteException;
-public interface ClusterService extends Remote {
- String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException;
- long executeAsync(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException;
- boolean onAsyncResult(String executingPeer, long agentId, long seq, String gsonPackage) throws RemoteException;
- boolean ping(String callingPeer) throws RemoteException;
+public interface ClusterService extends Remote {
+ String execute(ClusterServicePdu pdu) throws RemoteException;
+ boolean ping(String callingPeer) throws RemoteException;
}
diff --git a/server/src/com/cloud/cluster/ClusterServicePdu.java b/server/src/com/cloud/cluster/ClusterServicePdu.java
new file mode 100644
index 00000000000..f0976c25be3
--- /dev/null
+++ b/server/src/com/cloud/cluster/ClusterServicePdu.java
@@ -0,0 +1,93 @@
+package com.cloud.cluster;
+
+public class ClusterServicePdu {
+
+ private long sequenceId;
+ private long ackSequenceId;
+
+ private String sourcePeer;
+ private String destPeer;
+
+ private long agentId;
+ private boolean stopOnError;
+ private String jsonPackage;
+
+ private boolean request = false;
+
+ private static long s_nextPduSequenceId = 1;
+
+ public ClusterServicePdu() {
+ sequenceId = getNextPduSequenceId();
+ ackSequenceId = 0;
+ agentId = 0;
+ stopOnError = false;
+ }
+
+ public synchronized long getNextPduSequenceId() {
+ return s_nextPduSequenceId++;
+ }
+
+ public long getSequenceId() {
+ return sequenceId;
+ }
+
+ public void setSequenceId(long sequenceId) {
+ this.sequenceId = sequenceId;
+ }
+
+ public long getAckSequenceId() {
+ return ackSequenceId;
+ }
+
+ public void setAckSequenceId(long ackSequenceId) {
+ this.ackSequenceId = ackSequenceId;
+ }
+
+ public String getSourcePeer() {
+ return sourcePeer;
+ }
+
+ public void setSourcePeer(String sourcePeer) {
+ this.sourcePeer = sourcePeer;
+ }
+
+ public String getDestPeer() {
+ return destPeer;
+ }
+
+ public void setDestPeer(String destPeer) {
+ this.destPeer = destPeer;
+ }
+
+ public long getAgentId() {
+ return agentId;
+ }
+
+ public void setAgentId(long agentId) {
+ this.agentId = agentId;
+ }
+
+ public boolean isStopOnError() {
+ return stopOnError;
+ }
+
+ public void setStopOnError(boolean stopOnError) {
+ this.stopOnError = stopOnError;
+ }
+
+ public String getJsonPackage() {
+ return jsonPackage;
+ }
+
+ public void setJsonPackage(String jsonPackage) {
+ this.jsonPackage = jsonPackage;
+ }
+
+ public boolean isRequest() {
+ return request;
+ }
+
+ public void setRequest(boolean value) {
+ this.request = value;
+ }
+}
diff --git a/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java
new file mode 100644
index 00000000000..1088d771503
--- /dev/null
+++ b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java
@@ -0,0 +1,37 @@
+package com.cloud.cluster;
+
+public class ClusterServiceRequestPdu extends ClusterServicePdu {
+
+ private String responseResult;
+ private long startTick;
+ private long timeout;
+
+ public ClusterServiceRequestPdu() {
+ startTick = System.currentTimeMillis();
+ timeout = -1;
+ }
+
+ public String getResponseResult() {
+ return responseResult;
+ }
+
+ public void setResponseResult(String responseResult) {
+ this.responseResult = responseResult;
+ }
+
+ public long getStartTick() {
+ return startTick;
+ }
+
+ public void setStartTick(long startTick) {
+ this.startTick = startTick;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+}
diff --git a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java
index 9636683076b..f1066699424 100755
--- a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java
+++ b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java
@@ -33,28 +33,13 @@ import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
-import com.cloud.agent.Listener;
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.ChangeAgentAnswer;
-import com.cloud.agent.api.ChangeAgentCommand;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.PropagateResourceEventCommand;
-import com.cloud.agent.api.TransferAgentCommand;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.exception.OperationTimedoutException;
-import com.cloud.serializer.GsonHelper;
-import com.google.gson.Gson;
-
public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
private static final Logger s_logger = Logger.getLogger(ClusterServiceServletHttpHandler.class);
- private final Gson gson;
private final ClusterManager manager;
public ClusterServiceServletHttpHandler(ClusterManager manager) {
this.manager = manager;
-
- gson = GsonHelper.getGson();
}
@Override
@@ -141,17 +126,9 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
}
switch(nMethod) {
- case RemoteMethodConstants.METHOD_EXECUTE :
- responseContent = handleExecuteMethodCall(req);
- break;
-
- case RemoteMethodConstants.METHOD_EXECUTE_ASYNC :
- responseContent = handleExecuteAsyncMethodCall(req);
- break;
-
- case RemoteMethodConstants.METHOD_ASYNC_RESULT :
- responseContent = handleAsyncResultMethodCall(req);
- break;
+ case RemoteMethodConstants.METHOD_DELIVER_PDU :
+ responseContent = handleDeliverPduMethodCall(req);
+ break;
case RemoteMethodConstants.METHOD_PING :
responseContent = handlePingMethodCall(req);
@@ -179,200 +156,31 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
writeResponse(response, HttpStatus.SC_BAD_REQUEST, null);
}
}
-
- private String handleExecuteMethodCall(HttpRequest req) {
- String agentId = (String)req.getParams().getParameter("agentId");
- String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
- String stopOnError = (String)req.getParams().getParameter("stopOnError");
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("|->" + agentId + " " + gsonPackage);
- }
-
- Command [] cmds = null;
- try {
- cmds = gson.fromJson(gsonPackage, Command[].class);
- } catch(Throwable e) {
- assert(false);
- s_logger.error("Excection in gson decoding : ", e);
- }
-
- if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
- ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
-
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
- }
- boolean result = false;
- try {
- result = manager.executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Result is " + result);
- }
-
- } catch (AgentUnavailableException e) {
- s_logger.warn("Agent is unavailable", e);
- return null;
- }
-
- Answer[] answers = new Answer[1];
- answers[0] = new ChangeAgentAnswer(cmd, result);
- return gson.toJson(answers);
- } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
- TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
- }
- boolean result = false;
- try {
- result = manager.rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Result is " + result);
- }
+ private String handleDeliverPduMethodCall(HttpRequest req) {
+
+ String pduSeq = (String)req.getParams().getParameter("pduSeq");
+ String pduAckSeq = (String)req.getParams().getParameter("pduAckSeq");
+ String sourcePeer = (String)req.getParams().getParameter("sourcePeer");
+ String destPeer = (String)req.getParams().getParameter("destPeer");
+ String agentId = (String)req.getParams().getParameter("agentId");
+ String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
+ String stopOnError = (String)req.getParams().getParameter("stopOnError");
+ String requestAck = (String)req.getParams().getParameter("requestAck");
- } catch (AgentUnavailableException e) {
- s_logger.warn("Agent is unavailable", e);
- return null;
- } catch (OperationTimedoutException e) {
- s_logger.warn("Operation timed out", e);
- return null;
- }
- Answer[] answers = new Answer[1];
- answers[0] = new Answer(cmd, result, null);
- return gson.toJson(answers);
- } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
- PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0];
-
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Intercepting command for resource event: host " + cmd.getHostId() + " event: " + cmd.getEvent());
- }
- boolean result = false;
- try {
- result = manager.executeResourceUserRequest(cmd.getHostId(), cmd.getEvent());
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Result is " + result);
- }
-
- } catch (AgentUnavailableException e) {
- s_logger.warn("Agent is unavailable", e);
- return null;
- }
-
- Answer[] answers = new Answer[1];
- answers[0] = new Answer(cmd, result, null);
- return gson.toJson(answers);
- }
-
- try {
- long startTick = System.currentTimeMillis();
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Send |-> " + agentId + " " + gsonPackage + " to agent manager");
- }
-
- Answer[] answers = manager.sendToAgent(Long.parseLong(agentId), cmds,
- Integer.parseInt(stopOnError) != 0 ? true : false);
-
- if(answers != null) {
- String jsonReturn = gson.toJson(answers);
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Completed |-> " + agentId + " " + gsonPackage +
- " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
- }
-
- return jsonReturn;
- } else {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Completed |-> " + agentId + " " + gsonPackage +
- " in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
- }
- }
- } catch(AgentUnavailableException e) {
- s_logger.warn("Agent is unavailable", e);
- } catch (OperationTimedoutException e) {
- s_logger.warn("Timed Out", e);
- }
-
- return null;
- }
-
- private String handleExecuteAsyncMethodCall(HttpRequest req) {
- String agentId = (String)req.getParams().getParameter("agentId");
- String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
- String stopOnError = (String)req.getParams().getParameter("stopOnError");
- String callingPeer = (String)req.getParams().getParameter("caller");
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Async " + callingPeer + " |-> " + agentId + " " + gsonPackage);
- }
-
- Command [] cmds = null;
- try {
- cmds = gson.fromJson(gsonPackage, Command[].class);
- } catch(Throwable e) {
- assert(false);
- s_logger.error("Excection in gson decoding : ", e);
- }
-
- Listener listener = new ClusterAsyncExectuionListener(manager, callingPeer);
- long seq = -1;
- try {
- long startTick = System.currentTimeMillis();
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Send Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " to agent manager");
- }
-
- seq = manager.sendToAgent(Long.parseLong(agentId), cmds,
- Integer.parseInt(stopOnError) != 0 ? true : false, listener);
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Complated Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " in " +
- + (System.currentTimeMillis() - startTick) + " ms, returned seq: " + seq);
- }
- } catch (AgentUnavailableException e) {
- s_logger.warn("Agent is unavailable", e);
- seq = -1;
- }
-
- return gson.toJson(seq);
- }
-
- private String handleAsyncResultMethodCall(HttpRequest req) {
- String agentId = (String)req.getParams().getParameter("agentId");
- String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
- String seq = (String)req.getParams().getParameter("seq");
- String executingPeer = (String)req.getParams().getParameter("executingPeer");
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Async callback " + executingPeer + "." + agentId + " |-> " + gsonPackage);
- }
-
- Answer[] answers = null;
- try {
- answers = gson.fromJson(gsonPackage, Answer[].class);
- } catch(Throwable e) {
- assert(false);
- s_logger.error("Excection in gson decoding : ", e);
- }
-
- long startTick = System.currentTimeMillis();
- if(manager.onAsyncResult(executingPeer, Long.parseLong(agentId), Long.parseLong(seq), answers)) {
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) +
- " ms, return recurring=true, let async listener contine on");
- }
-
- return "recurring=true";
- }
-
- if(s_logger.isDebugEnabled()) {
- s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) +
- " ms, return recurring=false, indicate to tear down async listener");
- }
-
- return "recurring=false";
- }
+ ClusterServicePdu pdu = new ClusterServicePdu();
+ pdu.setSourcePeer(sourcePeer);
+ pdu.setDestPeer(destPeer);
+ pdu.setAgentId(Long.parseLong(agentId));
+ pdu.setSequenceId(Long.parseLong(pduSeq));
+ pdu.setAckSequenceId(Long.parseLong(pduAckSeq));
+ pdu.setJsonPackage(gsonPackage);
+ pdu.setStopOnError("1".equals(stopOnError));
+ pdu.setRequest("1".equals(requestAck));
+
+ manager.OnReceiveClusterServicePdu(pdu);
+ return "true";
+ }
private String handlePingMethodCall(HttpRequest req) {
String callingPeer = (String)req.getParams().getParameter("callingPeer");
diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java
index ce65fe18e87..a33ec1e1569 100644
--- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java
+++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java
@@ -51,8 +51,28 @@ public class ClusterServiceServletImpl implements ClusterService {
this._requestTimeoutSeconds = requestTimeoutSeconds;
_gson = GsonHelper.getGson();
- }
-
+ }
+
+ @Override
+ public String execute(ClusterServicePdu pdu) throws RemoteException {
+
+ HttpClient client = getHttpClient();
+ PostMethod method = new PostMethod(_serviceUrl);
+
+ method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_DELIVER_PDU));
+ method.addParameter("sourcePeer", pdu.getSourcePeer());
+ method.addParameter("destPeer", pdu.getDestPeer());
+ method.addParameter("pduSeq", Long.toString(pdu.getSequenceId()));
+ method.addParameter("pduAckSeq", Long.toString(pdu.getAckSequenceId()));
+ method.addParameter("agentId", Long.toString(pdu.getAgentId()));
+ method.addParameter("gsonPackage", pdu.getJsonPackage());
+ method.addParameter("stopOnError", pdu.isStopOnError() ? "1" : "0");
+ method.addParameter("requestAck", pdu.isRequest() ? "1" : "0");
+
+ return executePostMethod(client, method);
+ }
+
+/*
@Override
public String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException {
if(s_logger.isDebugEnabled()) {
@@ -129,7 +149,7 @@ public class ClusterServiceServletImpl implements ClusterService {
}
return false;
}
-
+*/
@Override
public boolean ping(String callingPeer) throws RemoteException {
if(s_logger.isDebugEnabled()) {
@@ -187,11 +207,13 @@ public class ClusterServiceServletImpl implements ClusterService {
// for test purpose only
public static void main(String[] args) {
+/*
ClusterServiceServletImpl service = new ClusterServiceServletImpl("http://localhost:9090/clusterservice", 300);
try {
String result = service.execute("test", 1, "{ p1:v1, p2:v2 }", true);
System.out.println(result);
} catch (RemoteException e) {
}
+*/
}
}
diff --git a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java
index 6275b8b21e7..999e1a98d58 100755
--- a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java
+++ b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java
@@ -43,38 +43,29 @@ public class DummyClusterManagerImpl implements ClusterManager {
private String _name;
private final String _clusterNodeIP = "127.0.0.1";
-
+
+ @Override
+ public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
+ throw new CloudRuntimeException("Unsupported feature");
+ }
+
@Override
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
throw new CloudRuntimeException("Unsupported feature");
- }
-
- @Override
- public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
- throw new CloudRuntimeException("Unsupported feature");
- }
-
- @Override
- public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
- throw new CloudRuntimeException("Unsupported feature");
- }
-
- @Override
- public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
- throw new CloudRuntimeException("Unsupported feature");
- }
-
+ }
+
@Override
public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError)
throws AgentUnavailableException, OperationTimedoutException {
throw new CloudRuntimeException("Unsupported feature");
}
-
+
+/*
@Override
public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");
}
-
+*/
@Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");
diff --git a/server/src/com/cloud/cluster/RemoteMethodConstants.java b/server/src/com/cloud/cluster/RemoteMethodConstants.java
index 335785f7fe2..c1ff62f2a23 100644
--- a/server/src/com/cloud/cluster/RemoteMethodConstants.java
+++ b/server/src/com/cloud/cluster/RemoteMethodConstants.java
@@ -19,9 +19,7 @@
package com.cloud.cluster;
public interface RemoteMethodConstants {
- public static final int METHOD_UNKNOWN = 0;
- public static final int METHOD_EXECUTE = 1;
- public static final int METHOD_EXECUTE_ASYNC = 2;
- public static final int METHOD_ASYNC_RESULT = 3;
+ public static final int METHOD_UNKNOWN = 0;
public static final int METHOD_PING = 4;
+ public static final int METHOD_DELIVER_PDU = 5;
}
diff --git a/server/test/com/cloud/cluster/CheckPointManagerTest.java b/server/test/com/cloud/cluster/CheckPointManagerTest.java
index ac2dd041441..132958a2a7a 100755
--- a/server/test/com/cloud/cluster/CheckPointManagerTest.java
+++ b/server/test/com/cloud/cluster/CheckPointManagerTest.java
@@ -258,36 +258,21 @@ public class CheckPointManagerTest extends TestCase {
return _name;
}
+ @Override
+ public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
+ throw new CloudRuntimeException("Not implemented");
+ }
+
@Override
public Answer[] execute(String strPeer, long agentId, Command[] cmds, boolean stopOnError) {
throw new UnsupportedOperationException("Not implemented");
}
- @Override
- public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- @Override
- public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- @Override
- public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
- throw new UnsupportedOperationException("Not implemented");
- }
-
@Override
public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
throw new UnsupportedOperationException("Not implemented");
}
- @Override
- public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
- throw new UnsupportedOperationException("Not implemented");
- }
-
@Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
throw new UnsupportedOperationException("Not implemented");
diff --git a/server/test/com/cloud/network/MockNetworkManagerImpl.java b/server/test/com/cloud/network/MockNetworkManagerImpl.java
index d9839062a08..06ef648e42c 100755
--- a/server/test/com/cloud/network/MockNetworkManagerImpl.java
+++ b/server/test/com/cloud/network/MockNetworkManagerImpl.java
@@ -850,5 +850,4 @@ public class MockNetworkManagerImpl implements NetworkManager, Manager, NetworkS
// TODO Auto-generated method stub
return null;
}
-
}