From 0008dfc3a9f47ccd9fe40c2ac0c43ec38761f073 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Wed, 4 May 2011 15:19:43 -0700 Subject: [PATCH] Added code to prevent two management servers --- .../com/cloud/agent/manager/AgentAttache.java | 4 ++-- .../cloud/agent/manager/AgentManagerImpl.java | 16 ++++++------- .../manager/ClusteredAgentManagerImpl.java | 2 +- .../agent/manager/ConnectedAgentAttache.java | 14 +++++++++-- .../agent/manager/DirectAgentAttache.java | 23 ++++++++++++++----- utils/src/com/cloud/utils/StringUtils.java | 2 +- 6 files changed, 41 insertions(+), 20 deletions(-) diff --git a/server/src/com/cloud/agent/manager/AgentAttache.java b/server/src/com/cloud/agent/manager/AgentAttache.java index 929b846dfae..4014e322ada 100644 --- a/server/src/com/cloud/agent/manager/AgentAttache.java +++ b/server/src/com/cloud/agent/manager/AgentAttache.java @@ -57,7 +57,7 @@ import com.cloud.utils.concurrency.NamedThreadFactory; public abstract class AgentAttache { private static final Logger s_logger = Logger.getLogger(AgentAttache.class); - private static final ScheduledExecutorService s_executor = Executors.newScheduledThreadPool(10, new NamedThreadFactory("ListenerTimer")); + private static final ScheduledExecutorService s_listenerExecutor = Executors.newScheduledThreadPool(10, new NamedThreadFactory("ListenerTimer")); protected static final Comparator s_reqComparator = new Comparator() { @@ -202,7 +202,7 @@ public abstract class AgentAttache { s_logger.trace(log(seq, "Registering listener")); } if (listener.getTimeout() != -1) { - s_executor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS); + s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS); } _waitForList.put(seq, listener); } diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java index eb4f8152029..df8a538b1df 100755 --- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -538,6 +538,14 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS long id = server.getId(); AgentAttache attache = createAttache(id, server, resource); + if (attache.isReady()) { + StartupAnswer[] answers = new StartupAnswer[startup.length]; + for (int i = 0; i < answers.length; i++) { + answers[i] = new StartupAnswer(startup[i], attache.getId(), _pingInterval); + } + + attache.process(answers); + } attache = notifyMonitorsOfConnection(attache, startup); @@ -1713,14 +1721,6 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS resource.disconnected(); return null; } - if (attache.isReady()) { - StartupAnswer[] answers = new StartupAnswer[cmds.length]; - for (int i = 0; i < answers.length; i++) { - answers[i] = new StartupAnswer(cmds[i], attache.getId(), _pingInterval); - } - - attache.process(answers); - } return attache; } diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index d70bda6f865..3d264ab13b9 100644 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -124,7 +124,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } } - } + } if(hosts != null && hosts.size() > 0) { for(HostVO host: hosts) { AgentAttache agentattache = findAttache(host.getId()); diff --git a/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java b/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java index 2c73cbc08e3..9712fb31885 100644 --- a/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java +++ b/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java @@ -74,8 +74,18 @@ public class ConnectedAgentAttache extends AgentAttache { ConnectedAgentAttache that = (ConnectedAgentAttache) obj; return super.equals(obj) && this._link == that._link && this._link != null; } catch (ClassCastException e) { - assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to AgentAttache.equals()? "; + assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to " + this.getClass().getSimpleName() + ".equals()? "; return false; } - } + } + + @Override + public void finalize() { + assert _link == null : "Duh...Says you....Forgot to call disconnect()!"; + synchronized(this) { + if (_link != null) { + disconnect(Status.Alert); + } + } + } } diff --git a/server/src/com/cloud/agent/manager/DirectAgentAttache.java b/server/src/com/cloud/agent/manager/DirectAgentAttache.java index c3c88caa998..1c8ffd458a9 100644 --- a/server/src/com/cloud/agent/manager/DirectAgentAttache.java +++ b/server/src/com/cloud/agent/manager/DirectAgentAttache.java @@ -42,8 +42,9 @@ import com.cloud.utils.concurrency.NamedThreadFactory; public class DirectAgentAttache extends AgentAttache { private final static Logger s_logger = Logger.getLogger(DirectAgentAttache.class); + static ScheduledExecutorService s_executor = Executors.newScheduledThreadPool(100, new NamedThreadFactory("DirectAgent")); + ServerResource _resource; - static ScheduledExecutorService _executor = Executors.newScheduledThreadPool(100, new NamedThreadFactory("DirectAgent")); List> _futures = new ArrayList>(); AgentManagerImpl _mgr; long _seq = 0; @@ -77,7 +78,7 @@ public class DirectAgentAttache extends AgentAttache { if (!(obj instanceof DirectAgentAttache)) { return false; } - return super.equals(obj) && _executor == ((DirectAgentAttache)obj)._executor; + return super.equals(obj); } @Override @@ -97,15 +98,15 @@ public class DirectAgentAttache extends AgentAttache { if (answers != null && answers[0] instanceof StartupAnswer) { StartupAnswer startup = (StartupAnswer)answers[0]; int interval = startup.getPingInterval(); - _futures.add(_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); + _futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); } } else { Command[] cmds = req.getCommands(); if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) { - _executor.execute(new Task(req)); + s_executor.execute(new Task(req)); } else { CronCommand cmd = (CronCommand)cmds[0]; - _futures.add(_executor.scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS)); + _futures.add(s_executor.scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS)); } } } @@ -116,10 +117,20 @@ public class DirectAgentAttache extends AgentAttache { StartupAnswer startup = (StartupAnswer)answers[0]; int interval = startup.getPingInterval(); s_logger.info("StartupAnswer received " + startup.getHostId() + " Interval = " + interval ); - _futures.add(_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); + _futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); } } + @Override + protected void finalize() { + assert _resource == null : "Come on now....If you're going to dabble in agent code, you better know how to close out our resources. Ever considered why there's a method called disconnect()?"; + synchronized(this) { + if (_resource != null) { + disconnect(Status.Alert); + } + } + } + protected class PingTask implements Runnable { @Override public synchronized void run() { diff --git a/utils/src/com/cloud/utils/StringUtils.java b/utils/src/com/cloud/utils/StringUtils.java index 8a91608b1dd..ab50829de64 100644 --- a/utils/src/com/cloud/utils/StringUtils.java +++ b/utils/src/com/cloud/utils/StringUtils.java @@ -28,7 +28,7 @@ public class StringUtils { public static String join(Iterable iterable, String delim) { StringBuilder sb = new StringBuilder(); if (iterable != null) { - Iterator iter = iterable.iterator(); + Iterator iter = iterable.iterator(); if (iter.hasNext()) { Object next = iter.next(); sb.append(next.toString());