From ab49c70add364ced20eb899bc922508c15ed8239 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Mon, 16 May 2011 09:28:51 -0700 Subject: [PATCH] Added some annotations for logging commands --- agent/src/com/cloud/agent/Agent.java | 544 +++++++++--------- api/src/com/cloud/agent/api/Command.java | 43 +- .../agent/api/SecurityIngressRuleAnswer.java | 6 - .../com/cloud/agent/transport/Request.java | 258 ++++----- .../cloud/agent/manager/AgentManagerImpl.java | 6 +- 5 files changed, 413 insertions(+), 444 deletions(-) diff --git a/agent/src/com/cloud/agent/Agent.java b/agent/src/com/cloud/agent/Agent.java index 26267c5e1f7..38c942bdd7e 100755 --- a/agent/src/com/cloud/agent/Agent.java +++ b/agent/src/com/cloud/agent/Agent.java @@ -23,7 +23,6 @@ import java.io.StringWriter; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.channels.ClosedChannelException; -import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -64,20 +63,20 @@ import com.cloud.utils.script.Script; /** * @config - * {@table - * || Param Name | Description | Values | Default || - * || type | Type of server | Storage / Computing / Routing | No Default || - * || workers | # of workers to process the requests | int | 1 || - * || host | host to connect to | ip address | localhost || - * || port | port to connect to | port number | 8250 || - * || instance | Used to allow multiple agents running on the same host | String | none || - * } - * - * For more configuration options, see the individual types. - * + * {@table + * || Param Name | Description | Values | Default || + * || type | Type of server | Storage / Computing / Routing | No Default || + * || workers | # of workers to process the requests | int | 1 || + * || host | host to connect to | ip address | localhost || + * || port | port to connect to | port number | 8250 || + * || instance | Used to allow multiple agents running on the same host | String | none || * } + * + * For more configuration options, see the individual types. + * **/ public class Agent implements HandlerFactory, IAgentControl { - private static final Logger s_logger = Logger.getLogger(Agent.class.getName()); + private static final Logger s_logger = Logger.getLogger(Agent.class.getName()); + public enum ExitStatus { Normal(0), // Normal status = 0. Upgrade(65), // Exiting for upgrade. @@ -85,6 +84,7 @@ public class Agent implements HandlerFactory, IAgentControl { Error(67); // Exiting because of error. int value; + ExitStatus(final int value) { this.value = value; } @@ -94,78 +94,66 @@ public class Agent implements HandlerFactory, IAgentControl { } } - List _controlListeners = new ArrayList(); - - IAgentShell _shell; - NioConnection _connection; - ServerResource _resource; - Link _link; - Long _id; - - Timer _timer = new Timer("Agent Timer"); - - List _watchList = new ArrayList(); - long _sequence = 0; - long _lastPingResponseTime = 0; - long _pingInterval = 0; - AtomicInteger _inProgress = new AtomicInteger(); - - StartupTask _startup = null; - boolean _reconnectAllowed = true; + List _controlListeners = new ArrayList(); + + IAgentShell _shell; + NioConnection _connection; + ServerResource _resource; + Link _link; + Long _id; + + Timer _timer = new Timer("Agent Timer"); + + List _watchList = new ArrayList(); + long _sequence = 0; + long _lastPingResponseTime = 0; + long _pingInterval = 0; + AtomicInteger _inProgress = new AtomicInteger(); + + StartupTask _startup = null; + boolean _reconnectAllowed = true; // for simulator use only public Agent(IAgentShell shell) { - _shell = shell; + _shell = shell; _link = null; - - _connection = new NioClient( - "Agent", - _shell.getHost(), - _shell.getPort(), - _shell.getWorkers(), - this); + + _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this); Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); } - + public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException { - _shell = shell; - _resource = resource; + _shell = shell; + _resource = resource; _link = null; - + resource.setAgentControl(this); - + String value = _shell.getPersistentProperty(getResourceName(), "id"); _id = value != null ? Long.parseLong(value) : null; s_logger.info("id is " + ((_id != null) ? _id : "")); - + final Map params = PropertiesUtil.toMap(_shell.getProperties()); - + // merge with properties from command line to let resource access command line parameters - for(Map.Entry cmdLineProp : _shell.getCmdLineProperties().entrySet()) { - params.put(cmdLineProp.getKey(), cmdLineProp.getValue()); + for (Map.Entry cmdLineProp : _shell.getCmdLineProperties().entrySet()) { + params.put(cmdLineProp.getKey(), cmdLineProp.getValue()); } - + if (!_resource.configure(getResourceName(), params)) { throw new ConfigurationException("Unable to configure " + _resource.getName()); } - - _connection = new NioClient( - "Agent", - _shell.getHost(), - _shell.getPort(), - _shell.getWorkers(), - this); + + _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this); // ((NioClient)_connection).setBindAddress(_shell.getPrivateIp()); s_logger.debug("Adding shutdown hook"); Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); - s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() - + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() - + " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() - + " : port = " + _shell.getPort()); + s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() + " : workers = " + + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort()); } public String getVersion() { @@ -173,8 +161,8 @@ public class Agent implements HandlerFactory, IAgentControl { } public String getResourceGuid() { - String guid = _shell.getGuid(); - return guid + "-" + getResourceName(); + String guid = _shell.getGuid(); + return guid + "-" + getResourceName(); } public String getZone() { @@ -196,29 +184,29 @@ public class Agent implements HandlerFactory, IAgentControl { public BackoffAlgorithm getBackoffAlgorithm() { return _shell.getBackoffAlgorithm(); } - + public String getResourceName() { - return _resource.getClass().getSimpleName(); + return _resource.getClass().getSimpleName(); } public void upgradeAgent(final String url, boolean protocol) { // shell needs to take care of synchronization when multiple-instances demand upgrade // at the same time _shell.upgradeAgent(url); - + // To stop agent after it has been upgraded, as shell executor may prematurely time out // tasks if agent is in shutting down process if (protocol) { - if (_connection != null) { - _connection.stop(); - _connection = null; - } - if (_resource != null) { - _resource.stop(); - _resource = null; - } + if (_connection != null) { + _connection.stop(); + _connection = null; + } + if (_resource != null) { + _resource.stop(); + _resource = null; + } } else { - stop(ShutdownCommand.Update, null); + stop(ShutdownCommand.Update, null); } } @@ -236,19 +224,19 @@ public class Agent implements HandlerFactory, IAgentControl { final ShutdownCommand cmd = new ShutdownCommand(reason, detail); try { if (_link != null) { - Request req = new Request(0, (_id != null? _id : -1), -1, cmd, false); + Request req = new Request(0, (_id != null ? _id : -1), -1, cmd, false); _link.send(req.toBytes()); } } catch (final ClosedChannelException e) { s_logger.warn("Unable to send: " + cmd.toString()); - } catch(Exception e) { + } catch (Exception e) { s_logger.warn("Unable to send: " + cmd.toString() + " due to exception: ", e); } s_logger.debug("Sending shutdown to management server"); try { - Thread.sleep(1000); + Thread.sleep(1000); } catch (final InterruptedException e) { - s_logger.debug("Who the heck interrupted me here?"); + s_logger.debug("Who the heck interrupted me here?"); } _connection.stop(); _connection = null; @@ -265,11 +253,11 @@ public class Agent implements HandlerFactory, IAgentControl { } public void setId(final Long id) { - s_logger.info("Set agent id " + id); + s_logger.info("Set agent id " + id); _id = id; _shell.setPersistentProperty(getResourceName(), "id", Long.toString(id)); } - + public void scheduleWatch(final Link link, final Request request, final long delay, final long period) { synchronized (_watchList) { if (s_logger.isDebugEnabled()) { @@ -280,9 +268,9 @@ public class Agent implements HandlerFactory, IAgentControl { _watchList.add(task); } } - + protected void cancelTasks() { - synchronized(_watchList) { + synchronized (_watchList) { for (final WatchTask task : _watchList) { task.cancel(); } @@ -292,23 +280,23 @@ public class Agent implements HandlerFactory, IAgentControl { _watchList.clear(); } } - + public void sendStartup(Link link) { final StartupCommand[] startup = _resource.initialize(); final Command[] commands = new Command[startup.length]; - for (int i=0; i< startup.length; i++){ - setupStartupCommand(startup[i]); - commands[i] = startup[i]; + for (int i = 0; i < startup.length; i++) { + setupStartupCommand(startup[i]); + commands[i] = startup[i]; } - - final Request request = new Request(getNextSequence(), _id != null ? _id : -1, -1, commands, false, false, false); + + final Request request = new Request(getNextSequence(), _id != null ? _id : -1, -1, commands, false, false); if (s_logger.isDebugEnabled()) { s_logger.debug("Sending Startup: " + request.toString()); } - synchronized(this) { - _startup = new StartupTask(link); - _timer.schedule(_startup, 180000); + synchronized (this) { + _startup = new StartupTask(link); + _timer.schedule(_startup, 180000); } try { link.send(request.toBytes()); @@ -316,46 +304,47 @@ public class Agent implements HandlerFactory, IAgentControl { s_logger.warn("Unable to send reques: " + request.toString()); } } - + protected void setupStartupCommand(StartupCommand startup) { InetAddress addr; try { addr = InetAddress.getLocalHost(); } catch (final UnknownHostException e) { s_logger.warn("unknow host? ", e); - //ignore + // ignore return; } - + final Script command = new Script("hostname", 500, s_logger); final OutputInterpreter.OneLineParser parser = new OutputInterpreter.OneLineParser(); final String result = command.execute(parser); final String hostname = result == null ? parser.getLine() : addr.toString(); - + startup.setId(getId()); - if (startup.getName() == null) - startup.setName(hostname); + if (startup.getName() == null) { + startup.setName(hostname); + } startup.setDataCenter(getZone()); startup.setPod(getPod()); startup.setGuid(getResourceGuid()); startup.setResourceName(getResourceName()); startup.setVersion(getVersion()); } - + @Override public Task create(Task.Type type, Link link, byte[] data) { return new ServerHandler(type, link, data); } - + protected void reconnect(final Link link) { if (!_reconnectAllowed) { return; } - synchronized(this) { - if (_startup != null) { - _startup.cancel(); - _startup= null; - } + synchronized (this) { + if (_startup != null) { + _startup.cancel(); + _startup = null; + } } link.close(); @@ -363,67 +352,62 @@ public class Agent implements HandlerFactory, IAgentControl { setLink(null); cancelTasks(); - + _resource.disconnected(); int inProgress = 0; do { _shell.getBackoffAlgorithm().waitBeforeRetry(); - + s_logger.info("Lost connection to the server. Dealing with the remaining commands..."); inProgress = _inProgress.get(); if (inProgress > 0) { - s_logger.info("Cannot connect because we still have " + inProgress + " commands in progress."); + s_logger.info("Cannot connect because we still have " + inProgress + " commands in progress."); } } while (inProgress > 0); _connection.stop(); - while (_connection.isStartup()){ + while (_connection.isStartup()) { _shell.getBackoffAlgorithm().waitBeforeRetry(); } try { _connection.cleanUp(); } catch (IOException e) { - s_logger.warn("Fail to clean up old connection. " + e); + s_logger.warn("Fail to clean up old connection. " + e); } - _connection = new NioClient( - "Agent", - _shell.getHost(), - _shell.getPort(), - _shell.getWorkers(), - this); + _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this); do { - s_logger.info("Reconnecting..."); - _connection.start(); + s_logger.info("Reconnecting..."); + _connection.start(); _shell.getBackoffAlgorithm().waitBeforeRetry(); } while (!_connection.isStartup()); } - + public void processStartupAnswer(Answer answer, Response response, Link link) { - boolean cancelled = false; - synchronized(this) { + boolean cancelled = false; + synchronized (this) { if (_startup != null) { _startup.cancel(); _startup = null; } else { - cancelled = true; + cancelled = true; } } - - final StartupAnswer startup = (StartupAnswer)answer; + + final StartupAnswer startup = (StartupAnswer) answer; if (!startup.getResult()) { s_logger.error("Not allowed to connect to the server: " + answer.getDetails()); System.exit(1); } if (cancelled) { s_logger.warn("Threw away a startup answer because we're reconnecting."); - return; + return; } - + s_logger.info("Proccess agent startup answer, agent id = " + startup.getHostId()); - + setId(startup.getHostId()); _pingInterval = startup.getPingInterval() * 1000; // change to ms. @@ -431,73 +415,68 @@ public class Agent implements HandlerFactory, IAgentControl { scheduleWatch(link, response, _pingInterval, _pingInterval); s_logger.info("Startup Response Received: agent id = " + getId()); } - + protected void processRequest(final Request request, final Link link) { - boolean requestLogged = false; + boolean requestLogged = false; Response response = null; try { final Command[] cmds = request.getCommands(); final Answer[] answers = new Answer[cmds.length]; - for (int i = 0; i < cmds.length; i++) - { + for (int i = 0; i < cmds.length; i++) { final Command cmd = cmds[i]; Answer answer; - try - { - if (s_logger.isDebugEnabled()) - { - //this is a hack to make sure we do NOT log the ssh keys - if((cmd instanceof ModifySshKeysCommand)) - { - s_logger.debug("Received the request for command: ModifySshKeysCommand"); - } - else - { - if(!requestLogged) //ensures request is logged only once per method call - { - s_logger.debug("Request:" + request.toString()); - requestLogged = true; - } - } - + try { + if (s_logger.isDebugEnabled()) { + // this is a hack to make sure we do NOT log the ssh keys + if ((cmd instanceof ModifySshKeysCommand)) { + s_logger.debug("Received the request for command: ModifySshKeysCommand"); + } else { + if (!requestLogged) // ensures request is logged only once per method call + { + s_logger.debug("Request:" + request.toString()); + requestLogged = true; + } + } + s_logger.debug("Processing command: " + cmd.toString()); } if (cmd instanceof CronCommand) { - final CronCommand watch = (CronCommand)cmd; + final CronCommand watch = (CronCommand) cmd; scheduleWatch(link, request, watch.getInterval() * 1000, watch.getInterval() * 1000); answer = new Answer(cmd, true, null); } else if (cmd instanceof UpgradeCommand) { - final UpgradeCommand upgrade = (UpgradeCommand)cmd; + final UpgradeCommand upgrade = (UpgradeCommand) cmd; answer = upgradeAgent(upgrade.getUpgradeUrl(), upgrade); } else if (cmd instanceof ShutdownCommand) { - ShutdownCommand shutdown = (ShutdownCommand)cmd; + ShutdownCommand shutdown = (ShutdownCommand) cmd; s_logger.debug("Received shutdownCommand, due to: " + shutdown.getReason()); cancelTasks(); _reconnectAllowed = false; answer = new Answer(cmd, true, null); - } else if(cmd instanceof AgentControlCommand) { - answer = null; - synchronized(_controlListeners) { - for(IAgentControlListener listener: _controlListeners) { - answer = listener.processControlRequest(request, (AgentControlCommand)cmd); - if(answer != null) - break; - } - } - - if(answer == null) { - s_logger.warn("No handler found to process cmd: " + cmd.toString()); - answer = new AgentControlAnswer(cmd); - } - + } else if (cmd instanceof AgentControlCommand) { + answer = null; + synchronized (_controlListeners) { + for (IAgentControlListener listener : _controlListeners) { + answer = listener.processControlRequest(request, (AgentControlCommand) cmd); + if (answer != null) { + break; + } + } + } + + if (answer == null) { + s_logger.warn("No handler found to process cmd: " + cmd.toString()); + answer = new AgentControlAnswer(cmd); + } + } else { _inProgress.incrementAndGet(); try { - answer = _resource.executeRequest(cmd); + answer = _resource.executeRequest(cmd); } finally { - _inProgress.decrementAndGet(); + _inProgress.decrementAndGet(); } if (answer == null) { s_logger.debug("Response: unsupported command" + cmd.toString()); @@ -510,7 +489,7 @@ public class Agent implements HandlerFactory, IAgentControl { th.printStackTrace(new PrintWriter(writer)); answer = new Answer(cmd, false, writer.toString()); } - + answers[i] = answer; if (!answer.getResult() && request.stopOnError()) { for (i++; i < cmds.length; i++) { @@ -534,26 +513,26 @@ public class Agent implements HandlerFactory, IAgentControl { } } } - + public void processResponse(final Response response, final Link link) { final Answer answer = response.getAnswer(); if (s_logger.isDebugEnabled()) { s_logger.debug("Received response: " + response.toString()); } if (answer instanceof StartupAnswer) { - processStartupAnswer(answer, response, link); - } else if(answer instanceof AgentControlAnswer) { - // Notice, we are doing callback while holding a lock! - synchronized(_controlListeners) { - for(IAgentControlListener listener : _controlListeners) { - listener.processControlResponse(response, (AgentControlAnswer)answer); - } - } + processStartupAnswer(answer, response, link); + } else if (answer instanceof AgentControlAnswer) { + // Notice, we are doing callback while holding a lock! + synchronized (_controlListeners) { + for (IAgentControlListener listener : _controlListeners) { + listener.processControlResponse(response, (AgentControlAnswer) answer); + } + } } else { setLastPingResponseTime(); } } - + public void processOtherTask(Task task) { final Object obj = task.get(); if (obj instanceof Response) { @@ -575,15 +554,15 @@ public class Agent implements HandlerFactory, IAgentControl { } catch (final ClosedChannelException e) { s_logger.warn("Unable to send request: " + request.toString()); } - } else if (obj instanceof Request){ - final Request req = (Request)obj; + } else if (obj instanceof Request) { + final Request req = (Request) obj; final Command command = req.getCommand(); Answer answer = null; _inProgress.incrementAndGet(); try { - answer = _resource.executeRequest(command); + answer = _resource.executeRequest(command); } finally { - _inProgress.decrementAndGet(); + _inProgress.decrementAndGet(); } if (answer != null) { final Response response = new Response(req, answer); @@ -601,12 +580,12 @@ public class Agent implements HandlerFactory, IAgentControl { s_logger.warn("Ignoring an unknown task"); } } - + protected UpgradeAnswer upgradeAgent(final String url, final UpgradeCommand cmd) { try { upgradeAgent(url, cmd == null); return null; - } catch(final Exception e) { + } catch (final Exception e) { s_logger.error("Unable to run this agent because we couldn't complete the upgrade process.", e); if (cmd != null) { final StringWriter writer = new StringWriter(); @@ -624,112 +603,113 @@ public class Agent implements HandlerFactory, IAgentControl { public synchronized void setLastPingResponseTime() { _lastPingResponseTime = System.currentTimeMillis(); } - + protected synchronized long getNextSequence() { return _sequence++; } - + @Override - public void registerControlListener(IAgentControlListener listener) { - synchronized(_controlListeners) { - _controlListeners.add(listener); - } - } - - @Override - public void unregisterControlListener(IAgentControlListener listener) { - synchronized(_controlListeners) { - _controlListeners.remove(listener); - } - } - - @Override - public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException { - Request request = new Request(this.getNextSequence(), this.getId(), - -1, new Command[] {cmd}, true, false, false); - AgentControlListener listener = new AgentControlListener(request); - - registerControlListener(listener); - try { - postRequest(request); - synchronized(listener) { - try { - listener.wait(timeoutInMilliseconds); - } catch (InterruptedException e) { - s_logger.warn("sendRequest is interrupted, exit waiting"); - } - } - - return listener.getAnswer(); - } finally { - unregisterControlListener(listener); + public void registerControlListener(IAgentControlListener listener) { + synchronized (_controlListeners) { + _controlListeners.add(listener); } } - + @Override - public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException { - Request request = new Request(this.getNextSequence(), this.getId(), - -1, new Command[] {cmd}, true, false, false); + public void unregisterControlListener(IAgentControlListener listener) { + synchronized (_controlListeners) { + _controlListeners.remove(listener); + } + } + + @Override + public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException { + Request request = new Request(this.getNextSequence(), this.getId(), -1, new Command[] { cmd }, true, false); + AgentControlListener listener = new AgentControlListener(request); + + registerControlListener(listener); + try { + postRequest(request); + synchronized (listener) { + try { + listener.wait(timeoutInMilliseconds); + } catch (InterruptedException e) { + s_logger.warn("sendRequest is interrupted, exit waiting"); + } + } + + return listener.getAnswer(); + } finally { + unregisterControlListener(listener); + } + } + + @Override + public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException { + Request request = new Request(this.getNextSequence(), this.getId(), -1, new Command[] { cmd }, true, false); postRequest(request); } - + private void postRequest(Request request) throws AgentControlChannelException { - if(_link != null) { - try { - _link.send(request.toBytes()); - } catch (final ClosedChannelException e) { - s_logger.warn("Unable to post agent control reques: " + request.toString()); - throw new AgentControlChannelException("Unable to post agent control request due to " + e.getMessage()); - } + if (_link != null) { + try { + _link.send(request.toBytes()); + } catch (final ClosedChannelException e) { + s_logger.warn("Unable to post agent control reques: " + request.toString()); + throw new AgentControlChannelException("Unable to post agent control request due to " + e.getMessage()); + } } else { throw new AgentControlChannelException("Unable to post agent control request as link is not available"); } } - + public class AgentControlListener implements IAgentControlListener { - private AgentControlAnswer _answer; - private final Request _request; - - public AgentControlListener(Request request) { - _request = request; - } - - public AgentControlAnswer getAnswer() { - return _answer; - } - - @Override + private AgentControlAnswer _answer; + private final Request _request; + + public AgentControlListener(Request request) { + _request = request; + } + + public AgentControlAnswer getAnswer() { + return _answer; + } + + @Override public Answer processControlRequest(Request request, AgentControlCommand cmd) { - return null; - } - - @Override + return null; + } + + @Override public void processControlResponse(Response response, AgentControlAnswer answer) { - if(_request.getSequence() == response.getSequence()) { - _answer = answer; - synchronized(this) { - notifyAll(); - } - } - } + if (_request.getSequence() == response.getSequence()) { + _answer = answer; + synchronized (this) { + notifyAll(); + } + } + } } - + protected class ShutdownThread extends Thread { Agent _agent; + public ShutdownThread(final Agent agent) { - super("AgentShutdownThread"); + super("AgentShutdownThread"); _agent = agent; } + @Override public void run() { _agent.stop(ShutdownCommand.Requested, null); } } - + public class WatchTask extends TimerTask { protected Request _request; - protected Agent _agent; - protected Link _link; + protected Agent _agent; + protected Link _link; + public WatchTask(final Link link, final Request request, final Agent agent) { super(); _request = request; @@ -749,45 +729,45 @@ public class Agent implements HandlerFactory, IAgentControl { } } } - + public class StartupTask extends TimerTask { - protected Link _link; + protected Link _link; protected volatile boolean cancelled = false; public StartupTask(final Link link) { - s_logger.debug("Startup task created"); + s_logger.debug("Startup task created"); _link = link; } @Override public synchronized boolean cancel() { - // TimerTask.cancel may fail depends on the calling context - if (!cancelled) { - cancelled = true; - s_logger.debug("Startup task cancelled"); - return super.cancel(); - } - return true; + // TimerTask.cancel may fail depends on the calling context + if (!cancelled) { + cancelled = true; + s_logger.debug("Startup task cancelled"); + return super.cancel(); + } + return true; } @Override public synchronized void run() { - if(!cancelled) { - if(s_logger.isInfoEnabled()) { - s_logger.info("The startup command is now cancelled"); - } - cancelled = true; - _startup = null; - reconnect(_link); - } + if (!cancelled) { + if (s_logger.isInfoEnabled()) { + s_logger.info("The startup command is now cancelled"); + } + cancelled = true; + _startup = null; + reconnect(_link); + } } } - + public class ServerHandler extends Task { public ServerHandler(Task.Type type, Link link, byte[] data) { super(type, link, data); } - + public ServerHandler(Task.Type type, Link link, Request req) { super(type, link, req); } @@ -803,7 +783,7 @@ public class Agent implements HandlerFactory, IAgentControl { try { request = Request.parse(task.getData()); if (request instanceof Response) { - processResponse((Response)request, task.getLink()); + processResponse((Response) request, task.getLink()); } else { processRequest(request, task.getLink()); } diff --git a/api/src/com/cloud/agent/api/Command.java b/api/src/com/cloud/agent/api/Command.java index 1d83c7b6ee4..544e6c2dee1 100755 --- a/api/src/com/cloud/agent/api/Command.java +++ b/api/src/com/cloud/agent/api/Command.java @@ -20,35 +20,35 @@ package com.cloud.agent.api; import java.util.HashMap; import java.util.Map; +import com.cloud.agent.api.LogLevel.Log4jLevel; /** - * Command is a command that is sent between the management agent and management - * server. Parameter and Command are loosely connected. The protocol layer does - * not care what parameter is carried with which command. That tie in is made at - * a higher level than here. + * All communications between the agent and the management server must be + * implemented by classes that extends the Command class. Command specifies + * all of the methods that needs to be implemented by the children classes. * - * Parameter names can only be 4 characters long and is checked with an assert. - * The value of the parameter is basically an arbitrary length byte array. */ +@LogLevel(level = Log4jLevel.Debug) public abstract class Command { - // allow command to carry over hypervisor or other environment related context info + // allow command to carry over hypervisor or other environment related context info protected Map contextMap = new HashMap(); - + protected Command() { } - + @Override - public String toString() { - return this.getClass().getSimpleName(); + public final String toString() { + return this.getClass().getName(); } - + + /** + * @return Does this command need to be executed in sequence on the agent? + * When this is set to true, the commands are executed by a single + * thread on the agent. + */ public abstract boolean executeInSequence(); - - public boolean logTrace() { - return false; - } - + public void setContextParam(String name, String value) { contextMap.put(name, value); } @@ -56,9 +56,12 @@ public abstract class Command { public String getContextParam(String name) { return contextMap.get(name); } - - public boolean doNotLogCommandParams(){ - return false; + + public boolean logTrace() { + return false; } + public boolean doNotLogCommandParams() { + return false; + } } diff --git a/api/src/com/cloud/agent/api/SecurityIngressRuleAnswer.java b/api/src/com/cloud/agent/api/SecurityIngressRuleAnswer.java index e53519d7fce..b56048bb28d 100644 --- a/api/src/com/cloud/agent/api/SecurityIngressRuleAnswer.java +++ b/api/src/com/cloud/agent/api/SecurityIngressRuleAnswer.java @@ -44,10 +44,4 @@ public class SecurityIngressRuleAnswer extends Answer { return vmId; } - @Override - public String toString() { - return "[NWGRPRuleAns: vmId=" + vmId + ", seqno=" + logSequenceNumber+"]"; - } - - } diff --git a/core/src/com/cloud/agent/transport/Request.java b/core/src/com/cloud/agent/transport/Request.java index b340687f9e3..2a7c8967cbc 100755 --- a/core/src/com/cloud/agent/transport/Request.java +++ b/core/src/com/cloud/agent/transport/Request.java @@ -32,8 +32,6 @@ import com.cloud.exception.UnsupportedVersionException; import com.cloud.utils.NumbersUtil; import com.cloud.utils.Pair; import com.cloud.utils.exception.CloudRuntimeException; -import com.google.gson.ExclusionStrategy; -import com.google.gson.FieldAttributes; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; @@ -46,24 +44,22 @@ import com.google.gson.JsonSerializationContext; import com.google.gson.JsonSerializer; import com.google.gson.reflect.TypeToken; -import edu.emory.mathcs.backport.java.util.Arrays; - /** * Request is a simple wrapper around command and answer to add sequencing, * versioning, and flags. Note that the version here represents the changes - * in the over the wire protocol. For example, if we decide to not use Gson. - * It does not version the changes in the actual commands. That's expected + * in the over the wire protocol. For example, if we decide to not use Gson. + * It does not version the changes in the actual commands. That's expected * to be done by adding new classes to the command and answer list. - * + * * A request looks as follows: - * 1. Version - 1 byte; - * 2. Flags - 3 bytes; - * 3. Sequence - 8 bytes; - * 4. Length - 4 bytes; - * 5. ManagementServerId - 8 bytes; - * 6. AgentId - 8 bytes; - * 7. Data Package. - * + * 1. Version - 1 byte; + * 2. Flags - 3 bytes; + * 3. Sequence - 8 bytes; + * 4. Length - 4 bytes; + * 5. ManagementServerId - 8 bytes; + * 6. AgentId - 8 bytes; + * 7. Data Package. + * */ public class Request { private static final Logger s_logger = Logger.getLogger(Request.class); @@ -83,13 +79,13 @@ public class Request { } }; - protected static final short FLAG_RESPONSE = 0x0; - protected static final short FLAG_REQUEST = 0x1; - protected static final short FLAG_STOP_ON_ERROR = 0x2; - protected static final short FLAG_IN_SEQUENCE = 0x4; - protected static final short FLAG_REVERT_ON_ERROR = 0x8; - protected static final short FLAG_FROM_SERVER = 0x20; - protected static final short FLAG_CONTROL = 0x40; + protected static final short FLAG_RESPONSE = 0x0; + protected static final short FLAG_REQUEST = 0x1; + protected static final short FLAG_STOP_ON_ERROR = 0x2; + protected static final short FLAG_IN_SEQUENCE = 0x4; + protected static final short FLAG_REVERT_ON_ERROR = 0x8; + protected static final short FLAG_FROM_SERVER = 0x20; + protected static final short FLAG_CONTROL = 0x40; protected static final GsonBuilder s_gBuilder; @@ -98,26 +94,28 @@ public class Request { setDefaultGsonConfig(s_gBuilder); s_logger.info("Default Builder inited."); } - - public static void setDefaultGsonConfig(GsonBuilder builder){ - builder.registerTypeAdapter(Command[].class, new ArrayTypeAdaptor()); - builder.registerTypeAdapter(Answer[].class, new ArrayTypeAdaptor()); - builder.registerTypeAdapter(new TypeToken>() {}.getType(), new PortConfigListTypeAdaptor()); - builder.registerTypeAdapter(new TypeToken>() {}.getType(), new NwGroupsCommandTypeAdaptor()); + + public static void setDefaultGsonConfig(GsonBuilder builder) { + builder.registerTypeAdapter(Command[].class, new ArrayTypeAdaptor()); + builder.registerTypeAdapter(Answer[].class, new ArrayTypeAdaptor()); + builder.registerTypeAdapter(new TypeToken>() { + }.getType(), new PortConfigListTypeAdaptor()); + builder.registerTypeAdapter(new TypeToken>() { + }.getType(), new NwGroupsCommandTypeAdaptor()); } public static GsonBuilder initBuilder() { return s_gBuilder; } - - protected Version _ver; - protected long _seq; - protected short _flags; - protected long _mgmtId; - protected long _agentId; - protected Command[] _cmds; - protected String _content; + protected Version _ver; + protected long _session; + protected long _seq; + protected short _flags; + protected long _mgmtId; + protected long _agentId; + protected Command[] _cmds; + protected String _content; protected Request() { } @@ -131,23 +129,22 @@ public class Request { _mgmtId = mgmtId; setInSequence(cmds); } - + protected Request(Version ver, long seq, long agentId, long mgmtId, short flags, final String content) { - this(ver, seq, agentId, mgmtId, flags, (Command[])null); + this(ver, seq, agentId, mgmtId, flags, (Command[]) null); _content = content; } - + public Request(long seq, long agentId, long mgmtId, final Command command, boolean fromServer) { - this(seq, agentId, mgmtId, new Command[] {command}, true, fromServer, true); + this(seq, agentId, mgmtId, new Command[] { command }, true, fromServer); } - - public Request(long seq, long agentId, long mgmtId, Command[] cmds, boolean stopOnError, boolean fromServer, boolean revert) { - this(Version.v3, seq, agentId, mgmtId, (short)0, cmds); + + public Request(long seq, long agentId, long mgmtId, Command[] cmds, boolean stopOnError, boolean fromServer) { + this(Version.v3, seq, agentId, mgmtId, (short) 0, cmds); setStopOnError(stopOnError); setFromServer(fromServer); - setRevertOnError(revert); } - + protected void setInSequence(Command[] cmds) { if (cmds == null) { return; @@ -159,7 +156,7 @@ public class Request { } } } - + protected Request(final Request that, final Command[] cmds) { this._ver = that._ver; this._seq = that._seq; @@ -170,36 +167,35 @@ public class Request { this._agentId = that._agentId; setFromServer(!that.isFromServer()); } - + private final void setStopOnError(boolean stopOnError) { _flags |= (stopOnError ? FLAG_STOP_ON_ERROR : 0); } - + private final void setInSequence(boolean inSequence) { _flags |= (inSequence ? FLAG_IN_SEQUENCE : 0); } - - + public boolean isControl() { - return (_flags & FLAG_CONTROL) > 0; + return (_flags & FLAG_CONTROL) > 0; } - + public void setControl(boolean control) { _flags |= (control ? FLAG_CONTROL : 0); } - + public boolean revertOnError() { return (_flags & FLAG_CONTROL) > 0; } - + private final void setRevertOnError(boolean revertOnError) { _flags |= (revertOnError ? FLAG_REVERT_ON_ERROR : 0); } - + private final void setFromServer(boolean fromServer) { _flags |= (fromServer ? FLAG_FROM_SERVER : 0); } - + public long getManagementServerId() { return _mgmtId; } @@ -207,11 +203,11 @@ public class Request { public boolean isFromServer() { return (_flags & FLAG_FROM_SERVER) > 0; } - + public Version getVersion() { return _ver; } - + public void setAgentId(long agentId) { _agentId = agentId; } @@ -229,16 +225,16 @@ public class Request { } public Command getCommand() { - getCommands(); + getCommands(); return _cmds[0]; } public Command[] getCommands() { - if (_cmds == null) { + if (_cmds == null) { final Gson json = s_gBuilder.create(); - _cmds = json.fromJson(_content, Command[].class); - } - return _cmds; + _cmds = json.fromJson(_content, Command[].class); + } + return _cmds; } /** @@ -248,20 +244,20 @@ public class Request { public String toString() { String content = _content; if (content == null) { - final Gson gson = s_gBuilder.create(); + final Gson gson = s_gBuilder.create(); try { - content = gson.toJson(_cmds); - } catch(Throwable e) { - s_logger.error("Gson serialization error on Request.toString() " + getClass().getCanonicalName(), e); + content = gson.toJson(_cmds); + } catch (Throwable e) { + s_logger.error("Gson serialization error on Request.toString() " + getClass().getCanonicalName(), e); } } final StringBuilder buffer = new StringBuilder(); buffer.append("{ ").append(getType()); - buffer.append(", Seq: ").append(_seq).append(", Ver: ").append(_ver.toString()).append(", MgmtId: ").append(_mgmtId).append(", AgentId: ").append(_agentId).append(", Flags: ").append(Integer.toBinaryString(getFlags())); + buffer.append(", Seq: ").append(_seq).append(", Ver: ").append(_ver.toString()).append(", MgmtId: ").append(_mgmtId).append(", AgentId: ").append(_agentId).append(", Flags: ") + .append(Integer.toBinaryString(getFlags())); buffer.append(", ").append(content).append(" }"); return buffer.toString(); } - protected String getType() { return "Cmd "; @@ -270,7 +266,7 @@ public class Request { protected ByteBuffer serializeHeader(final int contentSize) { final ByteBuffer buffer = ByteBuffer.allocate(32); buffer.put(getVersionInByte()); - buffer.put((byte)0); + buffer.put((byte) 0); buffer.putShort(getFlags()); buffer.putLong(_seq); buffer.putInt(contentSize); @@ -286,7 +282,7 @@ public class Request { final ByteBuffer[] buffers = new ByteBuffer[2]; if (_content == null) { - _content = gson.toJson(_cmds, _cmds.getClass()); + _content = gson.toJson(_cmds, _cmds.getClass()); } buffers[1] = ByteBuffer.wrap(_content.getBytes()); buffers[0] = serializeHeader(buffers[1].capacity()); @@ -305,18 +301,18 @@ public class Request { } protected byte getVersionInByte() { - return (byte)_ver.ordinal(); + return (byte) _ver.ordinal(); } protected short getFlags() { - return (short)(((this instanceof Response) ? FLAG_RESPONSE : FLAG_REQUEST) | _flags); + return (short) (((this instanceof Response) ? FLAG_RESPONSE : FLAG_REQUEST) | _flags); } - + public void log(long agentId, String msg) { if (!s_logger.isDebugEnabled()) { return; } - + StringBuilder buf = new StringBuilder("Seq "); buf.append(agentId).append("-").append(_seq).append(": "); boolean debug = false; @@ -324,12 +320,12 @@ public class Request { List cmdListTonotLog = new ArrayList(); if (_cmds != null) { for (Command cmd : _cmds) { - if(cmd.doNotLogCommandParams()){ - cmdListTonotLog.add(cmd); - } + if (cmd.doNotLogCommandParams()) { + cmdListTonotLog.add(cmd); + } } } - + if (_cmds != null) { for (Command cmd : _cmds) { if (!cmd.logTrace()) { @@ -340,39 +336,39 @@ public class Request { } else { debug = true; } - + buf.append(msg).append(toString()); - - if(!cmdListTonotLog.isEmpty()){ - removeCmdContentFromLog(cmdListTonotLog, buf); + + if (!cmdListTonotLog.isEmpty()) { + removeCmdContentFromLog(cmdListTonotLog, buf); } - + if (executeInSequence() || debug) { s_logger.debug(buf.toString()); } else { s_logger.trace(buf.toString()); } } - - private void removeCmdContentFromLog(List cmdListTonotLog, StringBuilder buf){ - for (Command cmd : cmdListTonotLog){ - int cmdNameIndex = buf.indexOf(cmd.toString()); - if(cmdNameIndex != -1){ - int colonIndex = buf.indexOf(":", cmdNameIndex); - int cmdEndIndex = buf.indexOf("]", cmdNameIndex); - if(colonIndex != -1 && cmdEndIndex != -1){ - buf.replace(colonIndex+1, cmdEndIndex, "{}}"); - } - } - } + + private void removeCmdContentFromLog(List cmdListTonotLog, StringBuilder buf) { + for (Command cmd : cmdListTonotLog) { + int cmdNameIndex = buf.indexOf(cmd.toString()); + if (cmdNameIndex != -1) { + int colonIndex = buf.indexOf(":", cmdNameIndex); + int cmdEndIndex = buf.indexOf("]", cmdNameIndex); + if (colonIndex != -1 && cmdEndIndex != -1) { + buf.replace(colonIndex + 1, cmdEndIndex, "{}}"); + } + } + } } - + /** - * Factory method for Request and Response. It expects the bytes to be + * Factory method for Request and Response. It expects the bytes to be * correctly formed so it's possible that it throws underflow exceptions * but you shouldn't be concerned about that since that all bytes sent in * should already be formatted correctly. - * + * * @param bytes bytes to be converted. * @return Request or Response depending on the data. * @throws ClassNotFoundException if the Command or Answer can not be formed. @@ -415,74 +411,71 @@ public class Request { } public long getAgentId() { - return _agentId; + return _agentId; } - + public static boolean requiresSequentialExecution(final byte[] bytes) { return (bytes[3] & FLAG_IN_SEQUENCE) > 0; } - + public static Version getVersion(final byte[] bytes) throws UnsupportedVersionException { - try { - return Version.get(bytes[0]); - } catch (UnsupportedVersionException e) { - throw new CloudRuntimeException("Unsupported version: " + bytes[0]); - } + try { + return Version.get(bytes[0]); + } catch (UnsupportedVersionException e) { + throw new CloudRuntimeException("Unsupported version: " + bytes[0]); + } } - + public static long getManagementServerId(final byte[] bytes) { - return NumbersUtil.bytesToLong(bytes, 16); + return NumbersUtil.bytesToLong(bytes, 16); } - + public static long getAgentId(final byte[] bytes) { - return NumbersUtil.bytesToLong(bytes, 24); + return NumbersUtil.bytesToLong(bytes, 24); } - + public static boolean fromServer(final byte[] bytes) { - return (bytes[3] & FLAG_FROM_SERVER) > 0; + return (bytes[3] & FLAG_FROM_SERVER) > 0; } - + public static boolean isRequest(final byte[] bytes) { - return (bytes[3] & FLAG_REQUEST) > 0; + return (bytes[3] & FLAG_REQUEST) > 0; } - + public static long getSequence(final byte[] bytes) { - return NumbersUtil.bytesToLong(bytes, 4); + return NumbersUtil.bytesToLong(bytes, 4); } - + public static boolean isControl(final byte[] bytes) { return (bytes[3] & FLAG_CONTROL) > 0; } - - public static class NwGroupsCommandTypeAdaptor implements JsonDeserializer>, JsonSerializer> { + + public static class NwGroupsCommandTypeAdaptor implements JsonDeserializer>, JsonSerializer> { public NwGroupsCommandTypeAdaptor() { } - + @Override - public JsonElement serialize(Pair src, - java.lang.reflect.Type typeOfSrc, JsonSerializationContext context) { + public JsonElement serialize(Pair src, java.lang.reflect.Type typeOfSrc, JsonSerializationContext context) { JsonArray array = new JsonArray(); Gson json = s_gBuilder.create(); - if(src.first() != null) { + if (src.first() != null) { array.add(json.toJsonTree(src.first())); } else { array.add(new JsonNull()); } - + if (src.second() != null) { array.add(json.toJsonTree(src.second())); } else { array.add(new JsonNull()); } - + return array; } @Override - public Pair deserialize(JsonElement json, - java.lang.reflect.Type type, JsonDeserializationContext context) - throws JsonParseException { + public Pair deserialize(JsonElement json, java.lang.reflect.Type type, JsonDeserializationContext context) throws JsonParseException { Pair pairs = new Pair(null, null); JsonArray array = json.getAsJsonArray(); if (array.size() != 2) { @@ -500,9 +493,9 @@ public class Request { return pairs; } - + } - + public static class PortConfigListTypeAdaptor implements JsonDeserializer>, JsonSerializer> { public PortConfigListTypeAdaptor() { @@ -525,8 +518,7 @@ public class Request { } @Override - public List deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) - throws JsonParseException { + public List deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { if (json.isJsonNull()) { return new ArrayList(); } diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java index 14bef758b10..3910bf405d9 100755 --- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -1354,7 +1354,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS } long seq = _hostDao.getNextSequence(hostId); - Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true, false); + Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true); Answer[] answers = agent.send(req, timeout); notifyAnswersToMonitors(hostId, seq, answers); commands.setAnswers(answers); @@ -1415,7 +1415,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS return -1; } long seq = _hostDao.getNextSequence(hostId); - Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true, false); + Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true); agent.send(req, listener); return seq; } @@ -1791,7 +1791,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS } if (s_logger.isDebugEnabled()) { - new Request(0l, -1l, -1l, cmds, true, false, true).log(-1, "Startup request from directly connected host: "); + new Request(0l, -1l, -1l, cmds, true, false).log(-1, "Startup request from directly connected host: "); // s_logger.debug("Startup request from directly connected host: " // + new Request(0l, -1l, -1l, cmds, true, false, true) // .toString());