diff --git a/agent/src/com/cloud/agent/Agent.java b/agent/src/com/cloud/agent/Agent.java index 38c942bdd7e..966753e104e 100755 --- a/agent/src/com/cloud/agent/Agent.java +++ b/agent/src/com/cloud/agent/Agent.java @@ -224,7 +224,7 @@ public class Agent implements HandlerFactory, IAgentControl { final ShutdownCommand cmd = new ShutdownCommand(reason, detail); try { if (_link != null) { - Request req = new Request(0, (_id != null ? _id : -1), -1, cmd, false); + Request req = new Request((_id != null ? _id : -1), -1, cmd, false); _link.send(req.toBytes()); } } catch (final ClosedChannelException e) { @@ -289,7 +289,8 @@ public class Agent implements HandlerFactory, IAgentControl { commands[i] = startup[i]; } - final Request request = new Request(getNextSequence(), _id != null ? _id : -1, -1, commands, false, false); + final Request request = new Request(_id != null ? _id : -1, -1, commands, false, false); + request.setSequence(getNextSequence()); if (s_logger.isDebugEnabled()) { s_logger.debug("Sending Startup: " + request.toString()); @@ -544,7 +545,8 @@ public class Agent implements HandlerFactory, IAgentControl { } final PingCommand ping = _resource.getCurrentStatus(getId()); - final Request request = new Request(getNextSequence(), _id, -1, ping, false); + final Request request = new Request(_id, -1, ping, false); + request.setSequence(getNextSequence()); if (s_logger.isDebugEnabled()) { s_logger.debug("Sending ping: " + request.toString()); } @@ -624,7 +626,9 @@ public class Agent implements HandlerFactory, IAgentControl { @Override public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException { - Request request = new Request(this.getNextSequence(), this.getId(), -1, new Command[] { cmd }, true, false); + Request request = new Request(this.getId(), -1, new Command[] { cmd }, true, false); + request.setSequence(getNextSequence()); + AgentControlListener listener = new AgentControlListener(request); registerControlListener(listener); @@ -646,7 +650,8 @@ public class Agent implements HandlerFactory, IAgentControl { @Override public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException { - Request request = new Request(this.getNextSequence(), this.getId(), -1, new Command[] { cmd }, true, false); + Request request = new Request(this.getId(), -1, new Command[] { cmd }, true, false); + request.setSequence(getNextSequence()); postRequest(request); } diff --git a/core/src/com/cloud/agent/transport/Request.java b/core/src/com/cloud/agent/transport/Request.java index 6efeeab6327..f198b2bf0ec 100755 --- a/core/src/com/cloud/agent/transport/Request.java +++ b/core/src/com/cloud/agent/transport/Request.java @@ -147,16 +147,20 @@ public class Request { _content = content; } - public Request(long seq, long agentId, long mgmtId, final Command command, boolean fromServer) { - this(seq, agentId, mgmtId, new Command[] { command }, true, fromServer); + public Request(long agentId, long mgmtId, Command command, boolean fromServer) { + this(agentId, mgmtId, new Command[] { command }, true, fromServer); } - public Request(long seq, long agentId, long mgmtId, Command[] cmds, boolean stopOnError, boolean fromServer) { - this(Version.v3, seq, agentId, mgmtId, (short) 0, cmds); + public Request(long agentId, long mgmtId, Command[] cmds, boolean stopOnError, boolean fromServer) { + this(Version.v1, -1l, agentId, mgmtId, (short)0, cmds); setStopOnError(stopOnError); setFromServer(fromServer); } + public void setSequence(long seq) { + _seq = seq; + } + protected void setInSequence(Command[] cmds) { if (cmds == null) { return; @@ -316,31 +320,55 @@ public class Request { return (short) (((this instanceof Response) ? FLAG_RESPONSE : FLAG_REQUEST) | _flags); } - public void log(long agentId, String msg) { + public void logD(String msg) { + logD(msg, true); + } + + public void logD(String msg, boolean logContent) { + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(msg, logContent)); + } + } + + public void logT(String msg, boolean logD) { + if (s_logger.isTraceEnabled()) { + s_logger.trace(log(msg, true)); + } else if (logD && s_logger.isDebugEnabled()) { + s_logger.debug(log(msg, false)); + } + } + + public String log(String msg, boolean logContent) { StringBuilder buf = new StringBuilder("Seq "); buf.append(_agentId).append("-").append(_seq).append(": "); buf.append(msg); buf.append("{ ").append(getType()); - buf.append(", Ver: ").append(_ver.toString()); buf.append(", MgmtId: ").append(_mgmtId).append(", via: ").append(_via); - buf.append(", Flags: ").append(Integer.toBinaryString(getFlags())).append(", "); - if (_cmds != null) { - try { - s_gogger.toJson(_cmds, buf); - } catch (Throwable e) { - s_logger.error("Gson serialization error on Request.toString() " + getClass().getCanonicalName(), e); - return; + if (logContent) { + buf.append(", Ver: ").append(_ver.toString()); + buf.append(", Flags: ").append(Integer.toBinaryString(getFlags())).append(", "); + if (_cmds != null) { + try { + s_gogger.toJson(_cmds, buf); + } catch (Throwable e) { + StringBuilder buff = new StringBuilder(); + for (Command cmd : _cmds) { + buff.append(cmd.getClass().getName()).append("/"); + } + s_logger.error("Gson serialization error " + buff.toString(), e); + assert false : "More gson errors on " + buff.toString(); + return ""; + } + } else if (_content != null) { + buf.append(_content.subSequence(0, 32)); + } else { + buf.append("I've got nada here!"); + assert false : "How can both commands and content be null? What are we sending here?"; } - } else if (_content != null) { - buf.append(_content.subSequence(0, 32)); - } else { - buf.append("I've got nada here!"); - assert false : "How can both commands and content be null? What are we sending here?"; } buf.append(" }"); - - s_logger.debug(buf.toString()); + return buf.toString(); } /** @@ -358,7 +386,7 @@ public class Request { final ByteBuffer buff = ByteBuffer.wrap(bytes); final byte ver = buff.get(); final Version version = Version.get(ver); - if (version.ordinal() < Version.v3.ordinal()) { + if (version.ordinal() != Version.v1.ordinal()) { throw new UnsupportedVersionException("This version is no longer supported: " + version.toString(), UnsupportedVersionException.IncompatibleVersion); } final byte reserved = buff.get(); // tossed away for now. diff --git a/server/src/com/cloud/agent/manager/AgentAttache.java b/server/src/com/cloud/agent/manager/AgentAttache.java index f1899c4c215..1c631d9837d 100644 --- a/server/src/com/cloud/agent/manager/AgentAttache.java +++ b/server/src/com/cloud/agent/manager/AgentAttache.java @@ -23,6 +23,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -57,40 +58,41 @@ import com.cloud.utils.concurrency.NamedThreadFactory; */ public abstract class AgentAttache { private static final Logger s_logger = Logger.getLogger(AgentAttache.class); - + private static final ScheduledExecutorService s_listenerExecutor = Executors.newScheduledThreadPool(10, new NamedThreadFactory("ListenerTimer")); - + private static final Random s_rand = new Random(System.currentTimeMillis()); + protected static final Comparator s_reqComparator = new Comparator() { - @Override - public int compare(Request o1, Request o2) { - long seq1 = o1.getSequence(); - long seq2 = o2.getSequence(); - if (seq1 < seq2) { - return -1; - } else if (seq1 > seq2) { - return 1; - } else { - return 0; - } + @Override + public int compare(Request o1, Request o2) { + long seq1 = o1.getSequence(); + long seq2 = o2.getSequence(); + if (seq1 < seq2) { + return -1; + } else if (seq1 > seq2) { + return 1; + } else { + return 0; } - }; - + } + }; + protected static final Comparator s_seqComparator = new Comparator() { - @Override - public int compare(Object o1, Object o2) { - long seq1 = ((Request) o1).getSequence(); - long seq2 = (Long) o2; - if (seq1 < seq2) { - return -1; - } else if (seq1 > seq2) { - return 1; - } else { - return 0; - } + @Override + public int compare(Object o1, Object o2) { + long seq1 = ((Request) o1).getSequence(); + long seq2 = (Long) o2; + if (seq1 < seq2) { + return -1; + } else if (seq1 > seq2) { + return 1; + } else { + return 0; } - }; + } + }; protected final long _id; protected final ConcurrentHashMap _waitForList; @@ -98,9 +100,10 @@ public abstract class AgentAttache { protected Long _currentSequence; protected Status _status = Status.Connecting; protected boolean _maintenance; - + protected long _nextSequence; + protected AgentManager _agentMgr; - + public final static String[] s_commandsAllowedInMaintenanceMode = new String[] { MaintainCommand.class.toString(), MigrateCommand.class.toString(), StopCommand.class.toString(), CheckVirtualMachineCommand.class.toString(), PingTestCommand.class.toString(), CheckHealthCommand.class.toString(), ReadyCommand.class.toString(), ShutdownCommand.class.toString() }; protected final static String[] s_commandsNotAllowedInConnectingMode = @@ -109,7 +112,7 @@ public abstract class AgentAttache { Arrays.sort(s_commandsAllowedInMaintenanceMode); Arrays.sort(s_commandsNotAllowedInConnectingMode); } - + protected AgentAttache(AgentManager agentMgr, final long id, boolean maintenance) { _id = id; @@ -118,27 +121,32 @@ public abstract class AgentAttache { _maintenance = maintenance; _requests = new LinkedList(); _agentMgr = agentMgr; + _nextSequence = s_rand.nextInt(Short.MAX_VALUE) << 48; + } + + public synchronized long getNextSequence() { + return ++_nextSequence; } public synchronized void setMaintenanceMode(final boolean value) { _maintenance = value; } - + public void ready() { _status = Status.Up; } - + public boolean isReady() { return _status == Status.Up; } - + public boolean isConnecting() { return _status == Status.Connecting; } - - public boolean forForward() { - return false; - } + + public boolean forForward() { + return false; + } protected void checkAvailability(final Command[] cmds) throws AgentUnavailableException { if (!_maintenance && _status != Status.Connecting) { @@ -152,7 +160,7 @@ public abstract class AgentAttache { } } } - + if (_status == Status.Connecting) { for (final Command cmd : cmds) { if (Arrays.binarySearch(s_commandsNotAllowedInConnectingMode, cmd.getClass().toString()) >= 0) { @@ -161,14 +169,14 @@ public abstract class AgentAttache { } } } - + protected synchronized void addRequest(Request req) { int index = findRequest(req); assert (index < 0) : "How can we get index again? " + index + ":" + req.toString(); _requests.add(-index - 1, req); } - + protected void cancel(Request req) { long seq = req.getSequence(); cancel(seq); @@ -187,7 +195,7 @@ public abstract class AgentAttache { _requests.remove(index); } } - + protected synchronized int findRequest(Request req) { return Collections.binarySearch(_requests, req, s_reqComparator); } @@ -200,13 +208,13 @@ public abstract class AgentAttache { protected String log(final long seq, final String msg) { return "Seq " + _id + "-" + seq + ": " + msg; } - + protected void registerListener(final long seq, final Listener listener) { if (s_logger.isTraceEnabled()) { s_logger.trace(log(seq, "Registering listener")); } if (listener.getTimeout() != -1) { - s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS); + s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS); } _waitForList.put(seq, listener); } @@ -217,7 +225,7 @@ public abstract class AgentAttache { } return _waitForList.remove(sequence); } - + protected Listener getListener(final long sequence) { return _waitForList.get(sequence); } @@ -225,20 +233,13 @@ public abstract class AgentAttache { public long getId() { return _id; } - + public int getQueueSize() { return _requests.size(); } public boolean processAnswers(final long seq, final Response resp) { - resp.log(_id, "Processing: "); -// if (s_logger.isDebugEnabled()) { -// if (!resp.executeInSequence()) { -// s_logger.debug(log(seq, "Processing: " + resp.toString())); -// } else { -// s_logger.trace(log(seq, "Processing: " + resp.toString())); -// } -// } + resp.logD("Processing: ", true); final Answer[] answers = resp.getAnswers(); @@ -267,7 +268,7 @@ public abstract class AgentAttache { if (resp.executeInSequence()) { sendNext(seq); } - + _agentMgr.notifyAnswersToMonitors(_id, seq, answers); return processed; } @@ -298,15 +299,16 @@ public abstract class AgentAttache { return this._id == that._id; } catch (ClassCastException e) { assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to AgentAttache.equals()? "; - return false; + return false; } } - + public void send(Request req, final Listener listener) throws AgentUnavailableException { checkAvailability(req.getCommands()); - - long seq = req.getSequence(); - + + long seq = getNextSequence(); + req.setSequence(seq); + if (listener != null) { registerListener(seq, listener); } else if (s_logger.isDebugEnabled()) { @@ -318,25 +320,19 @@ public abstract class AgentAttache { if (isClosed()) { throw new AgentUnavailableException("The link to the agent has been closed", _id); } - + if (req.executeInSequence() && _currentSequence != null) { - req.log(_id, "Waiting for Seq " + _currentSequence + " Scheduling: "); -// if (s_logger.isDebugEnabled()) { -// s_logger.debug(log(seq, "Waiting for Seq " + _currentSequence + " Scheduling: " + req.toString())); -// } + req.logD("Waiting for Seq " + _currentSequence + " Scheduling: ", true); addRequest(req); return; } - + // If we got to here either we're not suppose to set // the _currentSequence or it is null already. - - req.log(_id, "Sending "); -// if (s_logger.isDebugEnabled()) { -// s_logger.debug(log(seq, "Sending " + req.toString())); -// } + + req.logD("Sending ", true); send(req); - + if (req.executeInSequence() && _currentSequence == null) { _currentSequence = seq; if (s_logger.isTraceEnabled()) { @@ -355,9 +351,10 @@ public abstract class AgentAttache { } } - public Answer[] send(Request req, final int wait) throws AgentUnavailableException, OperationTimedoutException { - final SynchronousListener sl = new SynchronousListener(null); - final long seq = req.getSequence(); + public Answer[] send(Request req, int wait) throws AgentUnavailableException, OperationTimedoutException { + SynchronousListener sl = new SynchronousListener(null); + long seq = getNextSequence(); + req.setSequence(seq); send(req, sl); @@ -371,15 +368,7 @@ public abstract class AgentAttache { } if (answers != null) { if (s_logger.isDebugEnabled()) { - new Response(req, answers).log(_id, "Received: "); -// if (req.executeInSequence()) { -// s_logger.debug(log(seq, "Received: " + new Response(req, answers).toString())); -// } else { -// s_logger.debug(log(seq, "Received: ")); -// if (s_logger.isTraceEnabled()) { -// s_logger.trace(log(seq, "Received: " + new Response(req, answers).toString())); -// } -// } + new Response(req, answers).logD("Received: ", false); } return answers; } @@ -387,10 +376,9 @@ public abstract class AgentAttache { answers = sl.getAnswers(); // Try it again. if (answers != null) { if (s_logger.isDebugEnabled()) { - new Response(req, answers).log(_id, "Received after timeout: "); -// s_logger.debug(log(seq, "Received after timeout: " + new Response(req, answers).toString())); + new Response(req, answers).logD("Received after timeout: ", true); } - + _agentMgr.notifyAnswersToMonitors(_id, seq, answers); return answers; } @@ -430,7 +418,7 @@ public abstract class AgentAttache { unregisterListener(seq); } } - + protected synchronized void sendNext(final long seq) { _currentSequence = null; if (_requests.isEmpty()) { @@ -454,10 +442,10 @@ public abstract class AgentAttache { } _currentSequence = req.getSequence(); } - - public void process(Answer[] answers) { + + public void process(Answer[] answers) { //do nothing - } + } /** * sends the request asynchronously. @@ -466,42 +454,42 @@ public abstract class AgentAttache { * @throws AgentUnavailableException */ public abstract void send(Request req) throws AgentUnavailableException; - + /** * Update password. * @param new/changed password. */ public abstract void updatePassword(Command new_password); - + /** * Process disconnect. * @param state state of the agent. */ public abstract void disconnect(final Status state); - + /** * Is the agent closed for more commands? * @return true if unable to reach agent or false if reachable. */ protected abstract boolean isClosed(); - + protected class Alarm implements Runnable { - long _seq; - public Alarm(long seq) { - _seq = seq; - } - - @Override - public void run() { - try { - Listener listener = unregisterListener(_seq); - if (listener != null) { - cancel(_seq); - listener.processTimeout(_id, _seq); - } - } catch (Exception e) { - s_logger.warn("Exception ", e); - } - } + long _seq; + public Alarm(long seq) { + _seq = seq; + } + + @Override + public void run() { + try { + Listener listener = unregisterListener(_seq); + if (listener != null) { + cancel(_seq); + listener.processTimeout(_id, _seq); + } + } catch (Exception e) { + s_logger.warn("Exception ", e); + } + } } } diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java index 412c1dcb30f..2b491fbfa34 100755 --- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -786,10 +786,9 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { throw new AgentUnavailableException("agent not logged into this management server", hostId); } - long seq = _hostDao.getNextSequence(hostId); - Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true); + Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true); Answer[] answers = agent.send(req, timeout); - notifyAnswersToMonitors(hostId, seq, answers); + notifyAnswersToMonitors(hostId, req.getSequence(), answers); commands.setAnswers(answers); return answers; } @@ -801,8 +800,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } try { - long seq = _hostDao.getNextSequence(hostId); - Request req = new Request(seq, hostId, _nodeId, new CheckHealthCommand(), true); + Request req = new Request(hostId, _nodeId, new CheckHealthCommand(), true); Answer[] answers = agent.send(req, 50 * 1000); if (answers != null && answers[0] != null) { Status status = answers[0].getResult() ? Status.Up : Status.Down; @@ -847,17 +845,16 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { if (cmds.length == 0) { return -1; } - long seq = _hostDao.getNextSequence(hostId); - Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true); + Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true); agent.send(req, listener); - return seq; + return req.getSequence(); } @Override - public long gatherStats(final Long hostId, final Command cmd, final Listener listener) { + public long gatherStats(Long hostId, Command cmd, Listener listener) { try { return send(hostId, new Commands(cmd), listener); } catch (final AgentUnavailableException e) { @@ -1232,10 +1229,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } if (s_logger.isDebugEnabled()) { - new Request(0l, -1l, -1l, cmds, true, false).log(-1, "Startup request from directly connected host: "); - // s_logger.debug("Startup request from directly connected host: " - // + new Request(0l, -1l, -1l, cmds, true, false, true) - // .toString()); + new Request(-1l, -1l, cmds, true, false).log("Startup request from directly connected host: ", true); } try { attache = handleDirectConnect(resource, cmds, details, old, hostTags, allocationState); diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index f6a6b0778d8..2aa192ec626 100644 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -26,7 +26,6 @@ import com.cloud.agent.AgentManager; import com.cloud.agent.api.CancelCommand; import com.cloud.agent.api.ChangeAgentCommand; import com.cloud.agent.api.Command; -import com.cloud.agent.api.UpdateHostPasswordCommand; import com.cloud.agent.transport.Request; import com.cloud.agent.transport.Request.Version; import com.cloud.agent.transport.Response; @@ -397,7 +396,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public void cancel(String peerName, long hostId, long sequence, String reason) { CancelCommand cancel = new CancelCommand(sequence, reason); - Request req = new Request(-1, hostId, _nodeId, cancel, true); + Request req = new Request(hostId, _nodeId, cancel, true); req.setControl(true); routeToPeer(peerName, req.getBytes()); } diff --git a/server/src/com/cloud/agent/manager/DirectAgentAttache.java b/server/src/com/cloud/agent/manager/DirectAgentAttache.java index a2b182b732d..c36a3c77942 100644 --- a/server/src/com/cloud/agent/manager/DirectAgentAttache.java +++ b/server/src/com/cloud/agent/manager/DirectAgentAttache.java @@ -24,8 +24,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.naming.ConfigurationException; - import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; @@ -40,7 +38,6 @@ import com.cloud.exception.AgentUnavailableException; import com.cloud.host.Status; import com.cloud.host.Status.Event; import com.cloud.resource.ServerResource; -import com.cloud.resource.hypervisor.HypervisorResource; import com.cloud.utils.concurrency.NamedThreadFactory; public class DirectAgentAttache extends AgentAttache { @@ -91,10 +88,7 @@ public class DirectAgentAttache extends AgentAttache { @Override public void send(Request req) throws AgentUnavailableException { - req.log(_id, "Executing: "); - // if (s_logger.isDebugEnabled()) { - // s_logger.debug(log(req.getSequence(), "Executing " + req.toString())); - // } + req.logD("Executing: ", true); if (req instanceof Response) { Response resp = (Response)req; Answer[] answers = resp.getAnswers(); @@ -158,7 +152,7 @@ public class DirectAgentAttache extends AgentAttache { long seq = _seq++; if (s_logger.isTraceEnabled()) { - s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(seq, _id, -1, cmd, false).toString()); + s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(_id, -1, cmd, false).toString()); } _mgr.handleCommands(DirectAgentAttache.this, seq, new Command[]{cmd});