From f23e4cab71dcdb69a1030fbcdb01264b105368c8 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Mon, 26 Mar 2012 14:46:33 -0700 Subject: [PATCH] bug 14301: Decouple synchronized crosss management server call with clustering transport. Reviewed-by: Kelven --- .../ClusterAsyncExectuionListener.java | 80 --- .../src/com/cloud/cluster/ClusterManager.java | 12 +- .../com/cloud/cluster/ClusterManagerImpl.java | 532 ++++++++++-------- .../src/com/cloud/cluster/ClusterService.java | 8 +- .../com/cloud/cluster/ClusterServicePdu.java | 93 +++ .../cluster/ClusterServiceRequestPdu.java | 37 ++ .../ClusterServiceServletHttpHandler.java | 244 +------- .../cluster/ClusterServiceServletImpl.java | 28 +- .../cluster/DummyClusterManagerImpl.java | 31 +- .../cloud/cluster/RemoteMethodConstants.java | 6 +- .../cloud/cluster/CheckPointManagerTest.java | 25 +- .../cloud/network/MockNetworkManagerImpl.java | 1 - 12 files changed, 492 insertions(+), 605 deletions(-) delete mode 100755 server/src/com/cloud/cluster/ClusterAsyncExectuionListener.java create mode 100644 server/src/com/cloud/cluster/ClusterServicePdu.java create mode 100644 server/src/com/cloud/cluster/ClusterServiceRequestPdu.java diff --git a/server/src/com/cloud/cluster/ClusterAsyncExectuionListener.java b/server/src/com/cloud/cluster/ClusterAsyncExectuionListener.java deleted file mode 100755 index 6a8aa81ced4..00000000000 --- a/server/src/com/cloud/cluster/ClusterAsyncExectuionListener.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. - * - * This software is licensed under the GNU General Public License v3 or later. - * - * It is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or any later version. - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - */ - -package com.cloud.cluster; - -import com.cloud.agent.Listener; -import com.cloud.agent.api.AgentControlAnswer; -import com.cloud.agent.api.AgentControlCommand; -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.Command; -import com.cloud.agent.api.StartupCommand; -import com.cloud.host.HostVO; -import com.cloud.host.Status; - -public class ClusterAsyncExectuionListener implements Listener { - private final ClusterManager clusterMgr; - private final String callingPeer; - private boolean recurring = false; - - public ClusterAsyncExectuionListener(ClusterManager clusterMgr, String callingPeer) { - this.clusterMgr = clusterMgr; - this.callingPeer = callingPeer; - } - - @Override - public boolean processAnswers(long agentId, long seq, Answer[] answers) { - recurring = clusterMgr.forwardAnswer(callingPeer, agentId, seq, answers); - return true; - } - - @Override - public boolean processCommands(long agentId, long seq, Command[] commands) { - return false; - } - - @Override - public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) { - return null; - } - - @Override - public void processConnect(HostVO agent, StartupCommand cmd, boolean forRebalance) { - } - - @Override - public boolean processDisconnect(long agentId, Status state) { - return false; - } - - @Override - public boolean isRecurring() { - return recurring; - } - - @Override - public boolean processTimeout(long agentId, long seq) { - return true; - } - - @Override - public int getTimeout() { - return -1; - } -} - diff --git a/server/src/com/cloud/cluster/ClusterManager.java b/server/src/com/cloud/cluster/ClusterManager.java index cec5ea7b17c..46fdff3eacf 100755 --- a/server/src/com/cloud/cluster/ClusterManager.java +++ b/server/src/com/cloud/cluster/ClusterManager.java @@ -18,7 +18,6 @@ package com.cloud.cluster; -import com.cloud.agent.Listener; import com.cloud.agent.api.Answer; import com.cloud.agent.api.Command; import com.cloud.exception.AgentUnavailableException; @@ -31,14 +30,11 @@ public interface ClusterManager extends Manager { public static final int DEFAULT_HEARTBEAT_INTERVAL = 1500; public static final int DEFAULT_HEARTBEAT_THRESHOLD = 150000; public static final String ALERT_SUBJECT = "cluster-alert"; - + + public void OnReceiveClusterServicePdu(ClusterServicePdu pdu); public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError); - public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener); - public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers); - public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers); - - public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException; - public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException; + + public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException; public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException; public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException; public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException; diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index 0982cc7afaa..2d00b9873fb 100755 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -33,8 +33,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -45,13 +47,13 @@ import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.AgentManager.OnError; -import com.cloud.agent.Listener; import com.cloud.agent.api.Answer; +import com.cloud.agent.api.ChangeAgentAnswer; import com.cloud.agent.api.ChangeAgentCommand; import com.cloud.agent.api.Command; import com.cloud.agent.api.PropagateResourceEventCommand; +import com.cloud.agent.api.TransferAgentCommand; import com.cloud.agent.manager.Commands; -import com.cloud.cluster.ManagementServerHost.State; import com.cloud.cluster.agentlb.dao.HostTransferMapDao; import com.cloud.cluster.dao.ManagementServerHostDao; import com.cloud.cluster.dao.ManagementServerHostPeerDao; @@ -91,17 +93,14 @@ import com.google.gson.Gson; public class ClusterManagerImpl implements ClusterManager { private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class); - private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second - private final List _listeners = new ArrayList(); private final Map _activePeers = new HashMap(); private int _heartbeatInterval = ClusterManager.DEFAULT_HEARTBEAT_INTERVAL; private int _heartbeatThreshold = ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD; private final Map _clusterPeers; - private final Map _asyncCalls; private final Gson _gson; @Inject @@ -142,10 +141,12 @@ public class ClusterManagerImpl implements ClusterManager { private double _connectedAgentsThreshold = 0.7; private static boolean _agentLbHappened = false; + private BlockingQueue _clusterPduOutgoingQueue = new LinkedBlockingQueue(); + private BlockingQueue _clusterPduIncomingQueue = new LinkedBlockingQueue(); + private Map _outgoingPdusWaitingForAck = new HashMap(); public ClusterManagerImpl() { _clusterPeers = new HashMap(); - _asyncCalls = new HashMap(); _gson = GsonHelper.getGson(); @@ -154,7 +155,242 @@ public class ClusterManagerImpl implements ClusterManager { // _executor = Executors.newCachedThreadPool(new NamedThreadFactory("Cluster-Worker")); } + + private void registerRequestPdu(ClusterServiceRequestPdu pdu) { + synchronized(_outgoingPdusWaitingForAck) { + _outgoingPdusWaitingForAck.put(pdu.getSequenceId(), pdu); + } + } + + private ClusterServiceRequestPdu popRequestPdu(long ackSequenceId) { + synchronized(_outgoingPdusWaitingForAck) { + if(_outgoingPdusWaitingForAck.get(ackSequenceId) != null) { + ClusterServiceRequestPdu pdu = _outgoingPdusWaitingForAck.get(ackSequenceId); + _outgoingPdusWaitingForAck.remove(ackSequenceId); + return pdu; + } + } + + return null; + } + + private void cancelClusterRequestToPeer(String strPeer) { + List candidates = new ArrayList(); + synchronized(_outgoingPdusWaitingForAck) { + for(Map.Entry entry : _outgoingPdusWaitingForAck.entrySet()) { + if(entry.getValue().getDestPeer().equalsIgnoreCase(strPeer)) + candidates.add(entry.getValue()); + } + for(ClusterServiceRequestPdu pdu : candidates) { + _outgoingPdusWaitingForAck.remove(pdu.getSequenceId()); + } + } + + for(ClusterServiceRequestPdu pdu : candidates) { + s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + _gson.toJson(pdu)); + synchronized(pdu) { + pdu.notifyAll(); + } + } + } + + private Runnable getClusterPduSendingTask() { + return new Runnable() { + public void run() { + onSendingClusterPdu(); + } + }; + } + + private Runnable getClusterPduNotificationTask() { + return new Runnable() { + public void run() { + onNotifyingClusterPdu(); + } + }; + } + + private void onSendingClusterPdu() { + while(true) { + try { + ClusterServicePdu pdu = _clusterPduOutgoingQueue.take(); + + ClusterService peerService = null; + for(int i = 0; i < 2; i++) { + try { + peerService = getPeerService(pdu.getDestPeer()); + } catch (RemoteException e) { + s_logger.error("Unable to get cluster service on peer : " + pdu.getDestPeer()); + } + + if(peerService != null) { + try { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + ". agent: " + pdu.getAgentId() + + ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage()); + } + + long startTick = System.currentTimeMillis(); + String strResult = peerService.execute(pdu); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + " completed. time: " + + (System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId() + + ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage()); + } + + if("true".equals(strResult)) + break; + + } catch (RemoteException e) { + invalidatePeerService(pdu.getDestPeer()); + if(s_logger.isInfoEnabled()) { + s_logger.info("Exception on remote execution, peer: " + pdu.getDestPeer() + ", iteration: " + + i + ", exception message :" + e.getMessage()); + } + } + } + } + } catch(InterruptedException e) { + } catch(Throwable e) { + s_logger.error("Unexcpeted exception: ", e); + } + } + } + + private void onNotifyingClusterPdu() { + while(true) { + try { + ClusterServicePdu pdu = _clusterPduOutgoingQueue.take(); + if(pdu.isRequest()) { + String result = dispatchClusterServicePdu(pdu); + if(result == null) + result = ""; + + ClusterServicePdu responsePdu = new ClusterServicePdu(); + responsePdu.setSourcePeer(pdu.getDestPeer()); + responsePdu.setDestPeer(pdu.getSourcePeer()); + responsePdu.setAckSequenceId(pdu.getSequenceId()); + responsePdu.setJsonPackage(result); + + _clusterPduOutgoingQueue.put(responsePdu); + } else { + ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId()); + if(requestPdu != null) { + requestPdu.setResponseResult(pdu.getJsonPackage()); + synchronized(requestPdu) { + requestPdu.notifyAll(); + } + } else { + s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu)); + } + } + } catch(InterruptedException e) { + } catch(Throwable e) { + s_logger.error("Unexcpeted exception: ", e); + } + } + } + + private String dispatchClusterServicePdu(ClusterServicePdu pdu) { + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage()); + } + + Command [] cmds = null; + try { + cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class); + } catch(Throwable e) { + assert(false); + s_logger.error("Excection in gson decoding : ", e); + } + + if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted + ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); + } + boolean result = false; + try { + result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent()); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Result is " + result); + } + + } catch (AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + return null; + } + + Answer[] answers = new Answer[1]; + answers[0] = new ChangeAgentAnswer(cmd, result); + return _gson.toJson(answers); + } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) { + TransferAgentCommand cmd = (TransferAgentCommand) cmds[0]; + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); + } + boolean result = false; + try { + result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner()); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Result is " + result); + } + + } catch (AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + return null; + } catch (OperationTimedoutException e) { + s_logger.warn("Operation timed out", e); + return null; + } + Answer[] answers = new Answer[1]; + answers[0] = new Answer(cmd, result, null); + return _gson.toJson(answers); + } + + try { + long startTick = System.currentTimeMillis(); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage()); + } + + Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError()); + if(answers != null) { + String jsonReturn = _gson.toJson(answers); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + + " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn); + } + + return jsonReturn; + } else { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + + " in " + (System.currentTimeMillis() - startTick) + " ms, return null result"); + } + } + } catch(AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + } catch (OperationTimedoutException e) { + s_logger.warn("Timed Out", e); + } + + return null; + } + + public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) { + try { + _clusterPduIncomingQueue.put(pdu); + } catch (InterruptedException e) { + s_logger.warn("InterruptedException. pdu: " + _gson.toJson(pdu) + " is dropped"); + } + } + @Override public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue); @@ -164,15 +400,6 @@ public class ClusterManagerImpl implements ClusterManager { return _agentMgr.send(hostId, commands); } - @Override - public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException { - Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue); - for (Command cmd : cmds) { - commands.addCommand(cmd); - } - return _agentMgr.send(hostId, commands, listener); - } - @Override public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException { return _agentMgr.executeUserRequest(agentId, event); @@ -229,7 +456,7 @@ public class ClusterManagerImpl implements ClusterManager { if (s_logger.isDebugEnabled()) { s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid()); } - Answer[] answers = execute(peerName, agentId, cmds, true); + execute(peerName, agentId, cmds, true); } catch (Exception e) { s_logger.warn("Caught exception while talkign to " + peer.getMsid()); } @@ -238,208 +465,44 @@ public class ClusterManagerImpl implements ClusterManager { @Override public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { - ClusterService peerService = null; - if(s_logger.isDebugEnabled()) { s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + _gson.toJson(cmds, Command[].class)); } - - for(int i = 0; i < 2; i++) { + + ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu(); + pdu.setSourcePeer(getSelfPeerName()); + pdu.setDestPeer(strPeer); + pdu.setAgentId(agentId); + pdu.setJsonPackage(_gson.toJson(cmds, Command[].class)); + pdu.setStopOnError(stopOnError); + pdu.setRequest(true); + registerRequestPdu(pdu); + _clusterPduOutgoingQueue.add(pdu); + + synchronized(pdu) { try { - peerService = getPeerService(strPeer); - } catch (RemoteException e) { - s_logger.error("Unable to get cluster service on peer : " + strPeer); + pdu.wait(); + } catch (InterruptedException e) { } + } - if(peerService != null) { - try { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Send " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote"); - } - - long startTick = System.currentTimeMillis(); - String strResult = peerService.execute(getSelfPeerName(), agentId, _gson.toJson(cmds, Command[].class), stopOnError); - if(s_logger.isDebugEnabled()) { - s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " + - (System.currentTimeMillis() - startTick) + " ms, result: " + strResult); - } - - if(strResult != null) { - try { - return _gson.fromJson(strResult, Answer[].class); - } catch(Throwable e) { - s_logger.error("Exception on parsing gson package from remote call to " + strPeer); - } - } - return null; - } catch (RemoteException e) { - invalidatePeerService(strPeer); - if(s_logger.isInfoEnabled()) { - s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: " - + i + ", exception message :" + e.getMessage()); - } - } + if(s_logger.isDebugEnabled()) { + s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " completed. result: " + + pdu.getResponseResult()); + } + + if(pdu.getResponseResult() != null && pdu.getResponseResult().length() > 0) { + try { + return _gson.fromJson(pdu.getResponseResult(), Answer[].class); + } catch(Throwable e) { + s_logger.error("Exception on parsing gson package from remote call to " + strPeer); } } return null; } - - @Override - public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) { - ClusterService peerService = null; - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + - _gson.toJson(cmds, Command[].class)); - } - - for(int i = 0; i < 2; i++) { - try { - peerService = getPeerService(strPeer); - } catch (RemoteException e) { - s_logger.error("Unable to get cluster service on peer : " + strPeer); - } - if(peerService != null) { - try { - long seq = 0; - synchronized(String.valueOf(agentId).intern()) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Send Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote"); - } - - long startTick = System.currentTimeMillis(); - seq = peerService.executeAsync(getSelfPeerName(), agentId, _gson.toJson(cmds, Command[].class), stopOnError); - if(seq > 0) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId - + " in " + (System.currentTimeMillis() - startTick) + " ms" - + ", register local listener " + strPeer + "/" + seq); - } - - registerAsyncCall(strPeer, seq, listener); - } else { - s_logger.warn("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId - + " in " + (System.currentTimeMillis() - startTick) + " ms, return indicates failure, seq: " + seq); - } - } - return seq; - } catch (RemoteException e) { - invalidatePeerService(strPeer); - - if(s_logger.isInfoEnabled()) { - s_logger.info("Exception on remote execution -> " + strPeer + ", iteration : " + i); - } - } - } - } - - return 0L; - } - - @Override - public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" + - agentId + "-" + seq + "} answers: " + (answers != null ? _gson.toJson(answers, Answer[].class): "null")); - } - - Listener listener = null; - synchronized(String.valueOf(agentId).intern()) { - // need to synchronize it with executeAsync() to make sure listener have been registered - // before this callback reaches back - listener = getAsyncCallListener(executingPeer, seq); - } - - if(listener != null) { - long startTick = System.currentTimeMillis(); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Processing answer {" + agentId + "-" + seq + "} from remote peer " + executingPeer); - } - - listener.processAnswers(agentId, seq, answers); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Answer {" + agentId + "-" + seq + "} is processed in " + - (System.currentTimeMillis() - startTick) + " ms"); - } - - if(!listener.isRecurring()) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Listener is not recurring after async-result callback {" + - agentId + "-" + seq + "}, unregister it"); - } - - unregisterAsyncCall(executingPeer, seq); - } else { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Listener is recurring after async-result callback {" + agentId - +"-" + seq + "}, will keep it"); - } - return true; - } - } else { - if(s_logger.isInfoEnabled()) { - s_logger.info("Async-call Listener has not been registered yet for {" + agentId - +"-" + seq + "}"); - } - } - return false; - } - - @Override - public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq + - "} " + (answers != null? _gson.toJson(answers, Answer[].class):"")); - } - - final String targetPeerF = targetPeer; - final Answer[] answersF = answers; - final long agentIdF = agentId; - final long seqF = seq; - - ClusterService peerService = null; - - for(int i = 0; i < 2; i++) { - try { - peerService = getPeerService(targetPeerF); - } catch (RemoteException e) { - s_logger.error("cluster service for peer " + targetPeerF + " no longer exists"); - } - - if(peerService != null) { - try { - boolean result = false; - - long startTick = System.currentTimeMillis(); - if(s_logger.isDebugEnabled()) { - s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote"); - } - - result = peerService.onAsyncResult(getSelfPeerName(), agentIdF, seqF, _gson.toJson(answersF, Answer[].class)); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " + - (System.currentTimeMillis() - startTick) + " ms, return result: " + result); - } - - return result; - } catch (RemoteException e) { - s_logger.warn("Exception in performing remote call, ", e); - invalidatePeerService(targetPeerF); - } - } else { - s_logger.warn("Remote peer " + targetPeer + " no longer exists to process answer {" + agentId + "-" - + seq + "}"); - } - } - - return false; - } - + @Override public String getPeerName(long agentHostId) { @@ -511,10 +574,12 @@ public class ClusterManagerImpl implements ClusterManager { public void notifyNodeLeft(List nodeList) { if(s_logger.isDebugEnabled()) { s_logger.debug("Notify management server node left to listeners."); - - for(ManagementServerHostVO mshost : nodeList) { + } + + for(ManagementServerHostVO mshost : nodeList) { + if(s_logger.isDebugEnabled()) s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid()); - } + cancelClusterRequestToPeer(String.valueOf(mshost.getMsid())); } synchronized(_listeners) { @@ -568,38 +633,6 @@ public class ClusterManagerImpl implements ClusterManager { } } - private void registerAsyncCall(String strPeer, long seq, Listener listener) { - String key = strPeer + "/" + seq; - - synchronized(_asyncCalls) { - if(!_asyncCalls.containsKey(key)) { - _asyncCalls.put(key, listener); - } - } - } - - private Listener getAsyncCallListener(String strPeer, long seq) { - String key = strPeer + "/" + seq; - - synchronized(_asyncCalls) { - if(_asyncCalls.containsKey(key)) { - return _asyncCalls.get(key); - } - } - - return null; - } - - private void unregisterAsyncCall(String strPeer, long seq) { - String key = strPeer + "/" + seq; - - synchronized(_asyncCalls) { - if(_asyncCalls.containsKey(key)) { - _asyncCalls.remove(key); - } - } - } - private Runnable getHeartbeatTask() { return new Runnable() { @Override @@ -677,12 +710,12 @@ public class ClusterManagerImpl implements ClusterManager { s_logger.error("Runtime DB exception ", e.getCause()); if(e.getCause() instanceof ClusterInvalidSessionException) { - s_logger.error("Invalid cluster session found"); + s_logger.error("Invalid cluster session found, fence it"); queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); } if(isRootCauseConnectionRelated(e.getCause())) { - s_logger.error("DB communication problem detected"); + s_logger.error("DB communication problem detected, fence it"); queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); } @@ -690,12 +723,13 @@ public class ClusterManagerImpl implements ClusterManager { } catch(ActiveFencingException e) { queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); } catch (Throwable e) { + s_logger.error("Unexpected exception in cluster heartbeat", e); if(isRootCauseConnectionRelated(e.getCause())) { - s_logger.error("DB communication problem detected"); + s_logger.error("DB communication problem detected, fence it"); queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); } - s_logger.error("Problem with the cluster heartbeat!", e); + invalidHeartbeatConnection(); } finally { txn.close("ClusterHeartBeat"); } @@ -1205,7 +1239,6 @@ public class ClusterManagerImpl implements ClusterManager { throw new ConfigurationException("Unable to set current cluster service adapter"); } - _agentLBEnabled = Boolean.valueOf(configDao.getValue(Config.AgentLbEnable.key())); String connectedAgentsThreshold = configs.get("agent.load.threshold"); @@ -1218,6 +1251,13 @@ public class ClusterManagerImpl implements ClusterManager { checkConflicts(); + _executor.execute(getClusterPduSendingTask()); + + // TODO, make it configurable + for(int i = 0; i < 5; i++) + _executor.execute(getClusterPduNotificationTask()); + + if(s_logger.isInfoEnabled()) { s_logger.info("Cluster manager is configured."); } diff --git a/server/src/com/cloud/cluster/ClusterService.java b/server/src/com/cloud/cluster/ClusterService.java index dd8081b46ce..408eb15c292 100644 --- a/server/src/com/cloud/cluster/ClusterService.java +++ b/server/src/com/cloud/cluster/ClusterService.java @@ -21,9 +21,7 @@ package com.cloud.cluster; import java.rmi.Remote; import java.rmi.RemoteException; -public interface ClusterService extends Remote { - String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException; - long executeAsync(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException; - boolean onAsyncResult(String executingPeer, long agentId, long seq, String gsonPackage) throws RemoteException; - boolean ping(String callingPeer) throws RemoteException; +public interface ClusterService extends Remote { + String execute(ClusterServicePdu pdu) throws RemoteException; + boolean ping(String callingPeer) throws RemoteException; } diff --git a/server/src/com/cloud/cluster/ClusterServicePdu.java b/server/src/com/cloud/cluster/ClusterServicePdu.java new file mode 100644 index 00000000000..f0976c25be3 --- /dev/null +++ b/server/src/com/cloud/cluster/ClusterServicePdu.java @@ -0,0 +1,93 @@ +package com.cloud.cluster; + +public class ClusterServicePdu { + + private long sequenceId; + private long ackSequenceId; + + private String sourcePeer; + private String destPeer; + + private long agentId; + private boolean stopOnError; + private String jsonPackage; + + private boolean request = false; + + private static long s_nextPduSequenceId = 1; + + public ClusterServicePdu() { + sequenceId = getNextPduSequenceId(); + ackSequenceId = 0; + agentId = 0; + stopOnError = false; + } + + public synchronized long getNextPduSequenceId() { + return s_nextPduSequenceId++; + } + + public long getSequenceId() { + return sequenceId; + } + + public void setSequenceId(long sequenceId) { + this.sequenceId = sequenceId; + } + + public long getAckSequenceId() { + return ackSequenceId; + } + + public void setAckSequenceId(long ackSequenceId) { + this.ackSequenceId = ackSequenceId; + } + + public String getSourcePeer() { + return sourcePeer; + } + + public void setSourcePeer(String sourcePeer) { + this.sourcePeer = sourcePeer; + } + + public String getDestPeer() { + return destPeer; + } + + public void setDestPeer(String destPeer) { + this.destPeer = destPeer; + } + + public long getAgentId() { + return agentId; + } + + public void setAgentId(long agentId) { + this.agentId = agentId; + } + + public boolean isStopOnError() { + return stopOnError; + } + + public void setStopOnError(boolean stopOnError) { + this.stopOnError = stopOnError; + } + + public String getJsonPackage() { + return jsonPackage; + } + + public void setJsonPackage(String jsonPackage) { + this.jsonPackage = jsonPackage; + } + + public boolean isRequest() { + return request; + } + + public void setRequest(boolean value) { + this.request = value; + } +} diff --git a/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java new file mode 100644 index 00000000000..1088d771503 --- /dev/null +++ b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java @@ -0,0 +1,37 @@ +package com.cloud.cluster; + +public class ClusterServiceRequestPdu extends ClusterServicePdu { + + private String responseResult; + private long startTick; + private long timeout; + + public ClusterServiceRequestPdu() { + startTick = System.currentTimeMillis(); + timeout = -1; + } + + public String getResponseResult() { + return responseResult; + } + + public void setResponseResult(String responseResult) { + this.responseResult = responseResult; + } + + public long getStartTick() { + return startTick; + } + + public void setStartTick(long startTick) { + this.startTick = startTick; + } + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } +} diff --git a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java index 9636683076b..f1066699424 100755 --- a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java @@ -33,28 +33,13 @@ import org.apache.http.protocol.HttpRequestHandler; import org.apache.http.util.EntityUtils; import org.apache.log4j.Logger; -import com.cloud.agent.Listener; -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.ChangeAgentAnswer; -import com.cloud.agent.api.ChangeAgentCommand; -import com.cloud.agent.api.Command; -import com.cloud.agent.api.PropagateResourceEventCommand; -import com.cloud.agent.api.TransferAgentCommand; -import com.cloud.exception.AgentUnavailableException; -import com.cloud.exception.OperationTimedoutException; -import com.cloud.serializer.GsonHelper; -import com.google.gson.Gson; - public class ClusterServiceServletHttpHandler implements HttpRequestHandler { private static final Logger s_logger = Logger.getLogger(ClusterServiceServletHttpHandler.class); - private final Gson gson; private final ClusterManager manager; public ClusterServiceServletHttpHandler(ClusterManager manager) { this.manager = manager; - - gson = GsonHelper.getGson(); } @Override @@ -141,17 +126,9 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler { } switch(nMethod) { - case RemoteMethodConstants.METHOD_EXECUTE : - responseContent = handleExecuteMethodCall(req); - break; - - case RemoteMethodConstants.METHOD_EXECUTE_ASYNC : - responseContent = handleExecuteAsyncMethodCall(req); - break; - - case RemoteMethodConstants.METHOD_ASYNC_RESULT : - responseContent = handleAsyncResultMethodCall(req); - break; + case RemoteMethodConstants.METHOD_DELIVER_PDU : + responseContent = handleDeliverPduMethodCall(req); + break; case RemoteMethodConstants.METHOD_PING : responseContent = handlePingMethodCall(req); @@ -179,200 +156,31 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler { writeResponse(response, HttpStatus.SC_BAD_REQUEST, null); } } - - private String handleExecuteMethodCall(HttpRequest req) { - String agentId = (String)req.getParams().getParameter("agentId"); - String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); - String stopOnError = (String)req.getParams().getParameter("stopOnError"); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("|->" + agentId + " " + gsonPackage); - } - - Command [] cmds = null; - try { - cmds = gson.fromJson(gsonPackage, Command[].class); - } catch(Throwable e) { - assert(false); - s_logger.error("Excection in gson decoding : ", e); - } - - if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted - ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); - } - boolean result = false; - try { - result = manager.executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent()); - if (s_logger.isDebugEnabled()) { - s_logger.debug("Result is " + result); - } - - } catch (AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - return null; - } - - Answer[] answers = new Answer[1]; - answers[0] = new ChangeAgentAnswer(cmd, result); - return gson.toJson(answers); - } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) { - TransferAgentCommand cmd = (TransferAgentCommand) cmds[0]; - if (s_logger.isDebugEnabled()) { - s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); - } - boolean result = false; - try { - result = manager.rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner()); - if (s_logger.isDebugEnabled()) { - s_logger.debug("Result is " + result); - } + private String handleDeliverPduMethodCall(HttpRequest req) { + + String pduSeq = (String)req.getParams().getParameter("pduSeq"); + String pduAckSeq = (String)req.getParams().getParameter("pduAckSeq"); + String sourcePeer = (String)req.getParams().getParameter("sourcePeer"); + String destPeer = (String)req.getParams().getParameter("destPeer"); + String agentId = (String)req.getParams().getParameter("agentId"); + String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); + String stopOnError = (String)req.getParams().getParameter("stopOnError"); + String requestAck = (String)req.getParams().getParameter("requestAck"); - } catch (AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - return null; - } catch (OperationTimedoutException e) { - s_logger.warn("Operation timed out", e); - return null; - } - Answer[] answers = new Answer[1]; - answers[0] = new Answer(cmd, result, null); - return gson.toJson(answers); - } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) { - PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0]; - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Intercepting command for resource event: host " + cmd.getHostId() + " event: " + cmd.getEvent()); - } - boolean result = false; - try { - result = manager.executeResourceUserRequest(cmd.getHostId(), cmd.getEvent()); - if (s_logger.isDebugEnabled()) { - s_logger.debug("Result is " + result); - } - - } catch (AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - return null; - } - - Answer[] answers = new Answer[1]; - answers[0] = new Answer(cmd, result, null); - return gson.toJson(answers); - } - - try { - long startTick = System.currentTimeMillis(); - if(s_logger.isDebugEnabled()) { - s_logger.debug("Send |-> " + agentId + " " + gsonPackage + " to agent manager"); - } - - Answer[] answers = manager.sendToAgent(Long.parseLong(agentId), cmds, - Integer.parseInt(stopOnError) != 0 ? true : false); - - if(answers != null) { - String jsonReturn = gson.toJson(answers); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Completed |-> " + agentId + " " + gsonPackage + - " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn); - } - - return jsonReturn; - } else { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Completed |-> " + agentId + " " + gsonPackage + - " in " + (System.currentTimeMillis() - startTick) + " ms, return null result"); - } - } - } catch(AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - } catch (OperationTimedoutException e) { - s_logger.warn("Timed Out", e); - } - - return null; - } - - private String handleExecuteAsyncMethodCall(HttpRequest req) { - String agentId = (String)req.getParams().getParameter("agentId"); - String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); - String stopOnError = (String)req.getParams().getParameter("stopOnError"); - String callingPeer = (String)req.getParams().getParameter("caller"); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Async " + callingPeer + " |-> " + agentId + " " + gsonPackage); - } - - Command [] cmds = null; - try { - cmds = gson.fromJson(gsonPackage, Command[].class); - } catch(Throwable e) { - assert(false); - s_logger.error("Excection in gson decoding : ", e); - } - - Listener listener = new ClusterAsyncExectuionListener(manager, callingPeer); - long seq = -1; - try { - long startTick = System.currentTimeMillis(); - if(s_logger.isDebugEnabled()) { - s_logger.debug("Send Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " to agent manager"); - } - - seq = manager.sendToAgent(Long.parseLong(agentId), cmds, - Integer.parseInt(stopOnError) != 0 ? true : false, listener); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Complated Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " in " + - + (System.currentTimeMillis() - startTick) + " ms, returned seq: " + seq); - } - } catch (AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - seq = -1; - } - - return gson.toJson(seq); - } - - private String handleAsyncResultMethodCall(HttpRequest req) { - String agentId = (String)req.getParams().getParameter("agentId"); - String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); - String seq = (String)req.getParams().getParameter("seq"); - String executingPeer = (String)req.getParams().getParameter("executingPeer"); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Async callback " + executingPeer + "." + agentId + " |-> " + gsonPackage); - } - - Answer[] answers = null; - try { - answers = gson.fromJson(gsonPackage, Answer[].class); - } catch(Throwable e) { - assert(false); - s_logger.error("Excection in gson decoding : ", e); - } - - long startTick = System.currentTimeMillis(); - if(manager.onAsyncResult(executingPeer, Long.parseLong(agentId), Long.parseLong(seq), answers)) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) + - " ms, return recurring=true, let async listener contine on"); - } - - return "recurring=true"; - } - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) + - " ms, return recurring=false, indicate to tear down async listener"); - } - - return "recurring=false"; - } + ClusterServicePdu pdu = new ClusterServicePdu(); + pdu.setSourcePeer(sourcePeer); + pdu.setDestPeer(destPeer); + pdu.setAgentId(Long.parseLong(agentId)); + pdu.setSequenceId(Long.parseLong(pduSeq)); + pdu.setAckSequenceId(Long.parseLong(pduAckSeq)); + pdu.setJsonPackage(gsonPackage); + pdu.setStopOnError("1".equals(stopOnError)); + pdu.setRequest("1".equals(requestAck)); + + manager.OnReceiveClusterServicePdu(pdu); + return "true"; + } private String handlePingMethodCall(HttpRequest req) { String callingPeer = (String)req.getParams().getParameter("callingPeer"); diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java index ce65fe18e87..a33ec1e1569 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java @@ -51,8 +51,28 @@ public class ClusterServiceServletImpl implements ClusterService { this._requestTimeoutSeconds = requestTimeoutSeconds; _gson = GsonHelper.getGson(); - } - + } + + @Override + public String execute(ClusterServicePdu pdu) throws RemoteException { + + HttpClient client = getHttpClient(); + PostMethod method = new PostMethod(_serviceUrl); + + method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_DELIVER_PDU)); + method.addParameter("sourcePeer", pdu.getSourcePeer()); + method.addParameter("destPeer", pdu.getDestPeer()); + method.addParameter("pduSeq", Long.toString(pdu.getSequenceId())); + method.addParameter("pduAckSeq", Long.toString(pdu.getAckSequenceId())); + method.addParameter("agentId", Long.toString(pdu.getAgentId())); + method.addParameter("gsonPackage", pdu.getJsonPackage()); + method.addParameter("stopOnError", pdu.isStopOnError() ? "1" : "0"); + method.addParameter("requestAck", pdu.isRequest() ? "1" : "0"); + + return executePostMethod(client, method); + } + +/* @Override public String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException { if(s_logger.isDebugEnabled()) { @@ -129,7 +149,7 @@ public class ClusterServiceServletImpl implements ClusterService { } return false; } - +*/ @Override public boolean ping(String callingPeer) throws RemoteException { if(s_logger.isDebugEnabled()) { @@ -187,11 +207,13 @@ public class ClusterServiceServletImpl implements ClusterService { // for test purpose only public static void main(String[] args) { +/* ClusterServiceServletImpl service = new ClusterServiceServletImpl("http://localhost:9090/clusterservice", 300); try { String result = service.execute("test", 1, "{ p1:v1, p2:v2 }", true); System.out.println(result); } catch (RemoteException e) { } +*/ } } diff --git a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java index 6275b8b21e7..999e1a98d58 100755 --- a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java @@ -43,38 +43,29 @@ public class DummyClusterManagerImpl implements ClusterManager { private String _name; private final String _clusterNodeIP = "127.0.0.1"; - + + @Override + public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) { + throw new CloudRuntimeException("Unsupported feature"); + } + @Override public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { throw new CloudRuntimeException("Unsupported feature"); - } - - @Override - public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) { - throw new CloudRuntimeException("Unsupported feature"); - } - - @Override - public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) { - throw new CloudRuntimeException("Unsupported feature"); - } - - @Override - public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) { - throw new CloudRuntimeException("Unsupported feature"); - } - + } + @Override public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { throw new CloudRuntimeException("Unsupported feature"); } - + +/* @Override public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException { throw new CloudRuntimeException("Unsupported feature"); } - +*/ @Override public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException { throw new CloudRuntimeException("Unsupported feature"); diff --git a/server/src/com/cloud/cluster/RemoteMethodConstants.java b/server/src/com/cloud/cluster/RemoteMethodConstants.java index 335785f7fe2..c1ff62f2a23 100644 --- a/server/src/com/cloud/cluster/RemoteMethodConstants.java +++ b/server/src/com/cloud/cluster/RemoteMethodConstants.java @@ -19,9 +19,7 @@ package com.cloud.cluster; public interface RemoteMethodConstants { - public static final int METHOD_UNKNOWN = 0; - public static final int METHOD_EXECUTE = 1; - public static final int METHOD_EXECUTE_ASYNC = 2; - public static final int METHOD_ASYNC_RESULT = 3; + public static final int METHOD_UNKNOWN = 0; public static final int METHOD_PING = 4; + public static final int METHOD_DELIVER_PDU = 5; } diff --git a/server/test/com/cloud/cluster/CheckPointManagerTest.java b/server/test/com/cloud/cluster/CheckPointManagerTest.java index ac2dd041441..132958a2a7a 100755 --- a/server/test/com/cloud/cluster/CheckPointManagerTest.java +++ b/server/test/com/cloud/cluster/CheckPointManagerTest.java @@ -258,36 +258,21 @@ public class CheckPointManagerTest extends TestCase { return _name; } + @Override + public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) { + throw new CloudRuntimeException("Not implemented"); + } + @Override public Answer[] execute(String strPeer, long agentId, Command[] cmds, boolean stopOnError) { throw new UnsupportedOperationException("Not implemented"); } - @Override - public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) { - throw new UnsupportedOperationException("Not implemented"); - } - @Override public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { throw new UnsupportedOperationException("Not implemented"); } - @Override - public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException { - throw new UnsupportedOperationException("Not implemented"); - } - @Override public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException { throw new UnsupportedOperationException("Not implemented"); diff --git a/server/test/com/cloud/network/MockNetworkManagerImpl.java b/server/test/com/cloud/network/MockNetworkManagerImpl.java index d9839062a08..06ef648e42c 100755 --- a/server/test/com/cloud/network/MockNetworkManagerImpl.java +++ b/server/test/com/cloud/network/MockNetworkManagerImpl.java @@ -850,5 +850,4 @@ public class MockNetworkManagerImpl implements NetworkManager, Manager, NetworkS // TODO Auto-generated method stub return null; } - }