diff --git a/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java b/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java index 85a61d226fa..953ff2eec62 100644 --- a/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java +++ b/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java @@ -33,7 +33,7 @@ import com.cloud.utils.nio.Link; */ public class ConnectedAgentAttache extends AgentAttache { private static final Logger s_logger = Logger.getLogger(ConnectedAgentAttache.class); - + protected Link _link; public ConnectedAgentAttache(AgentManager agentMgr, final long id, final Link link, boolean maintenance) { @@ -49,12 +49,12 @@ public class ConnectedAgentAttache extends AgentAttache { throw new AgentUnavailableException("Channel is closed", _id); } } - + @Override public synchronized boolean isClosed() { return _link == null; } - + @Override public void disconnect(final Status state) { synchronized (this) { @@ -76,17 +76,22 @@ public class ConnectedAgentAttache extends AgentAttache { return super.equals(obj) && this._link == that._link && this._link != null; } catch (ClassCastException e) { assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to " + this.getClass().getSimpleName() + ".equals()? "; - return false; + return false; } } - + @Override - public void finalize() { - assert _link == null : "Duh...Says you....Forgot to call disconnect()!"; - synchronized(this) { - if (_link != null) { - disconnect(Status.Alert); + protected void finalize() throws Throwable { + try { + assert _link == null : "Duh...Says you....Forgot to call disconnect()!"; + synchronized (this) { + if (_link != null) { + s_logger.warn("Lost attache " + _id); + disconnect(Status.Alert); + } } + } finally { + super.finalize(); } } } diff --git a/server/src/com/cloud/agent/manager/DirectAgentAttache.java b/server/src/com/cloud/agent/manager/DirectAgentAttache.java index 64671bc869c..30ddedae0c7 100644 --- a/server/src/com/cloud/agent/manager/DirectAgentAttache.java +++ b/server/src/com/cloud/agent/manager/DirectAgentAttache.java @@ -42,76 +42,76 @@ import com.cloud.utils.concurrency.NamedThreadFactory; public class DirectAgentAttache extends AgentAttache { private final static Logger s_logger = Logger.getLogger(DirectAgentAttache.class); - + ServerResource _resource; static ScheduledExecutorService s_executor = new ScheduledThreadPoolExecutor(100, new NamedThreadFactory("DirectAgent")); List> _futures = new ArrayList>(); AgentManagerImpl _mgr; long _seq = 0; - public DirectAgentAttache(AgentManager agentMgr, long id, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) { - super(agentMgr, id, maintenance); - _resource = resource; - _mgr = mgr; - } + public DirectAgentAttache(AgentManager agentMgr, long id, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) { + super(agentMgr, id, maintenance); + _resource = resource; + _mgr = mgr; + } - @Override - public void disconnect(Status state) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Processing disconnect " + _id); - } - - for (ScheduledFuture future : _futures) { - future.cancel(false); - } + @Override + public void disconnect(Status state) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Processing disconnect " + _id); + } - synchronized(this) { - if( _resource != null ) { - _resource.disconnected(); - _resource = null; - } - } - } + for (ScheduledFuture future : _futures) { + future.cancel(false); + } - @Override - public boolean equals(Object obj) { - if (!(obj instanceof DirectAgentAttache)) { - return false; - } - return super.equals(obj); - } + synchronized(this) { + if( _resource != null ) { + _resource.disconnected(); + _resource = null; + } + } + } - @Override - public synchronized boolean isClosed() { - return _resource == null; - } - - @Override - public void send(Request req) throws AgentUnavailableException { - req.log(_id, "Executing: "); -// if (s_logger.isDebugEnabled()) { -// s_logger.debug(log(req.getSequence(), "Executing " + req.toString())); -// } - if (req instanceof Response) { - Response resp = (Response)req; - Answer[] answers = resp.getAnswers(); - if (answers != null && answers[0] instanceof StartupAnswer) { - StartupAnswer startup = (StartupAnswer)answers[0]; - int interval = startup.getPingInterval(); - _futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); - } - } else { - Command[] cmds = req.getCommands(); - if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) { - s_executor.execute(new Task(req)); - } else { - CronCommand cmd = (CronCommand)cmds[0]; - _futures.add(s_executor.scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS)); - } - } - } - - @Override + @Override + public boolean equals(Object obj) { + if (!(obj instanceof DirectAgentAttache)) { + return false; + } + return super.equals(obj); + } + + @Override + public synchronized boolean isClosed() { + return _resource == null; + } + + @Override + public void send(Request req) throws AgentUnavailableException { + req.log(_id, "Executing: "); + // if (s_logger.isDebugEnabled()) { + // s_logger.debug(log(req.getSequence(), "Executing " + req.toString())); + // } + if (req instanceof Response) { + Response resp = (Response)req; + Answer[] answers = resp.getAnswers(); + if (answers != null && answers[0] instanceof StartupAnswer) { + StartupAnswer startup = (StartupAnswer)answers[0]; + int interval = startup.getPingInterval(); + _futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); + } + } else { + Command[] cmds = req.getCommands(); + if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) { + s_executor.execute(new Task(req)); + } else { + CronCommand cmd = (CronCommand)cmds[0]; + _futures.add(s_executor.scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS)); + } + } + } + + @Override public void process(Answer[] answers) { if (answers != null && answers[0] instanceof StartupAnswer) { StartupAnswer startup = (StartupAnswer)answers[0]; @@ -119,82 +119,87 @@ public class DirectAgentAttache extends AgentAttache { s_logger.info("StartupAnswer received " + startup.getHostId() + " Interval = " + interval ); _futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); } - } - + } + @Override - protected void finalize() { - assert _resource == null : "Come on now....If you're going to dabble in agent code, you better know how to close out our resources. Ever considered why there's a method called disconnect()?"; - synchronized(this) { - if (_resource != null) { - disconnect(Status.Alert); + protected void finalize() throws Throwable { + try { + assert _resource == null : "Come on now....If you're going to dabble in agent code, you better know how to close out our resources. Ever considered why there's a method called disconnect()?"; + synchronized (this) { + if (_resource != null) { + s_logger.warn("Lost attache for " + _id); + disconnect(Status.Alert); + } } + } finally { + super.finalize(); } } - - protected class PingTask implements Runnable { - @Override - public synchronized void run() { - try { - ServerResource resource = _resource; - - if (resource != null) { - PingCommand cmd = resource.getCurrentStatus(_id); - if (cmd == null) { - s_logger.warn("Unable to get current status on " + _id); - _mgr.disconnect(DirectAgentAttache.this, Event.AgentDisconnected, true); - return; - } + + protected class PingTask implements Runnable { + @Override + public synchronized void run() { + try { + ServerResource resource = _resource; + + if (resource != null) { + PingCommand cmd = resource.getCurrentStatus(_id); + if (cmd == null) { + s_logger.warn("Unable to get current status on " + _id); + _mgr.disconnect(DirectAgentAttache.this, Event.AgentDisconnected, true); + return; + } if (s_logger.isDebugEnabled()) { s_logger.debug("Ping from " + _id); } - long seq = _seq++; - - if (s_logger.isTraceEnabled()) { - s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(seq, _id, -1, cmd, false).toString()); - } + long seq = _seq++; - _mgr.handleCommands(DirectAgentAttache.this, seq, new Command[]{cmd}); - } else { - s_logger.debug("Unable to send ping because agent is disconnected " + _id); - } - } catch (Exception e) { - s_logger.warn("Unable to complete the ping task", e); - } - } - } - + if (s_logger.isTraceEnabled()) { + s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(seq, _id, -1, cmd, false).toString()); + } - protected class Task implements Runnable { - Request _req; - - public Task(Request req) { - _req = req; - } - - @Override - public void run() { + _mgr.handleCommands(DirectAgentAttache.this, seq, new Command[]{cmd}); + } else { + s_logger.debug("Unable to send ping because agent is disconnected " + _id); + } + } catch (Exception e) { + s_logger.warn("Unable to complete the ping task", e); + } + } + } + + + protected class Task implements Runnable { + Request _req; + + public Task(Request req) { + _req = req; + } + + @Override + public void run() { long seq = _req.getSequence(); try { ServerResource resource = _resource; - Command[] cmds = _req.getCommands(); - boolean stopOnError = _req.stopOnError(); - - if (s_logger.isDebugEnabled()) { - s_logger.debug(log(seq, "Executing request")); - } - ArrayList answers = new ArrayList(cmds.length); - for (int i = 0; i < cmds.length; i++) { - Answer answer = null; - try { - if (resource != null) { - answer = resource.executeRequest(cmds[i]); - } else { - answer = new Answer(cmds[i], false, "Agent is disconnected"); - } - } catch (Exception e) { - s_logger.warn(log(seq, "Exception Caught while executing command"), e); - answer = new Answer(cmds[i], false, e.toString()); - } + Command[] cmds = _req.getCommands(); + boolean stopOnError = _req.stopOnError(); + + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Executing request")); + } + ArrayList answers = new ArrayList(cmds.length); + for (int i = 0; i < cmds.length; i++) { + Answer answer = null; + try { + if (resource != null) { + answer = resource.executeRequest(cmds[i]); + } else { + answer = new Answer(cmds[i], false, "Agent is disconnected"); + } + } catch (Exception e) { + s_logger.warn(log(seq, "Exception Caught while executing command"), e); + answer = new Answer(cmds[i], false, e.toString()); + } answers.add(answer); if (!answer.getResult() && stopOnError) { if (i < cmds.length - 1 && s_logger.isDebugEnabled()) { @@ -202,17 +207,17 @@ public class DirectAgentAttache extends AgentAttache { } break; } - } - - Response resp = new Response(_req, answers.toArray(new Answer[answers.size()])); - if (s_logger.isDebugEnabled()) { - s_logger.debug(log(seq, "Response Received: ")); - } - - processAnswers(seq, resp); - } catch (Exception e) { - s_logger.warn(log(seq, "Exception caught "), e); - } - } - } + } + + Response resp = new Response(_req, answers.toArray(new Answer[answers.size()])); + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Response Received: ")); + } + + processAnswers(seq, resp); + } catch (Exception e) { + s_logger.warn(log(seq, "Exception caught "), e); + } + } + } }