Added some annotations for logging commands

This commit is contained in:
Alex Huang 2011-05-16 09:28:51 -07:00
parent 3bf5a181c0
commit ab49c70add
5 changed files with 413 additions and 444 deletions

View File

@ -23,7 +23,6 @@ import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -64,20 +63,20 @@ import com.cloud.utils.script.Script;
/**
* @config
* {@table
* || Param Name | Description | Values | Default ||
* || type | Type of server | Storage / Computing / Routing | No Default ||
* || workers | # of workers to process the requests | int | 1 ||
* || host | host to connect to | ip address | localhost ||
* || port | port to connect to | port number | 8250 ||
* || instance | Used to allow multiple agents running on the same host | String | none ||
* }
*
* For more configuration options, see the individual types.
*
* {@table
* || Param Name | Description | Values | Default ||
* || type | Type of server | Storage / Computing / Routing | No Default ||
* || workers | # of workers to process the requests | int | 1 ||
* || host | host to connect to | ip address | localhost ||
* || port | port to connect to | port number | 8250 ||
* || instance | Used to allow multiple agents running on the same host | String | none || * }
*
* For more configuration options, see the individual types.
*
**/
public class Agent implements HandlerFactory, IAgentControl {
private static final Logger s_logger = Logger.getLogger(Agent.class.getName());
private static final Logger s_logger = Logger.getLogger(Agent.class.getName());
public enum ExitStatus {
Normal(0), // Normal status = 0.
Upgrade(65), // Exiting for upgrade.
@ -85,6 +84,7 @@ public class Agent implements HandlerFactory, IAgentControl {
Error(67); // Exiting because of error.
int value;
ExitStatus(final int value) {
this.value = value;
}
@ -94,78 +94,66 @@ public class Agent implements HandlerFactory, IAgentControl {
}
}
List<IAgentControlListener> _controlListeners = new ArrayList<IAgentControlListener>();
IAgentShell _shell;
NioConnection _connection;
ServerResource _resource;
Link _link;
Long _id;
Timer _timer = new Timer("Agent Timer");
List<WatchTask> _watchList = new ArrayList<WatchTask>();
long _sequence = 0;
long _lastPingResponseTime = 0;
long _pingInterval = 0;
AtomicInteger _inProgress = new AtomicInteger();
StartupTask _startup = null;
boolean _reconnectAllowed = true;
List<IAgentControlListener> _controlListeners = new ArrayList<IAgentControlListener>();
IAgentShell _shell;
NioConnection _connection;
ServerResource _resource;
Link _link;
Long _id;
Timer _timer = new Timer("Agent Timer");
List<WatchTask> _watchList = new ArrayList<WatchTask>();
long _sequence = 0;
long _lastPingResponseTime = 0;
long _pingInterval = 0;
AtomicInteger _inProgress = new AtomicInteger();
StartupTask _startup = null;
boolean _reconnectAllowed = true;
// for simulator use only
public Agent(IAgentShell shell) {
_shell = shell;
_shell = shell;
_link = null;
_connection = new NioClient(
"Agent",
_shell.getHost(),
_shell.getPort(),
_shell.getWorkers(),
this);
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
}
public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException {
_shell = shell;
_resource = resource;
_shell = shell;
_resource = resource;
_link = null;
resource.setAgentControl(this);
String value = _shell.getPersistentProperty(getResourceName(), "id");
_id = value != null ? Long.parseLong(value) : null;
s_logger.info("id is " + ((_id != null) ? _id : ""));
final Map<String, Object> params = PropertiesUtil.toMap(_shell.getProperties());
// merge with properties from command line to let resource access command line parameters
for(Map.Entry<String, Object> cmdLineProp : _shell.getCmdLineProperties().entrySet()) {
params.put(cmdLineProp.getKey(), cmdLineProp.getValue());
for (Map.Entry<String, Object> cmdLineProp : _shell.getCmdLineProperties().entrySet()) {
params.put(cmdLineProp.getKey(), cmdLineProp.getValue());
}
if (!_resource.configure(getResourceName(), params)) {
throw new ConfigurationException("Unable to configure " + _resource.getName());
}
_connection = new NioClient(
"Agent",
_shell.getHost(),
_shell.getPort(),
_shell.getWorkers(),
this);
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
// ((NioClient)_connection).setBindAddress(_shell.getPrivateIp());
s_logger.debug("Adding shutdown hook");
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName()
+ " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod()
+ " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost()
+ " : port = " + _shell.getPort());
s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() + " : workers = "
+ _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort());
}
public String getVersion() {
@ -173,8 +161,8 @@ public class Agent implements HandlerFactory, IAgentControl {
}
public String getResourceGuid() {
String guid = _shell.getGuid();
return guid + "-" + getResourceName();
String guid = _shell.getGuid();
return guid + "-" + getResourceName();
}
public String getZone() {
@ -196,29 +184,29 @@ public class Agent implements HandlerFactory, IAgentControl {
public BackoffAlgorithm getBackoffAlgorithm() {
return _shell.getBackoffAlgorithm();
}
public String getResourceName() {
return _resource.getClass().getSimpleName();
return _resource.getClass().getSimpleName();
}
public void upgradeAgent(final String url, boolean protocol) {
// shell needs to take care of synchronization when multiple-instances demand upgrade
// at the same time
_shell.upgradeAgent(url);
// To stop agent after it has been upgraded, as shell executor may prematurely time out
// tasks if agent is in shutting down process
if (protocol) {
if (_connection != null) {
_connection.stop();
_connection = null;
}
if (_resource != null) {
_resource.stop();
_resource = null;
}
if (_connection != null) {
_connection.stop();
_connection = null;
}
if (_resource != null) {
_resource.stop();
_resource = null;
}
} else {
stop(ShutdownCommand.Update, null);
stop(ShutdownCommand.Update, null);
}
}
@ -236,19 +224,19 @@ public class Agent implements HandlerFactory, IAgentControl {
final ShutdownCommand cmd = new ShutdownCommand(reason, detail);
try {
if (_link != null) {
Request req = new Request(0, (_id != null? _id : -1), -1, cmd, false);
Request req = new Request(0, (_id != null ? _id : -1), -1, cmd, false);
_link.send(req.toBytes());
}
} catch (final ClosedChannelException e) {
s_logger.warn("Unable to send: " + cmd.toString());
} catch(Exception e) {
} catch (Exception e) {
s_logger.warn("Unable to send: " + cmd.toString() + " due to exception: ", e);
}
s_logger.debug("Sending shutdown to management server");
try {
Thread.sleep(1000);
Thread.sleep(1000);
} catch (final InterruptedException e) {
s_logger.debug("Who the heck interrupted me here?");
s_logger.debug("Who the heck interrupted me here?");
}
_connection.stop();
_connection = null;
@ -265,11 +253,11 @@ public class Agent implements HandlerFactory, IAgentControl {
}
public void setId(final Long id) {
s_logger.info("Set agent id " + id);
s_logger.info("Set agent id " + id);
_id = id;
_shell.setPersistentProperty(getResourceName(), "id", Long.toString(id));
}
public void scheduleWatch(final Link link, final Request request, final long delay, final long period) {
synchronized (_watchList) {
if (s_logger.isDebugEnabled()) {
@ -280,9 +268,9 @@ public class Agent implements HandlerFactory, IAgentControl {
_watchList.add(task);
}
}
protected void cancelTasks() {
synchronized(_watchList) {
synchronized (_watchList) {
for (final WatchTask task : _watchList) {
task.cancel();
}
@ -292,23 +280,23 @@ public class Agent implements HandlerFactory, IAgentControl {
_watchList.clear();
}
}
public void sendStartup(Link link) {
final StartupCommand[] startup = _resource.initialize();
final Command[] commands = new Command[startup.length];
for (int i=0; i< startup.length; i++){
setupStartupCommand(startup[i]);
commands[i] = startup[i];
for (int i = 0; i < startup.length; i++) {
setupStartupCommand(startup[i]);
commands[i] = startup[i];
}
final Request request = new Request(getNextSequence(), _id != null ? _id : -1, -1, commands, false, false, false);
final Request request = new Request(getNextSequence(), _id != null ? _id : -1, -1, commands, false, false);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Sending Startup: " + request.toString());
}
synchronized(this) {
_startup = new StartupTask(link);
_timer.schedule(_startup, 180000);
synchronized (this) {
_startup = new StartupTask(link);
_timer.schedule(_startup, 180000);
}
try {
link.send(request.toBytes());
@ -316,46 +304,47 @@ public class Agent implements HandlerFactory, IAgentControl {
s_logger.warn("Unable to send reques: " + request.toString());
}
}
protected void setupStartupCommand(StartupCommand startup) {
InetAddress addr;
try {
addr = InetAddress.getLocalHost();
} catch (final UnknownHostException e) {
s_logger.warn("unknow host? ", e);
//ignore
// ignore
return;
}
final Script command = new Script("hostname", 500, s_logger);
final OutputInterpreter.OneLineParser parser = new OutputInterpreter.OneLineParser();
final String result = command.execute(parser);
final String hostname = result == null ? parser.getLine() : addr.toString();
startup.setId(getId());
if (startup.getName() == null)
startup.setName(hostname);
if (startup.getName() == null) {
startup.setName(hostname);
}
startup.setDataCenter(getZone());
startup.setPod(getPod());
startup.setGuid(getResourceGuid());
startup.setResourceName(getResourceName());
startup.setVersion(getVersion());
}
@Override
public Task create(Task.Type type, Link link, byte[] data) {
return new ServerHandler(type, link, data);
}
protected void reconnect(final Link link) {
if (!_reconnectAllowed) {
return;
}
synchronized(this) {
if (_startup != null) {
_startup.cancel();
_startup= null;
}
synchronized (this) {
if (_startup != null) {
_startup.cancel();
_startup = null;
}
}
link.close();
@ -363,67 +352,62 @@ public class Agent implements HandlerFactory, IAgentControl {
setLink(null);
cancelTasks();
_resource.disconnected();
int inProgress = 0;
do {
_shell.getBackoffAlgorithm().waitBeforeRetry();
s_logger.info("Lost connection to the server. Dealing with the remaining commands...");
inProgress = _inProgress.get();
if (inProgress > 0) {
s_logger.info("Cannot connect because we still have " + inProgress + " commands in progress.");
s_logger.info("Cannot connect because we still have " + inProgress + " commands in progress.");
}
} while (inProgress > 0);
_connection.stop();
while (_connection.isStartup()){
while (_connection.isStartup()) {
_shell.getBackoffAlgorithm().waitBeforeRetry();
}
try {
_connection.cleanUp();
} catch (IOException e) {
s_logger.warn("Fail to clean up old connection. " + e);
s_logger.warn("Fail to clean up old connection. " + e);
}
_connection = new NioClient(
"Agent",
_shell.getHost(),
_shell.getPort(),
_shell.getWorkers(),
this);
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
do {
s_logger.info("Reconnecting...");
_connection.start();
s_logger.info("Reconnecting...");
_connection.start();
_shell.getBackoffAlgorithm().waitBeforeRetry();
} while (!_connection.isStartup());
}
public void processStartupAnswer(Answer answer, Response response, Link link) {
boolean cancelled = false;
synchronized(this) {
boolean cancelled = false;
synchronized (this) {
if (_startup != null) {
_startup.cancel();
_startup = null;
} else {
cancelled = true;
cancelled = true;
}
}
final StartupAnswer startup = (StartupAnswer)answer;
final StartupAnswer startup = (StartupAnswer) answer;
if (!startup.getResult()) {
s_logger.error("Not allowed to connect to the server: " + answer.getDetails());
System.exit(1);
}
if (cancelled) {
s_logger.warn("Threw away a startup answer because we're reconnecting.");
return;
return;
}
s_logger.info("Proccess agent startup answer, agent id = " + startup.getHostId());
setId(startup.getHostId());
_pingInterval = startup.getPingInterval() * 1000; // change to ms.
@ -431,73 +415,68 @@ public class Agent implements HandlerFactory, IAgentControl {
scheduleWatch(link, response, _pingInterval, _pingInterval);
s_logger.info("Startup Response Received: agent id = " + getId());
}
protected void processRequest(final Request request, final Link link) {
boolean requestLogged = false;
boolean requestLogged = false;
Response response = null;
try {
final Command[] cmds = request.getCommands();
final Answer[] answers = new Answer[cmds.length];
for (int i = 0; i < cmds.length; i++)
{
for (int i = 0; i < cmds.length; i++) {
final Command cmd = cmds[i];
Answer answer;
try
{
if (s_logger.isDebugEnabled())
{
//this is a hack to make sure we do NOT log the ssh keys
if((cmd instanceof ModifySshKeysCommand))
{
s_logger.debug("Received the request for command: ModifySshKeysCommand");
}
else
{
if(!requestLogged) //ensures request is logged only once per method call
{
s_logger.debug("Request:" + request.toString());
requestLogged = true;
}
}
try {
if (s_logger.isDebugEnabled()) {
// this is a hack to make sure we do NOT log the ssh keys
if ((cmd instanceof ModifySshKeysCommand)) {
s_logger.debug("Received the request for command: ModifySshKeysCommand");
} else {
if (!requestLogged) // ensures request is logged only once per method call
{
s_logger.debug("Request:" + request.toString());
requestLogged = true;
}
}
s_logger.debug("Processing command: " + cmd.toString());
}
if (cmd instanceof CronCommand) {
final CronCommand watch = (CronCommand)cmd;
final CronCommand watch = (CronCommand) cmd;
scheduleWatch(link, request, watch.getInterval() * 1000, watch.getInterval() * 1000);
answer = new Answer(cmd, true, null);
} else if (cmd instanceof UpgradeCommand) {
final UpgradeCommand upgrade = (UpgradeCommand)cmd;
final UpgradeCommand upgrade = (UpgradeCommand) cmd;
answer = upgradeAgent(upgrade.getUpgradeUrl(), upgrade);
} else if (cmd instanceof ShutdownCommand) {
ShutdownCommand shutdown = (ShutdownCommand)cmd;
ShutdownCommand shutdown = (ShutdownCommand) cmd;
s_logger.debug("Received shutdownCommand, due to: " + shutdown.getReason());
cancelTasks();
_reconnectAllowed = false;
answer = new Answer(cmd, true, null);
} else if(cmd instanceof AgentControlCommand) {
answer = null;
synchronized(_controlListeners) {
for(IAgentControlListener listener: _controlListeners) {
answer = listener.processControlRequest(request, (AgentControlCommand)cmd);
if(answer != null)
break;
}
}
if(answer == null) {
s_logger.warn("No handler found to process cmd: " + cmd.toString());
answer = new AgentControlAnswer(cmd);
}
} else if (cmd instanceof AgentControlCommand) {
answer = null;
synchronized (_controlListeners) {
for (IAgentControlListener listener : _controlListeners) {
answer = listener.processControlRequest(request, (AgentControlCommand) cmd);
if (answer != null) {
break;
}
}
}
if (answer == null) {
s_logger.warn("No handler found to process cmd: " + cmd.toString());
answer = new AgentControlAnswer(cmd);
}
} else {
_inProgress.incrementAndGet();
try {
answer = _resource.executeRequest(cmd);
answer = _resource.executeRequest(cmd);
} finally {
_inProgress.decrementAndGet();
_inProgress.decrementAndGet();
}
if (answer == null) {
s_logger.debug("Response: unsupported command" + cmd.toString());
@ -510,7 +489,7 @@ public class Agent implements HandlerFactory, IAgentControl {
th.printStackTrace(new PrintWriter(writer));
answer = new Answer(cmd, false, writer.toString());
}
answers[i] = answer;
if (!answer.getResult() && request.stopOnError()) {
for (i++; i < cmds.length; i++) {
@ -534,26 +513,26 @@ public class Agent implements HandlerFactory, IAgentControl {
}
}
}
public void processResponse(final Response response, final Link link) {
final Answer answer = response.getAnswer();
if (s_logger.isDebugEnabled()) {
s_logger.debug("Received response: " + response.toString());
}
if (answer instanceof StartupAnswer) {
processStartupAnswer(answer, response, link);
} else if(answer instanceof AgentControlAnswer) {
// Notice, we are doing callback while holding a lock!
synchronized(_controlListeners) {
for(IAgentControlListener listener : _controlListeners) {
listener.processControlResponse(response, (AgentControlAnswer)answer);
}
}
processStartupAnswer(answer, response, link);
} else if (answer instanceof AgentControlAnswer) {
// Notice, we are doing callback while holding a lock!
synchronized (_controlListeners) {
for (IAgentControlListener listener : _controlListeners) {
listener.processControlResponse(response, (AgentControlAnswer) answer);
}
}
} else {
setLastPingResponseTime();
}
}
public void processOtherTask(Task task) {
final Object obj = task.get();
if (obj instanceof Response) {
@ -575,15 +554,15 @@ public class Agent implements HandlerFactory, IAgentControl {
} catch (final ClosedChannelException e) {
s_logger.warn("Unable to send request: " + request.toString());
}
} else if (obj instanceof Request){
final Request req = (Request)obj;
} else if (obj instanceof Request) {
final Request req = (Request) obj;
final Command command = req.getCommand();
Answer answer = null;
_inProgress.incrementAndGet();
try {
answer = _resource.executeRequest(command);
answer = _resource.executeRequest(command);
} finally {
_inProgress.decrementAndGet();
_inProgress.decrementAndGet();
}
if (answer != null) {
final Response response = new Response(req, answer);
@ -601,12 +580,12 @@ public class Agent implements HandlerFactory, IAgentControl {
s_logger.warn("Ignoring an unknown task");
}
}
protected UpgradeAnswer upgradeAgent(final String url, final UpgradeCommand cmd) {
try {
upgradeAgent(url, cmd == null);
return null;
} catch(final Exception e) {
} catch (final Exception e) {
s_logger.error("Unable to run this agent because we couldn't complete the upgrade process.", e);
if (cmd != null) {
final StringWriter writer = new StringWriter();
@ -624,112 +603,113 @@ public class Agent implements HandlerFactory, IAgentControl {
public synchronized void setLastPingResponseTime() {
_lastPingResponseTime = System.currentTimeMillis();
}
protected synchronized long getNextSequence() {
return _sequence++;
}
@Override
public void registerControlListener(IAgentControlListener listener) {
synchronized(_controlListeners) {
_controlListeners.add(listener);
}
}
@Override
public void unregisterControlListener(IAgentControlListener listener) {
synchronized(_controlListeners) {
_controlListeners.remove(listener);
}
}
@Override
public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException {
Request request = new Request(this.getNextSequence(), this.getId(),
-1, new Command[] {cmd}, true, false, false);
AgentControlListener listener = new AgentControlListener(request);
registerControlListener(listener);
try {
postRequest(request);
synchronized(listener) {
try {
listener.wait(timeoutInMilliseconds);
} catch (InterruptedException e) {
s_logger.warn("sendRequest is interrupted, exit waiting");
}
}
return listener.getAnswer();
} finally {
unregisterControlListener(listener);
public void registerControlListener(IAgentControlListener listener) {
synchronized (_controlListeners) {
_controlListeners.add(listener);
}
}
@Override
public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException {
Request request = new Request(this.getNextSequence(), this.getId(),
-1, new Command[] {cmd}, true, false, false);
public void unregisterControlListener(IAgentControlListener listener) {
synchronized (_controlListeners) {
_controlListeners.remove(listener);
}
}
@Override
public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException {
Request request = new Request(this.getNextSequence(), this.getId(), -1, new Command[] { cmd }, true, false);
AgentControlListener listener = new AgentControlListener(request);
registerControlListener(listener);
try {
postRequest(request);
synchronized (listener) {
try {
listener.wait(timeoutInMilliseconds);
} catch (InterruptedException e) {
s_logger.warn("sendRequest is interrupted, exit waiting");
}
}
return listener.getAnswer();
} finally {
unregisterControlListener(listener);
}
}
@Override
public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException {
Request request = new Request(this.getNextSequence(), this.getId(), -1, new Command[] { cmd }, true, false);
postRequest(request);
}
private void postRequest(Request request) throws AgentControlChannelException {
if(_link != null) {
try {
_link.send(request.toBytes());
} catch (final ClosedChannelException e) {
s_logger.warn("Unable to post agent control reques: " + request.toString());
throw new AgentControlChannelException("Unable to post agent control request due to " + e.getMessage());
}
if (_link != null) {
try {
_link.send(request.toBytes());
} catch (final ClosedChannelException e) {
s_logger.warn("Unable to post agent control reques: " + request.toString());
throw new AgentControlChannelException("Unable to post agent control request due to " + e.getMessage());
}
} else {
throw new AgentControlChannelException("Unable to post agent control request as link is not available");
}
}
public class AgentControlListener implements IAgentControlListener {
private AgentControlAnswer _answer;
private final Request _request;
public AgentControlListener(Request request) {
_request = request;
}
public AgentControlAnswer getAnswer() {
return _answer;
}
@Override
private AgentControlAnswer _answer;
private final Request _request;
public AgentControlListener(Request request) {
_request = request;
}
public AgentControlAnswer getAnswer() {
return _answer;
}
@Override
public Answer processControlRequest(Request request, AgentControlCommand cmd) {
return null;
}
@Override
return null;
}
@Override
public void processControlResponse(Response response, AgentControlAnswer answer) {
if(_request.getSequence() == response.getSequence()) {
_answer = answer;
synchronized(this) {
notifyAll();
}
}
}
if (_request.getSequence() == response.getSequence()) {
_answer = answer;
synchronized (this) {
notifyAll();
}
}
}
}
protected class ShutdownThread extends Thread {
Agent _agent;
public ShutdownThread(final Agent agent) {
super("AgentShutdownThread");
super("AgentShutdownThread");
_agent = agent;
}
@Override
public void run() {
_agent.stop(ShutdownCommand.Requested, null);
}
}
public class WatchTask extends TimerTask {
protected Request _request;
protected Agent _agent;
protected Link _link;
protected Agent _agent;
protected Link _link;
public WatchTask(final Link link, final Request request, final Agent agent) {
super();
_request = request;
@ -749,45 +729,45 @@ public class Agent implements HandlerFactory, IAgentControl {
}
}
}
public class StartupTask extends TimerTask {
protected Link _link;
protected Link _link;
protected volatile boolean cancelled = false;
public StartupTask(final Link link) {
s_logger.debug("Startup task created");
s_logger.debug("Startup task created");
_link = link;
}
@Override
public synchronized boolean cancel() {
// TimerTask.cancel may fail depends on the calling context
if (!cancelled) {
cancelled = true;
s_logger.debug("Startup task cancelled");
return super.cancel();
}
return true;
// TimerTask.cancel may fail depends on the calling context
if (!cancelled) {
cancelled = true;
s_logger.debug("Startup task cancelled");
return super.cancel();
}
return true;
}
@Override
public synchronized void run() {
if(!cancelled) {
if(s_logger.isInfoEnabled()) {
s_logger.info("The startup command is now cancelled");
}
cancelled = true;
_startup = null;
reconnect(_link);
}
if (!cancelled) {
if (s_logger.isInfoEnabled()) {
s_logger.info("The startup command is now cancelled");
}
cancelled = true;
_startup = null;
reconnect(_link);
}
}
}
public class ServerHandler extends Task {
public ServerHandler(Task.Type type, Link link, byte[] data) {
super(type, link, data);
}
public ServerHandler(Task.Type type, Link link, Request req) {
super(type, link, req);
}
@ -803,7 +783,7 @@ public class Agent implements HandlerFactory, IAgentControl {
try {
request = Request.parse(task.getData());
if (request instanceof Response) {
processResponse((Response)request, task.getLink());
processResponse((Response) request, task.getLink());
} else {
processRequest(request, task.getLink());
}

View File

@ -20,35 +20,35 @@ package com.cloud.agent.api;
import java.util.HashMap;
import java.util.Map;
import com.cloud.agent.api.LogLevel.Log4jLevel;
/**
* Command is a command that is sent between the management agent and management
* server. Parameter and Command are loosely connected. The protocol layer does
* not care what parameter is carried with which command. That tie in is made at
* a higher level than here.
* All communications between the agent and the management server must be
* implemented by classes that extends the Command class. Command specifies
* all of the methods that needs to be implemented by the children classes.
*
* Parameter names can only be 4 characters long and is checked with an assert.
* The value of the parameter is basically an arbitrary length byte array.
*/
@LogLevel(level = Log4jLevel.Debug)
public abstract class Command {
// allow command to carry over hypervisor or other environment related context info
// allow command to carry over hypervisor or other environment related context info
protected Map<String, String> contextMap = new HashMap<String, String>();
protected Command() {
}
@Override
public String toString() {
return this.getClass().getSimpleName();
public final String toString() {
return this.getClass().getName();
}
/**
* @return Does this command need to be executed in sequence on the agent?
* When this is set to true, the commands are executed by a single
* thread on the agent.
*/
public abstract boolean executeInSequence();
public boolean logTrace() {
return false;
}
public void setContextParam(String name, String value) {
contextMap.put(name, value);
}
@ -56,9 +56,12 @@ public abstract class Command {
public String getContextParam(String name) {
return contextMap.get(name);
}
public boolean doNotLogCommandParams(){
return false;
public boolean logTrace() {
return false;
}
public boolean doNotLogCommandParams() {
return false;
}
}

View File

@ -44,10 +44,4 @@ public class SecurityIngressRuleAnswer extends Answer {
return vmId;
}
@Override
public String toString() {
return "[NWGRPRuleAns: vmId=" + vmId + ", seqno=" + logSequenceNumber+"]";
}
}

View File

@ -32,8 +32,6 @@ import com.cloud.exception.UnsupportedVersionException;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Pair;
import com.cloud.utils.exception.CloudRuntimeException;
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
@ -46,24 +44,22 @@ import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.gson.reflect.TypeToken;
import edu.emory.mathcs.backport.java.util.Arrays;
/**
* Request is a simple wrapper around command and answer to add sequencing,
* versioning, and flags. Note that the version here represents the changes
* in the over the wire protocol. For example, if we decide to not use Gson.
* It does not version the changes in the actual commands. That's expected
* in the over the wire protocol. For example, if we decide to not use Gson.
* It does not version the changes in the actual commands. That's expected
* to be done by adding new classes to the command and answer list.
*
*
* A request looks as follows:
* 1. Version - 1 byte;
* 2. Flags - 3 bytes;
* 3. Sequence - 8 bytes;
* 4. Length - 4 bytes;
* 5. ManagementServerId - 8 bytes;
* 6. AgentId - 8 bytes;
* 7. Data Package.
*
* 1. Version - 1 byte;
* 2. Flags - 3 bytes;
* 3. Sequence - 8 bytes;
* 4. Length - 4 bytes;
* 5. ManagementServerId - 8 bytes;
* 6. AgentId - 8 bytes;
* 7. Data Package.
*
*/
public class Request {
private static final Logger s_logger = Logger.getLogger(Request.class);
@ -83,13 +79,13 @@ public class Request {
}
};
protected static final short FLAG_RESPONSE = 0x0;
protected static final short FLAG_REQUEST = 0x1;
protected static final short FLAG_STOP_ON_ERROR = 0x2;
protected static final short FLAG_IN_SEQUENCE = 0x4;
protected static final short FLAG_REVERT_ON_ERROR = 0x8;
protected static final short FLAG_FROM_SERVER = 0x20;
protected static final short FLAG_CONTROL = 0x40;
protected static final short FLAG_RESPONSE = 0x0;
protected static final short FLAG_REQUEST = 0x1;
protected static final short FLAG_STOP_ON_ERROR = 0x2;
protected static final short FLAG_IN_SEQUENCE = 0x4;
protected static final short FLAG_REVERT_ON_ERROR = 0x8;
protected static final short FLAG_FROM_SERVER = 0x20;
protected static final short FLAG_CONTROL = 0x40;
protected static final GsonBuilder s_gBuilder;
@ -98,26 +94,28 @@ public class Request {
setDefaultGsonConfig(s_gBuilder);
s_logger.info("Default Builder inited.");
}
public static void setDefaultGsonConfig(GsonBuilder builder){
builder.registerTypeAdapter(Command[].class, new ArrayTypeAdaptor<Command>());
builder.registerTypeAdapter(Answer[].class, new ArrayTypeAdaptor<Answer>());
builder.registerTypeAdapter(new TypeToken<List<PortConfig>>() {}.getType(), new PortConfigListTypeAdaptor());
builder.registerTypeAdapter(new TypeToken<Pair<Long, Long>>() {}.getType(), new NwGroupsCommandTypeAdaptor());
public static void setDefaultGsonConfig(GsonBuilder builder) {
builder.registerTypeAdapter(Command[].class, new ArrayTypeAdaptor<Command>());
builder.registerTypeAdapter(Answer[].class, new ArrayTypeAdaptor<Answer>());
builder.registerTypeAdapter(new TypeToken<List<PortConfig>>() {
}.getType(), new PortConfigListTypeAdaptor());
builder.registerTypeAdapter(new TypeToken<Pair<Long, Long>>() {
}.getType(), new NwGroupsCommandTypeAdaptor());
}
public static GsonBuilder initBuilder() {
return s_gBuilder;
}
protected Version _ver;
protected long _seq;
protected short _flags;
protected long _mgmtId;
protected long _agentId;
protected Command[] _cmds;
protected String _content;
protected Version _ver;
protected long _session;
protected long _seq;
protected short _flags;
protected long _mgmtId;
protected long _agentId;
protected Command[] _cmds;
protected String _content;
protected Request() {
}
@ -131,23 +129,22 @@ public class Request {
_mgmtId = mgmtId;
setInSequence(cmds);
}
protected Request(Version ver, long seq, long agentId, long mgmtId, short flags, final String content) {
this(ver, seq, agentId, mgmtId, flags, (Command[])null);
this(ver, seq, agentId, mgmtId, flags, (Command[]) null);
_content = content;
}
public Request(long seq, long agentId, long mgmtId, final Command command, boolean fromServer) {
this(seq, agentId, mgmtId, new Command[] {command}, true, fromServer, true);
this(seq, agentId, mgmtId, new Command[] { command }, true, fromServer);
}
public Request(long seq, long agentId, long mgmtId, Command[] cmds, boolean stopOnError, boolean fromServer, boolean revert) {
this(Version.v3, seq, agentId, mgmtId, (short)0, cmds);
public Request(long seq, long agentId, long mgmtId, Command[] cmds, boolean stopOnError, boolean fromServer) {
this(Version.v3, seq, agentId, mgmtId, (short) 0, cmds);
setStopOnError(stopOnError);
setFromServer(fromServer);
setRevertOnError(revert);
}
protected void setInSequence(Command[] cmds) {
if (cmds == null) {
return;
@ -159,7 +156,7 @@ public class Request {
}
}
}
protected Request(final Request that, final Command[] cmds) {
this._ver = that._ver;
this._seq = that._seq;
@ -170,36 +167,35 @@ public class Request {
this._agentId = that._agentId;
setFromServer(!that.isFromServer());
}
private final void setStopOnError(boolean stopOnError) {
_flags |= (stopOnError ? FLAG_STOP_ON_ERROR : 0);
}
private final void setInSequence(boolean inSequence) {
_flags |= (inSequence ? FLAG_IN_SEQUENCE : 0);
}
public boolean isControl() {
return (_flags & FLAG_CONTROL) > 0;
return (_flags & FLAG_CONTROL) > 0;
}
public void setControl(boolean control) {
_flags |= (control ? FLAG_CONTROL : 0);
}
public boolean revertOnError() {
return (_flags & FLAG_CONTROL) > 0;
}
private final void setRevertOnError(boolean revertOnError) {
_flags |= (revertOnError ? FLAG_REVERT_ON_ERROR : 0);
}
private final void setFromServer(boolean fromServer) {
_flags |= (fromServer ? FLAG_FROM_SERVER : 0);
}
public long getManagementServerId() {
return _mgmtId;
}
@ -207,11 +203,11 @@ public class Request {
public boolean isFromServer() {
return (_flags & FLAG_FROM_SERVER) > 0;
}
public Version getVersion() {
return _ver;
}
public void setAgentId(long agentId) {
_agentId = agentId;
}
@ -229,16 +225,16 @@ public class Request {
}
public Command getCommand() {
getCommands();
getCommands();
return _cmds[0];
}
public Command[] getCommands() {
if (_cmds == null) {
if (_cmds == null) {
final Gson json = s_gBuilder.create();
_cmds = json.fromJson(_content, Command[].class);
}
return _cmds;
_cmds = json.fromJson(_content, Command[].class);
}
return _cmds;
}
/**
@ -248,20 +244,20 @@ public class Request {
public String toString() {
String content = _content;
if (content == null) {
final Gson gson = s_gBuilder.create();
final Gson gson = s_gBuilder.create();
try {
content = gson.toJson(_cmds);
} catch(Throwable e) {
s_logger.error("Gson serialization error on Request.toString() " + getClass().getCanonicalName(), e);
content = gson.toJson(_cmds);
} catch (Throwable e) {
s_logger.error("Gson serialization error on Request.toString() " + getClass().getCanonicalName(), e);
}
}
final StringBuilder buffer = new StringBuilder();
buffer.append("{ ").append(getType());
buffer.append(", Seq: ").append(_seq).append(", Ver: ").append(_ver.toString()).append(", MgmtId: ").append(_mgmtId).append(", AgentId: ").append(_agentId).append(", Flags: ").append(Integer.toBinaryString(getFlags()));
buffer.append(", Seq: ").append(_seq).append(", Ver: ").append(_ver.toString()).append(", MgmtId: ").append(_mgmtId).append(", AgentId: ").append(_agentId).append(", Flags: ")
.append(Integer.toBinaryString(getFlags()));
buffer.append(", ").append(content).append(" }");
return buffer.toString();
}
protected String getType() {
return "Cmd ";
@ -270,7 +266,7 @@ public class Request {
protected ByteBuffer serializeHeader(final int contentSize) {
final ByteBuffer buffer = ByteBuffer.allocate(32);
buffer.put(getVersionInByte());
buffer.put((byte)0);
buffer.put((byte) 0);
buffer.putShort(getFlags());
buffer.putLong(_seq);
buffer.putInt(contentSize);
@ -286,7 +282,7 @@ public class Request {
final ByteBuffer[] buffers = new ByteBuffer[2];
if (_content == null) {
_content = gson.toJson(_cmds, _cmds.getClass());
_content = gson.toJson(_cmds, _cmds.getClass());
}
buffers[1] = ByteBuffer.wrap(_content.getBytes());
buffers[0] = serializeHeader(buffers[1].capacity());
@ -305,18 +301,18 @@ public class Request {
}
protected byte getVersionInByte() {
return (byte)_ver.ordinal();
return (byte) _ver.ordinal();
}
protected short getFlags() {
return (short)(((this instanceof Response) ? FLAG_RESPONSE : FLAG_REQUEST) | _flags);
return (short) (((this instanceof Response) ? FLAG_RESPONSE : FLAG_REQUEST) | _flags);
}
public void log(long agentId, String msg) {
if (!s_logger.isDebugEnabled()) {
return;
}
StringBuilder buf = new StringBuilder("Seq ");
buf.append(agentId).append("-").append(_seq).append(": ");
boolean debug = false;
@ -324,12 +320,12 @@ public class Request {
List<Command> cmdListTonotLog = new ArrayList<Command>();
if (_cmds != null) {
for (Command cmd : _cmds) {
if(cmd.doNotLogCommandParams()){
cmdListTonotLog.add(cmd);
}
if (cmd.doNotLogCommandParams()) {
cmdListTonotLog.add(cmd);
}
}
}
if (_cmds != null) {
for (Command cmd : _cmds) {
if (!cmd.logTrace()) {
@ -340,39 +336,39 @@ public class Request {
} else {
debug = true;
}
buf.append(msg).append(toString());
if(!cmdListTonotLog.isEmpty()){
removeCmdContentFromLog(cmdListTonotLog, buf);
if (!cmdListTonotLog.isEmpty()) {
removeCmdContentFromLog(cmdListTonotLog, buf);
}
if (executeInSequence() || debug) {
s_logger.debug(buf.toString());
} else {
s_logger.trace(buf.toString());
}
}
private void removeCmdContentFromLog(List<Command> cmdListTonotLog, StringBuilder buf){
for (Command cmd : cmdListTonotLog){
int cmdNameIndex = buf.indexOf(cmd.toString());
if(cmdNameIndex != -1){
int colonIndex = buf.indexOf(":", cmdNameIndex);
int cmdEndIndex = buf.indexOf("]", cmdNameIndex);
if(colonIndex != -1 && cmdEndIndex != -1){
buf.replace(colonIndex+1, cmdEndIndex, "{}}");
}
}
}
private void removeCmdContentFromLog(List<Command> cmdListTonotLog, StringBuilder buf) {
for (Command cmd : cmdListTonotLog) {
int cmdNameIndex = buf.indexOf(cmd.toString());
if (cmdNameIndex != -1) {
int colonIndex = buf.indexOf(":", cmdNameIndex);
int cmdEndIndex = buf.indexOf("]", cmdNameIndex);
if (colonIndex != -1 && cmdEndIndex != -1) {
buf.replace(colonIndex + 1, cmdEndIndex, "{}}");
}
}
}
}
/**
* Factory method for Request and Response. It expects the bytes to be
* Factory method for Request and Response. It expects the bytes to be
* correctly formed so it's possible that it throws underflow exceptions
* but you shouldn't be concerned about that since that all bytes sent in
* should already be formatted correctly.
*
*
* @param bytes bytes to be converted.
* @return Request or Response depending on the data.
* @throws ClassNotFoundException if the Command or Answer can not be formed.
@ -415,74 +411,71 @@ public class Request {
}
public long getAgentId() {
return _agentId;
return _agentId;
}
public static boolean requiresSequentialExecution(final byte[] bytes) {
return (bytes[3] & FLAG_IN_SEQUENCE) > 0;
}
public static Version getVersion(final byte[] bytes) throws UnsupportedVersionException {
try {
return Version.get(bytes[0]);
} catch (UnsupportedVersionException e) {
throw new CloudRuntimeException("Unsupported version: " + bytes[0]);
}
try {
return Version.get(bytes[0]);
} catch (UnsupportedVersionException e) {
throw new CloudRuntimeException("Unsupported version: " + bytes[0]);
}
}
public static long getManagementServerId(final byte[] bytes) {
return NumbersUtil.bytesToLong(bytes, 16);
return NumbersUtil.bytesToLong(bytes, 16);
}
public static long getAgentId(final byte[] bytes) {
return NumbersUtil.bytesToLong(bytes, 24);
return NumbersUtil.bytesToLong(bytes, 24);
}
public static boolean fromServer(final byte[] bytes) {
return (bytes[3] & FLAG_FROM_SERVER) > 0;
return (bytes[3] & FLAG_FROM_SERVER) > 0;
}
public static boolean isRequest(final byte[] bytes) {
return (bytes[3] & FLAG_REQUEST) > 0;
return (bytes[3] & FLAG_REQUEST) > 0;
}
public static long getSequence(final byte[] bytes) {
return NumbersUtil.bytesToLong(bytes, 4);
return NumbersUtil.bytesToLong(bytes, 4);
}
public static boolean isControl(final byte[] bytes) {
return (bytes[3] & FLAG_CONTROL) > 0;
}
public static class NwGroupsCommandTypeAdaptor implements JsonDeserializer<Pair<Long, Long>>, JsonSerializer<Pair<Long,Long>> {
public static class NwGroupsCommandTypeAdaptor implements JsonDeserializer<Pair<Long, Long>>, JsonSerializer<Pair<Long, Long>> {
public NwGroupsCommandTypeAdaptor() {
}
@Override
public JsonElement serialize(Pair<Long, Long> src,
java.lang.reflect.Type typeOfSrc, JsonSerializationContext context) {
public JsonElement serialize(Pair<Long, Long> src, java.lang.reflect.Type typeOfSrc, JsonSerializationContext context) {
JsonArray array = new JsonArray();
Gson json = s_gBuilder.create();
if(src.first() != null) {
if (src.first() != null) {
array.add(json.toJsonTree(src.first()));
} else {
array.add(new JsonNull());
}
if (src.second() != null) {
array.add(json.toJsonTree(src.second()));
} else {
array.add(new JsonNull());
}
return array;
}
@Override
public Pair<Long, Long> deserialize(JsonElement json,
java.lang.reflect.Type type, JsonDeserializationContext context)
throws JsonParseException {
public Pair<Long, Long> deserialize(JsonElement json, java.lang.reflect.Type type, JsonDeserializationContext context) throws JsonParseException {
Pair<Long, Long> pairs = new Pair<Long, Long>(null, null);
JsonArray array = json.getAsJsonArray();
if (array.size() != 2) {
@ -500,9 +493,9 @@ public class Request {
return pairs;
}
}
public static class PortConfigListTypeAdaptor implements JsonDeserializer<List<PortConfig>>, JsonSerializer<List<PortConfig>> {
public PortConfigListTypeAdaptor() {
@ -525,8 +518,7 @@ public class Request {
}
@Override
public List<PortConfig> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
throws JsonParseException {
public List<PortConfig> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
if (json.isJsonNull()) {
return new ArrayList<PortConfig>();
}

View File

@ -1354,7 +1354,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS
}
long seq = _hostDao.getNextSequence(hostId);
Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true, false);
Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true);
Answer[] answers = agent.send(req, timeout);
notifyAnswersToMonitors(hostId, seq, answers);
commands.setAnswers(answers);
@ -1415,7 +1415,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS
return -1;
}
long seq = _hostDao.getNextSequence(hostId);
Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true, false);
Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true);
agent.send(req, listener);
return seq;
}
@ -1791,7 +1791,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS
}
if (s_logger.isDebugEnabled()) {
new Request(0l, -1l, -1l, cmds, true, false, true).log(-1, "Startup request from directly connected host: ");
new Request(0l, -1l, -1l, cmds, true, false).log(-1, "Startup request from directly connected host: ");
// s_logger.debug("Startup request from directly connected host: "
// + new Request(0l, -1l, -1l, cmds, true, false, true)
// .toString());