session info within sequence numbers

This commit is contained in:
Alex Huang 2011-05-17 18:01:05 -07:00
parent fd27127f6e
commit 9c69a983ba
6 changed files with 168 additions and 160 deletions

View File

@ -224,7 +224,7 @@ public class Agent implements HandlerFactory, IAgentControl {
final ShutdownCommand cmd = new ShutdownCommand(reason, detail);
try {
if (_link != null) {
Request req = new Request(0, (_id != null ? _id : -1), -1, cmd, false);
Request req = new Request((_id != null ? _id : -1), -1, cmd, false);
_link.send(req.toBytes());
}
} catch (final ClosedChannelException e) {
@ -289,7 +289,8 @@ public class Agent implements HandlerFactory, IAgentControl {
commands[i] = startup[i];
}
final Request request = new Request(getNextSequence(), _id != null ? _id : -1, -1, commands, false, false);
final Request request = new Request(_id != null ? _id : -1, -1, commands, false, false);
request.setSequence(getNextSequence());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Sending Startup: " + request.toString());
@ -544,7 +545,8 @@ public class Agent implements HandlerFactory, IAgentControl {
}
final PingCommand ping = _resource.getCurrentStatus(getId());
final Request request = new Request(getNextSequence(), _id, -1, ping, false);
final Request request = new Request(_id, -1, ping, false);
request.setSequence(getNextSequence());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Sending ping: " + request.toString());
}
@ -624,7 +626,9 @@ public class Agent implements HandlerFactory, IAgentControl {
@Override
public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException {
Request request = new Request(this.getNextSequence(), this.getId(), -1, new Command[] { cmd }, true, false);
Request request = new Request(this.getId(), -1, new Command[] { cmd }, true, false);
request.setSequence(getNextSequence());
AgentControlListener listener = new AgentControlListener(request);
registerControlListener(listener);
@ -646,7 +650,8 @@ public class Agent implements HandlerFactory, IAgentControl {
@Override
public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException {
Request request = new Request(this.getNextSequence(), this.getId(), -1, new Command[] { cmd }, true, false);
Request request = new Request(this.getId(), -1, new Command[] { cmd }, true, false);
request.setSequence(getNextSequence());
postRequest(request);
}

View File

@ -147,16 +147,20 @@ public class Request {
_content = content;
}
public Request(long seq, long agentId, long mgmtId, final Command command, boolean fromServer) {
this(seq, agentId, mgmtId, new Command[] { command }, true, fromServer);
public Request(long agentId, long mgmtId, Command command, boolean fromServer) {
this(agentId, mgmtId, new Command[] { command }, true, fromServer);
}
public Request(long seq, long agentId, long mgmtId, Command[] cmds, boolean stopOnError, boolean fromServer) {
this(Version.v3, seq, agentId, mgmtId, (short) 0, cmds);
public Request(long agentId, long mgmtId, Command[] cmds, boolean stopOnError, boolean fromServer) {
this(Version.v1, -1l, agentId, mgmtId, (short)0, cmds);
setStopOnError(stopOnError);
setFromServer(fromServer);
}
public void setSequence(long seq) {
_seq = seq;
}
protected void setInSequence(Command[] cmds) {
if (cmds == null) {
return;
@ -316,31 +320,55 @@ public class Request {
return (short) (((this instanceof Response) ? FLAG_RESPONSE : FLAG_REQUEST) | _flags);
}
public void log(long agentId, String msg) {
public void logD(String msg) {
logD(msg, true);
}
public void logD(String msg, boolean logContent) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(log(msg, logContent));
}
}
public void logT(String msg, boolean logD) {
if (s_logger.isTraceEnabled()) {
s_logger.trace(log(msg, true));
} else if (logD && s_logger.isDebugEnabled()) {
s_logger.debug(log(msg, false));
}
}
public String log(String msg, boolean logContent) {
StringBuilder buf = new StringBuilder("Seq ");
buf.append(_agentId).append("-").append(_seq).append(": ");
buf.append(msg);
buf.append("{ ").append(getType());
buf.append(", Ver: ").append(_ver.toString());
buf.append(", MgmtId: ").append(_mgmtId).append(", via: ").append(_via);
buf.append(", Flags: ").append(Integer.toBinaryString(getFlags())).append(", ");
if (_cmds != null) {
try {
s_gogger.toJson(_cmds, buf);
} catch (Throwable e) {
s_logger.error("Gson serialization error on Request.toString() " + getClass().getCanonicalName(), e);
return;
if (logContent) {
buf.append(", Ver: ").append(_ver.toString());
buf.append(", Flags: ").append(Integer.toBinaryString(getFlags())).append(", ");
if (_cmds != null) {
try {
s_gogger.toJson(_cmds, buf);
} catch (Throwable e) {
StringBuilder buff = new StringBuilder();
for (Command cmd : _cmds) {
buff.append(cmd.getClass().getName()).append("/");
}
s_logger.error("Gson serialization error " + buff.toString(), e);
assert false : "More gson errors on " + buff.toString();
return "";
}
} else if (_content != null) {
buf.append(_content.subSequence(0, 32));
} else {
buf.append("I've got nada here!");
assert false : "How can both commands and content be null? What are we sending here?";
}
} else if (_content != null) {
buf.append(_content.subSequence(0, 32));
} else {
buf.append("I've got nada here!");
assert false : "How can both commands and content be null? What are we sending here?";
}
buf.append(" }");
s_logger.debug(buf.toString());
return buf.toString();
}
/**
@ -358,7 +386,7 @@ public class Request {
final ByteBuffer buff = ByteBuffer.wrap(bytes);
final byte ver = buff.get();
final Version version = Version.get(ver);
if (version.ordinal() < Version.v3.ordinal()) {
if (version.ordinal() != Version.v1.ordinal()) {
throw new UnsupportedVersionException("This version is no longer supported: " + version.toString(), UnsupportedVersionException.IncompatibleVersion);
}
final byte reserved = buff.get(); // tossed away for now.

View File

@ -23,6 +23,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@ -57,40 +58,41 @@ import com.cloud.utils.concurrency.NamedThreadFactory;
*/
public abstract class AgentAttache {
private static final Logger s_logger = Logger.getLogger(AgentAttache.class);
private static final ScheduledExecutorService s_listenerExecutor = Executors.newScheduledThreadPool(10, new NamedThreadFactory("ListenerTimer"));
private static final Random s_rand = new Random(System.currentTimeMillis());
protected static final Comparator<Request> s_reqComparator =
new Comparator<Request>() {
@Override
public int compare(Request o1, Request o2) {
long seq1 = o1.getSequence();
long seq2 = o2.getSequence();
if (seq1 < seq2) {
return -1;
} else if (seq1 > seq2) {
return 1;
} else {
return 0;
}
@Override
public int compare(Request o1, Request o2) {
long seq1 = o1.getSequence();
long seq2 = o2.getSequence();
if (seq1 < seq2) {
return -1;
} else if (seq1 > seq2) {
return 1;
} else {
return 0;
}
};
}
};
protected static final Comparator<Object> s_seqComparator =
new Comparator<Object>() {
@Override
public int compare(Object o1, Object o2) {
long seq1 = ((Request) o1).getSequence();
long seq2 = (Long) o2;
if (seq1 < seq2) {
return -1;
} else if (seq1 > seq2) {
return 1;
} else {
return 0;
}
@Override
public int compare(Object o1, Object o2) {
long seq1 = ((Request) o1).getSequence();
long seq2 = (Long) o2;
if (seq1 < seq2) {
return -1;
} else if (seq1 > seq2) {
return 1;
} else {
return 0;
}
};
}
};
protected final long _id;
protected final ConcurrentHashMap<Long, Listener> _waitForList;
@ -98,9 +100,10 @@ public abstract class AgentAttache {
protected Long _currentSequence;
protected Status _status = Status.Connecting;
protected boolean _maintenance;
protected long _nextSequence;
protected AgentManager _agentMgr;
public final static String[] s_commandsAllowedInMaintenanceMode =
new String[] { MaintainCommand.class.toString(), MigrateCommand.class.toString(), StopCommand.class.toString(), CheckVirtualMachineCommand.class.toString(), PingTestCommand.class.toString(), CheckHealthCommand.class.toString(), ReadyCommand.class.toString(), ShutdownCommand.class.toString() };
protected final static String[] s_commandsNotAllowedInConnectingMode =
@ -109,7 +112,7 @@ public abstract class AgentAttache {
Arrays.sort(s_commandsAllowedInMaintenanceMode);
Arrays.sort(s_commandsNotAllowedInConnectingMode);
}
protected AgentAttache(AgentManager agentMgr, final long id, boolean maintenance) {
_id = id;
@ -118,27 +121,32 @@ public abstract class AgentAttache {
_maintenance = maintenance;
_requests = new LinkedList<Request>();
_agentMgr = agentMgr;
_nextSequence = s_rand.nextInt(Short.MAX_VALUE) << 48;
}
public synchronized long getNextSequence() {
return ++_nextSequence;
}
public synchronized void setMaintenanceMode(final boolean value) {
_maintenance = value;
}
public void ready() {
_status = Status.Up;
}
public boolean isReady() {
return _status == Status.Up;
}
public boolean isConnecting() {
return _status == Status.Connecting;
}
public boolean forForward() {
return false;
}
public boolean forForward() {
return false;
}
protected void checkAvailability(final Command[] cmds) throws AgentUnavailableException {
if (!_maintenance && _status != Status.Connecting) {
@ -152,7 +160,7 @@ public abstract class AgentAttache {
}
}
}
if (_status == Status.Connecting) {
for (final Command cmd : cmds) {
if (Arrays.binarySearch(s_commandsNotAllowedInConnectingMode, cmd.getClass().toString()) >= 0) {
@ -161,14 +169,14 @@ public abstract class AgentAttache {
}
}
}
protected synchronized void addRequest(Request req) {
int index = findRequest(req);
assert (index < 0) : "How can we get index again? " + index + ":" + req.toString();
_requests.add(-index - 1, req);
}
protected void cancel(Request req) {
long seq = req.getSequence();
cancel(seq);
@ -187,7 +195,7 @@ public abstract class AgentAttache {
_requests.remove(index);
}
}
protected synchronized int findRequest(Request req) {
return Collections.binarySearch(_requests, req, s_reqComparator);
}
@ -200,13 +208,13 @@ public abstract class AgentAttache {
protected String log(final long seq, final String msg) {
return "Seq " + _id + "-" + seq + ": " + msg;
}
protected void registerListener(final long seq, final Listener listener) {
if (s_logger.isTraceEnabled()) {
s_logger.trace(log(seq, "Registering listener"));
}
if (listener.getTimeout() != -1) {
s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS);
s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS);
}
_waitForList.put(seq, listener);
}
@ -217,7 +225,7 @@ public abstract class AgentAttache {
}
return _waitForList.remove(sequence);
}
protected Listener getListener(final long sequence) {
return _waitForList.get(sequence);
}
@ -225,20 +233,13 @@ public abstract class AgentAttache {
public long getId() {
return _id;
}
public int getQueueSize() {
return _requests.size();
}
public boolean processAnswers(final long seq, final Response resp) {
resp.log(_id, "Processing: ");
// if (s_logger.isDebugEnabled()) {
// if (!resp.executeInSequence()) {
// s_logger.debug(log(seq, "Processing: " + resp.toString()));
// } else {
// s_logger.trace(log(seq, "Processing: " + resp.toString()));
// }
// }
resp.logD("Processing: ", true);
final Answer[] answers = resp.getAnswers();
@ -267,7 +268,7 @@ public abstract class AgentAttache {
if (resp.executeInSequence()) {
sendNext(seq);
}
_agentMgr.notifyAnswersToMonitors(_id, seq, answers);
return processed;
}
@ -298,15 +299,16 @@ public abstract class AgentAttache {
return this._id == that._id;
} catch (ClassCastException e) {
assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to AgentAttache.equals()? ";
return false;
return false;
}
}
public void send(Request req, final Listener listener) throws AgentUnavailableException {
checkAvailability(req.getCommands());
long seq = req.getSequence();
long seq = getNextSequence();
req.setSequence(seq);
if (listener != null) {
registerListener(seq, listener);
} else if (s_logger.isDebugEnabled()) {
@ -318,25 +320,19 @@ public abstract class AgentAttache {
if (isClosed()) {
throw new AgentUnavailableException("The link to the agent has been closed", _id);
}
if (req.executeInSequence() && _currentSequence != null) {
req.log(_id, "Waiting for Seq " + _currentSequence + " Scheduling: ");
// if (s_logger.isDebugEnabled()) {
// s_logger.debug(log(seq, "Waiting for Seq " + _currentSequence + " Scheduling: " + req.toString()));
// }
req.logD("Waiting for Seq " + _currentSequence + " Scheduling: ", true);
addRequest(req);
return;
}
// If we got to here either we're not suppose to set
// the _currentSequence or it is null already.
req.log(_id, "Sending ");
// if (s_logger.isDebugEnabled()) {
// s_logger.debug(log(seq, "Sending " + req.toString()));
// }
req.logD("Sending ", true);
send(req);
if (req.executeInSequence() && _currentSequence == null) {
_currentSequence = seq;
if (s_logger.isTraceEnabled()) {
@ -355,9 +351,10 @@ public abstract class AgentAttache {
}
}
public Answer[] send(Request req, final int wait) throws AgentUnavailableException, OperationTimedoutException {
final SynchronousListener sl = new SynchronousListener(null);
final long seq = req.getSequence();
public Answer[] send(Request req, int wait) throws AgentUnavailableException, OperationTimedoutException {
SynchronousListener sl = new SynchronousListener(null);
long seq = getNextSequence();
req.setSequence(seq);
send(req, sl);
@ -371,15 +368,7 @@ public abstract class AgentAttache {
}
if (answers != null) {
if (s_logger.isDebugEnabled()) {
new Response(req, answers).log(_id, "Received: ");
// if (req.executeInSequence()) {
// s_logger.debug(log(seq, "Received: " + new Response(req, answers).toString()));
// } else {
// s_logger.debug(log(seq, "Received: "));
// if (s_logger.isTraceEnabled()) {
// s_logger.trace(log(seq, "Received: " + new Response(req, answers).toString()));
// }
// }
new Response(req, answers).logD("Received: ", false);
}
return answers;
}
@ -387,10 +376,9 @@ public abstract class AgentAttache {
answers = sl.getAnswers(); // Try it again.
if (answers != null) {
if (s_logger.isDebugEnabled()) {
new Response(req, answers).log(_id, "Received after timeout: ");
// s_logger.debug(log(seq, "Received after timeout: " + new Response(req, answers).toString()));
new Response(req, answers).logD("Received after timeout: ", true);
}
_agentMgr.notifyAnswersToMonitors(_id, seq, answers);
return answers;
}
@ -430,7 +418,7 @@ public abstract class AgentAttache {
unregisterListener(seq);
}
}
protected synchronized void sendNext(final long seq) {
_currentSequence = null;
if (_requests.isEmpty()) {
@ -454,10 +442,10 @@ public abstract class AgentAttache {
}
_currentSequence = req.getSequence();
}
public void process(Answer[] answers) {
public void process(Answer[] answers) {
//do nothing
}
}
/**
* sends the request asynchronously.
@ -466,42 +454,42 @@ public abstract class AgentAttache {
* @throws AgentUnavailableException
*/
public abstract void send(Request req) throws AgentUnavailableException;
/**
* Update password.
* @param new/changed password.
*/
public abstract void updatePassword(Command new_password);
/**
* Process disconnect.
* @param state state of the agent.
*/
public abstract void disconnect(final Status state);
/**
* Is the agent closed for more commands?
* @return true if unable to reach agent or false if reachable.
*/
protected abstract boolean isClosed();
protected class Alarm implements Runnable {
long _seq;
public Alarm(long seq) {
_seq = seq;
}
@Override
public void run() {
try {
Listener listener = unregisterListener(_seq);
if (listener != null) {
cancel(_seq);
listener.processTimeout(_id, _seq);
}
} catch (Exception e) {
s_logger.warn("Exception ", e);
}
}
long _seq;
public Alarm(long seq) {
_seq = seq;
}
@Override
public void run() {
try {
Listener listener = unregisterListener(_seq);
if (listener != null) {
cancel(_seq);
listener.processTimeout(_id, _seq);
}
} catch (Exception e) {
s_logger.warn("Exception ", e);
}
}
}
}

View File

@ -786,10 +786,9 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
throw new AgentUnavailableException("agent not logged into this management server", hostId);
}
long seq = _hostDao.getNextSequence(hostId);
Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true);
Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true);
Answer[] answers = agent.send(req, timeout);
notifyAnswersToMonitors(hostId, seq, answers);
notifyAnswersToMonitors(hostId, req.getSequence(), answers);
commands.setAnswers(answers);
return answers;
}
@ -801,8 +800,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
try {
long seq = _hostDao.getNextSequence(hostId);
Request req = new Request(seq, hostId, _nodeId, new CheckHealthCommand(), true);
Request req = new Request(hostId, _nodeId, new CheckHealthCommand(), true);
Answer[] answers = agent.send(req, 50 * 1000);
if (answers != null && answers[0] != null) {
Status status = answers[0].getResult() ? Status.Up : Status.Down;
@ -847,17 +845,16 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
if (cmds.length == 0) {
return -1;
}
long seq = _hostDao.getNextSequence(hostId);
Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true);
Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true);
agent.send(req, listener);
return seq;
return req.getSequence();
}
@Override
public long gatherStats(final Long hostId, final Command cmd, final Listener listener) {
public long gatherStats(Long hostId, Command cmd, Listener listener) {
try {
return send(hostId, new Commands(cmd), listener);
} catch (final AgentUnavailableException e) {
@ -1232,10 +1229,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
if (s_logger.isDebugEnabled()) {
new Request(0l, -1l, -1l, cmds, true, false).log(-1, "Startup request from directly connected host: ");
// s_logger.debug("Startup request from directly connected host: "
// + new Request(0l, -1l, -1l, cmds, true, false, true)
// .toString());
new Request(-1l, -1l, cmds, true, false).log("Startup request from directly connected host: ", true);
}
try {
attache = handleDirectConnect(resource, cmds, details, old, hostTags, allocationState);

View File

@ -26,7 +26,6 @@ import com.cloud.agent.AgentManager;
import com.cloud.agent.api.CancelCommand;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.UpdateHostPasswordCommand;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Request.Version;
import com.cloud.agent.transport.Response;
@ -397,7 +396,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public void cancel(String peerName, long hostId, long sequence, String reason) {
CancelCommand cancel = new CancelCommand(sequence, reason);
Request req = new Request(-1, hostId, _nodeId, cancel, true);
Request req = new Request(hostId, _nodeId, cancel, true);
req.setControl(true);
routeToPeer(peerName, req.getBytes());
}

View File

@ -24,8 +24,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
@ -40,7 +38,6 @@ import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Status;
import com.cloud.host.Status.Event;
import com.cloud.resource.ServerResource;
import com.cloud.resource.hypervisor.HypervisorResource;
import com.cloud.utils.concurrency.NamedThreadFactory;
public class DirectAgentAttache extends AgentAttache {
@ -91,10 +88,7 @@ public class DirectAgentAttache extends AgentAttache {
@Override
public void send(Request req) throws AgentUnavailableException {
req.log(_id, "Executing: ");
// if (s_logger.isDebugEnabled()) {
// s_logger.debug(log(req.getSequence(), "Executing " + req.toString()));
// }
req.logD("Executing: ", true);
if (req instanceof Response) {
Response resp = (Response)req;
Answer[] answers = resp.getAnswers();
@ -158,7 +152,7 @@ public class DirectAgentAttache extends AgentAttache {
long seq = _seq++;
if (s_logger.isTraceEnabled()) {
s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(seq, _id, -1, cmd, false).toString());
s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(_id, -1, cmd, false).toString());
}
_mgr.handleCommands(DirectAgentAttache.this, seq, new Command[]{cmd});