mirror of https://github.com/apache/cloudstack.git
Better logging and better finalize
This commit is contained in:
parent
753b6a00a6
commit
4ee5f1ca42
|
|
@ -33,7 +33,7 @@ import com.cloud.utils.nio.Link;
|
|||
*/
|
||||
public class ConnectedAgentAttache extends AgentAttache {
|
||||
private static final Logger s_logger = Logger.getLogger(ConnectedAgentAttache.class);
|
||||
|
||||
|
||||
protected Link _link;
|
||||
|
||||
public ConnectedAgentAttache(AgentManager agentMgr, final long id, final Link link, boolean maintenance) {
|
||||
|
|
@ -49,12 +49,12 @@ public class ConnectedAgentAttache extends AgentAttache {
|
|||
throw new AgentUnavailableException("Channel is closed", _id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized boolean isClosed() {
|
||||
return _link == null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void disconnect(final Status state) {
|
||||
synchronized (this) {
|
||||
|
|
@ -76,17 +76,22 @@ public class ConnectedAgentAttache extends AgentAttache {
|
|||
return super.equals(obj) && this._link == that._link && this._link != null;
|
||||
} catch (ClassCastException e) {
|
||||
assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to " + this.getClass().getSimpleName() + ".equals()? ";
|
||||
return false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void finalize() {
|
||||
assert _link == null : "Duh...Says you....Forgot to call disconnect()!";
|
||||
synchronized(this) {
|
||||
if (_link != null) {
|
||||
disconnect(Status.Alert);
|
||||
protected void finalize() throws Throwable {
|
||||
try {
|
||||
assert _link == null : "Duh...Says you....Forgot to call disconnect()!";
|
||||
synchronized (this) {
|
||||
if (_link != null) {
|
||||
s_logger.warn("Lost attache " + _id);
|
||||
disconnect(Status.Alert);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
super.finalize();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,76 +42,76 @@ import com.cloud.utils.concurrency.NamedThreadFactory;
|
|||
|
||||
public class DirectAgentAttache extends AgentAttache {
|
||||
private final static Logger s_logger = Logger.getLogger(DirectAgentAttache.class);
|
||||
|
||||
|
||||
ServerResource _resource;
|
||||
static ScheduledExecutorService s_executor = new ScheduledThreadPoolExecutor(100, new NamedThreadFactory("DirectAgent"));
|
||||
List<ScheduledFuture<?>> _futures = new ArrayList<ScheduledFuture<?>>();
|
||||
AgentManagerImpl _mgr;
|
||||
long _seq = 0;
|
||||
|
||||
public DirectAgentAttache(AgentManager agentMgr, long id, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) {
|
||||
super(agentMgr, id, maintenance);
|
||||
_resource = resource;
|
||||
_mgr = mgr;
|
||||
}
|
||||
public DirectAgentAttache(AgentManager agentMgr, long id, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) {
|
||||
super(agentMgr, id, maintenance);
|
||||
_resource = resource;
|
||||
_mgr = mgr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect(Status state) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Processing disconnect " + _id);
|
||||
}
|
||||
|
||||
for (ScheduledFuture<?> future : _futures) {
|
||||
future.cancel(false);
|
||||
}
|
||||
@Override
|
||||
public void disconnect(Status state) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Processing disconnect " + _id);
|
||||
}
|
||||
|
||||
synchronized(this) {
|
||||
if( _resource != null ) {
|
||||
_resource.disconnected();
|
||||
_resource = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (ScheduledFuture<?> future : _futures) {
|
||||
future.cancel(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (!(obj instanceof DirectAgentAttache)) {
|
||||
return false;
|
||||
}
|
||||
return super.equals(obj);
|
||||
}
|
||||
synchronized(this) {
|
||||
if( _resource != null ) {
|
||||
_resource.disconnected();
|
||||
_resource = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean isClosed() {
|
||||
return _resource == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Request req) throws AgentUnavailableException {
|
||||
req.log(_id, "Executing: ");
|
||||
// if (s_logger.isDebugEnabled()) {
|
||||
// s_logger.debug(log(req.getSequence(), "Executing " + req.toString()));
|
||||
// }
|
||||
if (req instanceof Response) {
|
||||
Response resp = (Response)req;
|
||||
Answer[] answers = resp.getAnswers();
|
||||
if (answers != null && answers[0] instanceof StartupAnswer) {
|
||||
StartupAnswer startup = (StartupAnswer)answers[0];
|
||||
int interval = startup.getPingInterval();
|
||||
_futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
|
||||
}
|
||||
} else {
|
||||
Command[] cmds = req.getCommands();
|
||||
if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) {
|
||||
s_executor.execute(new Task(req));
|
||||
} else {
|
||||
CronCommand cmd = (CronCommand)cmds[0];
|
||||
_futures.add(s_executor.scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (!(obj instanceof DirectAgentAttache)) {
|
||||
return false;
|
||||
}
|
||||
return super.equals(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean isClosed() {
|
||||
return _resource == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Request req) throws AgentUnavailableException {
|
||||
req.log(_id, "Executing: ");
|
||||
// if (s_logger.isDebugEnabled()) {
|
||||
// s_logger.debug(log(req.getSequence(), "Executing " + req.toString()));
|
||||
// }
|
||||
if (req instanceof Response) {
|
||||
Response resp = (Response)req;
|
||||
Answer[] answers = resp.getAnswers();
|
||||
if (answers != null && answers[0] instanceof StartupAnswer) {
|
||||
StartupAnswer startup = (StartupAnswer)answers[0];
|
||||
int interval = startup.getPingInterval();
|
||||
_futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
|
||||
}
|
||||
} else {
|
||||
Command[] cmds = req.getCommands();
|
||||
if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) {
|
||||
s_executor.execute(new Task(req));
|
||||
} else {
|
||||
CronCommand cmd = (CronCommand)cmds[0];
|
||||
_futures.add(s_executor.scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Answer[] answers) {
|
||||
if (answers != null && answers[0] instanceof StartupAnswer) {
|
||||
StartupAnswer startup = (StartupAnswer)answers[0];
|
||||
|
|
@ -119,82 +119,87 @@ public class DirectAgentAttache extends AgentAttache {
|
|||
s_logger.info("StartupAnswer received " + startup.getHostId() + " Interval = " + interval );
|
||||
_futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() {
|
||||
assert _resource == null : "Come on now....If you're going to dabble in agent code, you better know how to close out our resources. Ever considered why there's a method called disconnect()?";
|
||||
synchronized(this) {
|
||||
if (_resource != null) {
|
||||
disconnect(Status.Alert);
|
||||
protected void finalize() throws Throwable {
|
||||
try {
|
||||
assert _resource == null : "Come on now....If you're going to dabble in agent code, you better know how to close out our resources. Ever considered why there's a method called disconnect()?";
|
||||
synchronized (this) {
|
||||
if (_resource != null) {
|
||||
s_logger.warn("Lost attache for " + _id);
|
||||
disconnect(Status.Alert);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
super.finalize();
|
||||
}
|
||||
}
|
||||
|
||||
protected class PingTask implements Runnable {
|
||||
@Override
|
||||
public synchronized void run() {
|
||||
try {
|
||||
ServerResource resource = _resource;
|
||||
|
||||
if (resource != null) {
|
||||
PingCommand cmd = resource.getCurrentStatus(_id);
|
||||
if (cmd == null) {
|
||||
s_logger.warn("Unable to get current status on " + _id);
|
||||
_mgr.disconnect(DirectAgentAttache.this, Event.AgentDisconnected, true);
|
||||
return;
|
||||
}
|
||||
|
||||
protected class PingTask implements Runnable {
|
||||
@Override
|
||||
public synchronized void run() {
|
||||
try {
|
||||
ServerResource resource = _resource;
|
||||
|
||||
if (resource != null) {
|
||||
PingCommand cmd = resource.getCurrentStatus(_id);
|
||||
if (cmd == null) {
|
||||
s_logger.warn("Unable to get current status on " + _id);
|
||||
_mgr.disconnect(DirectAgentAttache.this, Event.AgentDisconnected, true);
|
||||
return;
|
||||
}
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Ping from " + _id);
|
||||
}
|
||||
long seq = _seq++;
|
||||
|
||||
if (s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(seq, _id, -1, cmd, false).toString());
|
||||
}
|
||||
long seq = _seq++;
|
||||
|
||||
_mgr.handleCommands(DirectAgentAttache.this, seq, new Command[]{cmd});
|
||||
} else {
|
||||
s_logger.debug("Unable to send ping because agent is disconnected " + _id);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
s_logger.warn("Unable to complete the ping task", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(seq, _id, -1, cmd, false).toString());
|
||||
}
|
||||
|
||||
protected class Task implements Runnable {
|
||||
Request _req;
|
||||
|
||||
public Task(Request req) {
|
||||
_req = req;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
_mgr.handleCommands(DirectAgentAttache.this, seq, new Command[]{cmd});
|
||||
} else {
|
||||
s_logger.debug("Unable to send ping because agent is disconnected " + _id);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
s_logger.warn("Unable to complete the ping task", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected class Task implements Runnable {
|
||||
Request _req;
|
||||
|
||||
public Task(Request req) {
|
||||
_req = req;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
long seq = _req.getSequence();
|
||||
try {
|
||||
ServerResource resource = _resource;
|
||||
Command[] cmds = _req.getCommands();
|
||||
boolean stopOnError = _req.stopOnError();
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Executing request"));
|
||||
}
|
||||
ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
|
||||
for (int i = 0; i < cmds.length; i++) {
|
||||
Answer answer = null;
|
||||
try {
|
||||
if (resource != null) {
|
||||
answer = resource.executeRequest(cmds[i]);
|
||||
} else {
|
||||
answer = new Answer(cmds[i], false, "Agent is disconnected");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
s_logger.warn(log(seq, "Exception Caught while executing command"), e);
|
||||
answer = new Answer(cmds[i], false, e.toString());
|
||||
}
|
||||
Command[] cmds = _req.getCommands();
|
||||
boolean stopOnError = _req.stopOnError();
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Executing request"));
|
||||
}
|
||||
ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
|
||||
for (int i = 0; i < cmds.length; i++) {
|
||||
Answer answer = null;
|
||||
try {
|
||||
if (resource != null) {
|
||||
answer = resource.executeRequest(cmds[i]);
|
||||
} else {
|
||||
answer = new Answer(cmds[i], false, "Agent is disconnected");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
s_logger.warn(log(seq, "Exception Caught while executing command"), e);
|
||||
answer = new Answer(cmds[i], false, e.toString());
|
||||
}
|
||||
answers.add(answer);
|
||||
if (!answer.getResult() && stopOnError) {
|
||||
if (i < cmds.length - 1 && s_logger.isDebugEnabled()) {
|
||||
|
|
@ -202,17 +207,17 @@ public class DirectAgentAttache extends AgentAttache {
|
|||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Response Received: "));
|
||||
}
|
||||
|
||||
processAnswers(seq, resp);
|
||||
} catch (Exception e) {
|
||||
s_logger.warn(log(seq, "Exception caught "), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Response Received: "));
|
||||
}
|
||||
|
||||
processAnswers(seq, resp);
|
||||
} catch (Exception e) {
|
||||
s_logger.warn(log(seq, "Exception caught "), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue