diff --git a/core/test/com/cloud/agent/transport/RequestTest.java b/core/test/com/cloud/agent/transport/RequestTest.java index 683b2b506be..173d749bcc7 100644 --- a/core/test/com/cloud/agent/transport/RequestTest.java +++ b/core/test/com/cloud/agent/transport/RequestTest.java @@ -53,7 +53,7 @@ public class RequestTest extends TestCase { cmd2.addPortConfig("abc", "24", true, "eth0"); cmd2.addPortConfig("127.0.0.1", "44", false, "eth1"); Request sreq = new Request(2, 3, new Command[] { cmd1, cmd2, cmd3 }, true, true); - sreq.setSequence(1); + sreq.setSequence(892403717); Logger logger = Logger.getLogger(GsonHelper.class); Level level = logger.getLevel(); @@ -82,7 +82,7 @@ public class RequestTest extends TestCase { byte[] bytes = sreq.getBytes(); - assert Request.getSequence(bytes) == 1; + assert Request.getSequence(bytes) == 892403717; assert Request.getManagementServerId(bytes) == 3; assert Request.getAgentId(bytes) == 2; assert Request.getViaAgentId(bytes) == 2; diff --git a/server/src/com/cloud/agent/manager/AgentAttache.java b/server/src/com/cloud/agent/manager/AgentAttache.java index 470acb2c319..e8d1c77852e 100644 --- a/server/src/com/cloud/agent/manager/AgentAttache.java +++ b/server/src/com/cloud/agent/manager/AgentAttache.java @@ -329,9 +329,8 @@ public abstract class AgentAttache { public void send(Request req, final Listener listener) throws AgentUnavailableException { checkAvailability(req.getCommands()); - long seq = getNextSequence(); - req.setSequence(seq); - + long seq = req.getSequence(); + if (listener != null) { registerListener(seq, listener); } else if (s_logger.isDebugEnabled()) { @@ -376,9 +375,8 @@ public abstract class AgentAttache { public Answer[] send(Request req, int wait) throws AgentUnavailableException, OperationTimedoutException { SynchronousListener sl = new SynchronousListener(null); - long seq = getNextSequence(); - req.setSequence(seq); + long seq = req.getSequence(); send(req, sl); try { diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java index e7e68a86b5b..958c1fd4745 100755 --- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -804,6 +804,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true); + req.setSequence(agent.getNextSequence()); Answer[] answers = agent.send(req, timeout); notifyAnswersToMonitors(hostId, req.getSequence(), answers); commands.setAnswers(answers); @@ -818,6 +819,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { try { Request req = new Request(hostId, _nodeId, new CheckHealthCommand(), true); + req.setSequence(agent.getNextSequence()); Answer[] answers = agent.send(req, 50 * 1000); if (answers != null && answers[0] != null) { Status status = answers[0].getResult() ? Status.Up : Status.Down; @@ -863,6 +865,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { return -1; } Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true); + req.setSequence(agent.getNextSequence()); agent.send(req, listener); return req.getSequence(); } diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index faabc788040..67f744a2250 100644 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -85,7 +85,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds public long _loadSize = 100; protected Set _agentToTransferIds = new HashSet(); - private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list + private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list @Inject protected ClusterManager _clusterMgr = null; @@ -596,7 +596,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust final byte[] data = task.getData(); Version ver = Request.getVersion(data); - if (ver.ordinal() != Version.v1.ordinal()) { + if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) { s_logger.warn("Wrong version for clustered agent request"); super.doTask(task); return; @@ -718,7 +718,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return setToWaitForRebalance(agentId); } else if (event == Event.StartAgentRebalance) { return rebalanceHost(agentId); - } + } return true; } @@ -907,7 +907,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } if (result) { s_logger.debug("Got host id=" + hostId + " from management server " + map.getFutureOwner()); - } + } } else { s_logger.warn("Unable to find agent " + hostId + " on management server " + _nodeId); @@ -935,10 +935,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (map.getInitialOwner() != _nodeId) { s_logger.warn("Why finish rebalance called not by initial host owner???"); return false; - } + } boolean success = (event == Event.RebalanceCompleted) ? true : false; - if (s_logger.isDebugEnabled()) { + if (s_logger.isDebugEnabled()) { s_logger.debug("Finishing rebalancing for the agent " + hostId + " with result " + success); }