bug 12616: advanced startup command for direct connected agent

status 12616: resolved fixed
This commit is contained in:
Edison Su 2012-01-03 18:29:40 -08:00
parent 3750c7055b
commit 4a7c684526
3 changed files with 151 additions and 91 deletions

View File

@ -45,6 +45,7 @@ import com.cloud.agent.api.Command;
import com.cloud.agent.api.CronCommand;
import com.cloud.agent.api.ModifySshKeysCommand;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.ReadyCommand;
import com.cloud.agent.api.ShutdownCommand;
import com.cloud.agent.api.StartupAnswer;
import com.cloud.agent.api.StartupCommand;
@ -476,9 +477,28 @@ public class Agent implements HandlerFactory, IAgentControl {
cancelTasks();
_reconnectAllowed = false;
answer = new Answer(cmd, true, null);
} else if (cmd instanceof ReadyCommand) {
ReadyCommand ready = (ReadyCommand)cmd;
s_logger.debug("Received shutdownCommand, due to: " + ready.getDetails());
if (ready.getDetails() != null) {
cancelTasks();
_reconnectAllowed = false;
answer = new Answer(cmd, true, null);
} else {
_inProgress.incrementAndGet();
try {
answer = _resource.executeRequest(cmd);
} finally {
_inProgress.decrementAndGet();
}
if (answer == null) {
s_logger.debug("Response: unsupported command" + cmd.toString());
answer = Answer.createUnsupportedCommandAnswer(cmd);
}
}
} else if (cmd instanceof AgentControlCommand) {
answer = null;
synchronized (_controlListeners) {
answer = null;
synchronized (_controlListeners) {
for (IAgentControlListener listener : _controlListeners) {
answer = listener.processControlRequest(request, (AgentControlCommand) cmd);
if (answer != null) {

View File

@ -18,7 +18,7 @@
package com.cloud.agent.api;
public class ReadyCommand extends Command {
private String details;
public ReadyCommand() {
super();
}
@ -38,5 +38,13 @@ public class ReadyCommand extends Command {
public boolean executeInSequence() {
return true;
}
public void setDetails(String details) {
this.details = details;
}
public String getDetails() {
return this.details;
}
}

View File

@ -1149,7 +1149,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
}
}
Long dcId = host.getDataCenterId();
ReadyCommand ready = new ReadyCommand(dcId);
Answer answer = easySend(hostId, ready);
@ -1925,26 +1925,102 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
public AgentAttache handleConnect(final Link link,
final StartupCommand[] startup) throws IllegalArgumentException,
ConnectionException {
final StartupCommand[] startup, Request request) {
HostVO server = null;
boolean handled = notifyCreatorsOfConnection(startup);
if (!handled) {
server = createHost(startup, null, null, false, null, null);
StartupAnswer[] answers = new StartupAnswer[startup.length];
AgentAttache attache = null;
try {
boolean handled = notifyCreatorsOfConnection(startup);
if (!handled) {
server = createHost(startup, null, null, false, null, null);
} else {
server = _hostDao.findByGuid(startup[0].getGuid());
}
if (server == null) {
return null;
}
long id = server.getId();
attache = createAttache(id, server, link);
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], attache.getId(), getPingInterval());
break;
}
}
} catch (ConnectionException e) {
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], e.toString());
break;
}
}
} catch (IllegalArgumentException e) {
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], e.toString());
break;
}
}
} catch (CloudRuntimeException e) {
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], e.toString());
break;
}
}
}
Response response = null;
if (attache != null) {
response = new Response(request, answers[0], _nodeId, attache.getId());
} else {
server = _hostDao.findByGuid(startup[0].getGuid());
response = new Response(request, answers[0], _nodeId, -1);
}
if (server == null) {
return null;
try {
link.send(response.toBytes());
} catch (ClosedChannelException e) {
s_logger.debug("Failed to send startupanswer: " + e.toString());
return null;
}
if (attache == null) {
return null;
}
try {
attache = notifyMonitorsOfConnection(attache, startup, false);
return attache;
} catch (ConnectionException e) {
ReadyCommand ready = new ReadyCommand(null);
ready.setDetails(e.toString());
try {
easySend(attache.getId(), ready);
} catch (Exception e1) {
s_logger.debug("Failed to send readycommand, due to " + e.toString());
}
return null;
} catch (CloudRuntimeException e) {
ReadyCommand ready = new ReadyCommand(null);
ready.setDetails(e.toString());
try {
easySend(attache.getId(), ready);
} catch (Exception e1) {
s_logger.debug("Failed to send readycommand, due to " + e.toString());
}
return null;
}
long id = server.getId();
AgentAttache attache = createAttache(id, server, link);
attache = notifyMonitorsOfConnection(attache, startup, false);
return attache;
}
protected AgentAttache createAttache(long id, HostVO server, Link link) {
@ -2245,55 +2321,17 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
s_logger.warn("Throwing away a request because it came through as the first command on a connect: " + request);
return;
}
StartupCommand startup = (StartupCommand) cmd;
// if ((_upgradeMgr.registerForUpgrade(-1, startup.getVersion())
// == UpgradeManager.State.RequiresUpdate) &&
// (_upgradeMgr.getAgentUrl() != null)) {
// final UpgradeCommand upgrade = new
// UpgradeCommand(_upgradeMgr.getAgentUrl());
// final Request req = new Request(1, -1, -1, new Command[] {
// upgrade }, true, true);
// s_logger.info("Agent requires upgrade: " + req.toString());
// try {
// link.send(req.toBytes());
// } catch (ClosedChannelException e) {
// s_logger.warn("Unable to tell agent it should update.");
// }
// return;
// }
try {
StartupCommand[] startups = new StartupCommand[cmds.length];
for (int i = 0; i < cmds.length; i++) {
startups[i] = (StartupCommand) cmds[i];
}
attache = handleConnect(link, startups);
} catch (final IllegalArgumentException e) {
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, 0, new Long(0), "Agent from " + startup.getPrivateIpAddress() + " is unable to connect due to " + e.getMessage(), "Agent from "
+ startup.getPrivateIpAddress() + " is unable to connect with " + request + " because of " + e.getMessage());
s_logger.warn("Unable to create attache for agent: " + request, e);
response = new Response(request, new StartupAnswer((StartupCommand) cmd, e.getMessage()), _nodeId, -1);
} catch (ConnectionException e) {
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, 0, new Long(0), "Agent from " + startup.getPrivateIpAddress() + " is unable to connect due to " + e.getMessage(), "Agent from "
+ startup.getPrivateIpAddress() + " is unable to connect with " + request + " because of " + e.getMessage());
s_logger.warn("Unable to create attache for agent: " + request, e);
response = new Response(request, new StartupAnswer((StartupCommand) cmd, e.getMessage()), _nodeId, -1);
} catch (final CloudRuntimeException e) {
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, 0, new Long(0), "Agent from " + startup.getPrivateIpAddress() + " is unable to connect due to " + e.getMessage(), "Agent from "
+ startup.getPrivateIpAddress() + " is unable to connect with " + request + " because of " + e.getMessage());
s_logger.warn("Unable to create attache for agent: " + request, e);
StartupCommand[] startups = new StartupCommand[cmds.length];
for (int i = 0; i < cmds.length; i++) {
startups[i] = (StartupCommand) cmds[i];
}
attache = handleConnect(link, startups, request);
if (attache == null) {
if (response == null) {
s_logger.warn("Unable to create attache for agent: " + request);
response = new Response(request, new StartupAnswer((StartupCommand) request.getCommand(), "Unable to register this agent"), _nodeId, -1);
}
try {
link.send(response.toBytes(), true);
} catch (final ClosedChannelException e) {
s_logger.warn("Response was not sent: " + response);
}
return;
s_logger.warn("Unable to create attache for agent: " + request);
}
return;
}
final long hostId = attache.getId();
@ -2318,23 +2356,15 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
final Answer[] answers = new Answer[cmds.length];
boolean startupSend = false;
for (int i = 0; i < cmds.length; i++) {
cmd = cmds[i];
Answer answer = null;
try {
if (cmd instanceof StartupRoutingCommand) {
final StartupRoutingCommand startup = (StartupRoutingCommand) cmd;
answer = new StartupAnswer(startup, attache.getId(), getPingInterval());
} else if (cmd instanceof StartupProxyCommand) {
final StartupProxyCommand startup = (StartupProxyCommand) cmd;
answer = new StartupAnswer(startup, attache.getId(), getPingInterval());
} else if (cmd instanceof StartupSecondaryStorageCommand) {
final StartupSecondaryStorageCommand startup = (StartupSecondaryStorageCommand) cmd;
answer = new StartupAnswer(startup, attache.getId(), getPingInterval());
} else if (cmd instanceof StartupStorageCommand) {
final StartupStorageCommand startup = (StartupStorageCommand) cmd;
answer = new StartupAnswer(startup, attache.getId(), getPingInterval());
} else if (cmd instanceof ShutdownCommand) {
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
startupSend = true;
continue;
} else if (cmd instanceof ShutdownCommand) {
final ShutdownCommand shutdown = (ShutdownCommand) cmd;
final String reason = shutdown.getReason();
s_logger.info("Host " + attache.getId() + " has informed us that it is shutting down with reason " + reason + " and detail " + shutdown.getDetail());
@ -2389,18 +2419,20 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
answers[i] = answer;
}
response = new Response(request, answers, _nodeId, attache.getId());
if (s_logger.isDebugEnabled()) {
if (logD) {
s_logger.debug("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response);
} else {
s_logger.trace("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response);
}
}
try {
link.send(response.toBytes());
} catch (final ClosedChannelException e) {
s_logger.warn("Unable to send response because connection is closed: " + response);
if (!startupSend) {
response = new Response(request, answers, _nodeId, attache.getId());
if (s_logger.isDebugEnabled()) {
if (logD) {
s_logger.debug("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response);
} else {
s_logger.trace("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response);
}
}
try {
link.send(response.toBytes());
} catch (final ClosedChannelException e) {
s_logger.warn("Unable to send response because connection is closed: " + response);
}
}
}