From 4a7c684526b84c1735c652acd886c489d753018e Mon Sep 17 00:00:00 2001 From: Edison Su Date: Tue, 3 Jan 2012 18:29:40 -0800 Subject: [PATCH] bug 12616: advanced startup command for direct connected agent status 12616: resolved fixed --- agent/src/com/cloud/agent/Agent.java | 24 +- api/src/com/cloud/agent/api/ReadyCommand.java | 10 +- .../cloud/agent/manager/AgentManagerImpl.java | 208 ++++++++++-------- 3 files changed, 151 insertions(+), 91 deletions(-) diff --git a/agent/src/com/cloud/agent/Agent.java b/agent/src/com/cloud/agent/Agent.java index 42938cd8136..5dac32c2a4f 100755 --- a/agent/src/com/cloud/agent/Agent.java +++ b/agent/src/com/cloud/agent/Agent.java @@ -45,6 +45,7 @@ import com.cloud.agent.api.Command; import com.cloud.agent.api.CronCommand; import com.cloud.agent.api.ModifySshKeysCommand; import com.cloud.agent.api.PingCommand; +import com.cloud.agent.api.ReadyCommand; import com.cloud.agent.api.ShutdownCommand; import com.cloud.agent.api.StartupAnswer; import com.cloud.agent.api.StartupCommand; @@ -476,9 +477,28 @@ public class Agent implements HandlerFactory, IAgentControl { cancelTasks(); _reconnectAllowed = false; answer = new Answer(cmd, true, null); + } else if (cmd instanceof ReadyCommand) { + ReadyCommand ready = (ReadyCommand)cmd; + s_logger.debug("Received shutdownCommand, due to: " + ready.getDetails()); + if (ready.getDetails() != null) { + cancelTasks(); + _reconnectAllowed = false; + answer = new Answer(cmd, true, null); + } else { + _inProgress.incrementAndGet(); + try { + answer = _resource.executeRequest(cmd); + } finally { + _inProgress.decrementAndGet(); + } + if (answer == null) { + s_logger.debug("Response: unsupported command" + cmd.toString()); + answer = Answer.createUnsupportedCommandAnswer(cmd); + } + } } else if (cmd instanceof AgentControlCommand) { - answer = null; - synchronized (_controlListeners) { + answer = null; + synchronized (_controlListeners) { for (IAgentControlListener listener : _controlListeners) { answer = listener.processControlRequest(request, (AgentControlCommand) cmd); if (answer != null) { diff --git a/api/src/com/cloud/agent/api/ReadyCommand.java b/api/src/com/cloud/agent/api/ReadyCommand.java index a554fbed90b..51b4d3bed2a 100644 --- a/api/src/com/cloud/agent/api/ReadyCommand.java +++ b/api/src/com/cloud/agent/api/ReadyCommand.java @@ -18,7 +18,7 @@ package com.cloud.agent.api; public class ReadyCommand extends Command { - + private String details; public ReadyCommand() { super(); } @@ -38,5 +38,13 @@ public class ReadyCommand extends Command { public boolean executeInSequence() { return true; } + + public void setDetails(String details) { + this.details = details; + } + + public String getDetails() { + return this.details; + } } diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java index 788f51a831f..8d368ee5fd2 100755 --- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -1149,7 +1149,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } } } - + Long dcId = host.getDataCenterId(); ReadyCommand ready = new ReadyCommand(dcId); Answer answer = easySend(hostId, ready); @@ -1925,26 +1925,102 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } public AgentAttache handleConnect(final Link link, - final StartupCommand[] startup) throws IllegalArgumentException, - ConnectionException { + final StartupCommand[] startup, Request request) { HostVO server = null; - boolean handled = notifyCreatorsOfConnection(startup); - if (!handled) { - server = createHost(startup, null, null, false, null, null); + StartupAnswer[] answers = new StartupAnswer[startup.length]; + AgentAttache attache = null; + try { + boolean handled = notifyCreatorsOfConnection(startup); + if (!handled) { + server = createHost(startup, null, null, false, null, null); + } else { + server = _hostDao.findByGuid(startup[0].getGuid()); + } + + if (server == null) { + return null; + } + long id = server.getId(); + + attache = createAttache(id, server, link); + + Command cmd; + for (int i = 0; i < startup.length; i++) { + cmd = startup[i]; + if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) { + answers[i] = new StartupAnswer(startup[i], attache.getId(), getPingInterval()); + break; + } + } + } catch (ConnectionException e) { + Command cmd; + for (int i = 0; i < startup.length; i++) { + cmd = startup[i]; + if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) { + answers[i] = new StartupAnswer(startup[i], e.toString()); + break; + } + } + } catch (IllegalArgumentException e) { + Command cmd; + for (int i = 0; i < startup.length; i++) { + cmd = startup[i]; + if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) { + answers[i] = new StartupAnswer(startup[i], e.toString()); + break; + } + } + } catch (CloudRuntimeException e) { + Command cmd; + for (int i = 0; i < startup.length; i++) { + cmd = startup[i]; + if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) { + answers[i] = new StartupAnswer(startup[i], e.toString()); + break; + } + } + } + + Response response = null; + if (attache != null) { + response = new Response(request, answers[0], _nodeId, attache.getId()); } else { - server = _hostDao.findByGuid(startup[0].getGuid()); + response = new Response(request, answers[0], _nodeId, -1); } - - if (server == null) { - return null; + + try { + link.send(response.toBytes()); + } catch (ClosedChannelException e) { + s_logger.debug("Failed to send startupanswer: " + e.toString()); + return null; + } + + if (attache == null) { + return null; + } + + try { + attache = notifyMonitorsOfConnection(attache, startup, false); + return attache; + } catch (ConnectionException e) { + ReadyCommand ready = new ReadyCommand(null); + ready.setDetails(e.toString()); + try { + easySend(attache.getId(), ready); + } catch (Exception e1) { + s_logger.debug("Failed to send readycommand, due to " + e.toString()); + } + return null; + } catch (CloudRuntimeException e) { + ReadyCommand ready = new ReadyCommand(null); + ready.setDetails(e.toString()); + try { + easySend(attache.getId(), ready); + } catch (Exception e1) { + s_logger.debug("Failed to send readycommand, due to " + e.toString()); + } + return null; } - long id = server.getId(); - - AgentAttache attache = createAttache(id, server, link); - - attache = notifyMonitorsOfConnection(attache, startup, false); - - return attache; } protected AgentAttache createAttache(long id, HostVO server, Link link) { @@ -2245,55 +2321,17 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { s_logger.warn("Throwing away a request because it came through as the first command on a connect: " + request); return; } - StartupCommand startup = (StartupCommand) cmd; - // if ((_upgradeMgr.registerForUpgrade(-1, startup.getVersion()) - // == UpgradeManager.State.RequiresUpdate) && - // (_upgradeMgr.getAgentUrl() != null)) { - // final UpgradeCommand upgrade = new - // UpgradeCommand(_upgradeMgr.getAgentUrl()); - // final Request req = new Request(1, -1, -1, new Command[] { - // upgrade }, true, true); - // s_logger.info("Agent requires upgrade: " + req.toString()); - // try { - // link.send(req.toBytes()); - // } catch (ClosedChannelException e) { - // s_logger.warn("Unable to tell agent it should update."); - // } - // return; - // } - try { - StartupCommand[] startups = new StartupCommand[cmds.length]; - for (int i = 0; i < cmds.length; i++) { - startups[i] = (StartupCommand) cmds[i]; - } - attache = handleConnect(link, startups); - } catch (final IllegalArgumentException e) { - _alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, 0, new Long(0), "Agent from " + startup.getPrivateIpAddress() + " is unable to connect due to " + e.getMessage(), "Agent from " - + startup.getPrivateIpAddress() + " is unable to connect with " + request + " because of " + e.getMessage()); - s_logger.warn("Unable to create attache for agent: " + request, e); - response = new Response(request, new StartupAnswer((StartupCommand) cmd, e.getMessage()), _nodeId, -1); - } catch (ConnectionException e) { - _alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, 0, new Long(0), "Agent from " + startup.getPrivateIpAddress() + " is unable to connect due to " + e.getMessage(), "Agent from " - + startup.getPrivateIpAddress() + " is unable to connect with " + request + " because of " + e.getMessage()); - s_logger.warn("Unable to create attache for agent: " + request, e); - response = new Response(request, new StartupAnswer((StartupCommand) cmd, e.getMessage()), _nodeId, -1); - } catch (final CloudRuntimeException e) { - _alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, 0, new Long(0), "Agent from " + startup.getPrivateIpAddress() + " is unable to connect due to " + e.getMessage(), "Agent from " - + startup.getPrivateIpAddress() + " is unable to connect with " + request + " because of " + e.getMessage()); - s_logger.warn("Unable to create attache for agent: " + request, e); + + StartupCommand[] startups = new StartupCommand[cmds.length]; + for (int i = 0; i < cmds.length; i++) { + startups[i] = (StartupCommand) cmds[i]; } + attache = handleConnect(link, startups, request); + if (attache == null) { - if (response == null) { - s_logger.warn("Unable to create attache for agent: " + request); - response = new Response(request, new StartupAnswer((StartupCommand) request.getCommand(), "Unable to register this agent"), _nodeId, -1); - } - try { - link.send(response.toBytes(), true); - } catch (final ClosedChannelException e) { - s_logger.warn("Response was not sent: " + response); - } - return; + s_logger.warn("Unable to create attache for agent: " + request); } + return; } final long hostId = attache.getId(); @@ -2318,23 +2356,15 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } final Answer[] answers = new Answer[cmds.length]; + boolean startupSend = false; for (int i = 0; i < cmds.length; i++) { cmd = cmds[i]; Answer answer = null; try { - if (cmd instanceof StartupRoutingCommand) { - final StartupRoutingCommand startup = (StartupRoutingCommand) cmd; - answer = new StartupAnswer(startup, attache.getId(), getPingInterval()); - } else if (cmd instanceof StartupProxyCommand) { - final StartupProxyCommand startup = (StartupProxyCommand) cmd; - answer = new StartupAnswer(startup, attache.getId(), getPingInterval()); - } else if (cmd instanceof StartupSecondaryStorageCommand) { - final StartupSecondaryStorageCommand startup = (StartupSecondaryStorageCommand) cmd; - answer = new StartupAnswer(startup, attache.getId(), getPingInterval()); - } else if (cmd instanceof StartupStorageCommand) { - final StartupStorageCommand startup = (StartupStorageCommand) cmd; - answer = new StartupAnswer(startup, attache.getId(), getPingInterval()); - } else if (cmd instanceof ShutdownCommand) { + if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) { + startupSend = true; + continue; + } else if (cmd instanceof ShutdownCommand) { final ShutdownCommand shutdown = (ShutdownCommand) cmd; final String reason = shutdown.getReason(); s_logger.info("Host " + attache.getId() + " has informed us that it is shutting down with reason " + reason + " and detail " + shutdown.getDetail()); @@ -2389,18 +2419,20 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { answers[i] = answer; } - response = new Response(request, answers, _nodeId, attache.getId()); - if (s_logger.isDebugEnabled()) { - if (logD) { - s_logger.debug("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response); - } else { - s_logger.trace("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response); - } - } - try { - link.send(response.toBytes()); - } catch (final ClosedChannelException e) { - s_logger.warn("Unable to send response because connection is closed: " + response); + if (!startupSend) { + response = new Response(request, answers, _nodeId, attache.getId()); + if (s_logger.isDebugEnabled()) { + if (logD) { + s_logger.debug("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response); + } else { + s_logger.trace("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response); + } + } + try { + link.send(response.toBytes()); + } catch (final ClosedChannelException e) { + s_logger.warn("Unable to send response because connection is closed: " + response); + } } }