Added code to prevent two management servers

This commit is contained in:
Alex Huang 2011-05-04 15:19:43 -07:00
parent 4128c82018
commit 0008dfc3a9
6 changed files with 41 additions and 20 deletions

View File

@ -57,7 +57,7 @@ import com.cloud.utils.concurrency.NamedThreadFactory;
public abstract class AgentAttache {
private static final Logger s_logger = Logger.getLogger(AgentAttache.class);
private static final ScheduledExecutorService s_executor = Executors.newScheduledThreadPool(10, new NamedThreadFactory("ListenerTimer"));
private static final ScheduledExecutorService s_listenerExecutor = Executors.newScheduledThreadPool(10, new NamedThreadFactory("ListenerTimer"));
protected static final Comparator<Request> s_reqComparator =
new Comparator<Request>() {
@ -202,7 +202,7 @@ public abstract class AgentAttache {
s_logger.trace(log(seq, "Registering listener"));
}
if (listener.getTimeout() != -1) {
s_executor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS);
s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS);
}
_waitForList.put(seq, listener);
}

View File

@ -538,6 +538,14 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS
long id = server.getId();
AgentAttache attache = createAttache(id, server, resource);
if (attache.isReady()) {
StartupAnswer[] answers = new StartupAnswer[startup.length];
for (int i = 0; i < answers.length; i++) {
answers[i] = new StartupAnswer(startup[i], attache.getId(), _pingInterval);
}
attache.process(answers);
}
attache = notifyMonitorsOfConnection(attache, startup);
@ -1713,14 +1721,6 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS
resource.disconnected();
return null;
}
if (attache.isReady()) {
StartupAnswer[] answers = new StartupAnswer[cmds.length];
for (int i = 0; i < answers.length; i++) {
answers[i] = new StartupAnswer(cmds[i], attache.getId(), _pingInterval);
}
attache.process(answers);
}
return attache;
}

View File

@ -124,7 +124,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
}
}
}
if(hosts != null && hosts.size() > 0) {
for(HostVO host: hosts) {
AgentAttache agentattache = findAttache(host.getId());

View File

@ -74,8 +74,18 @@ public class ConnectedAgentAttache extends AgentAttache {
ConnectedAgentAttache that = (ConnectedAgentAttache) obj;
return super.equals(obj) && this._link == that._link && this._link != null;
} catch (ClassCastException e) {
assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to AgentAttache.equals()? ";
assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to " + this.getClass().getSimpleName() + ".equals()? ";
return false;
}
}
}
@Override
public void finalize() {
assert _link == null : "Duh...Says you....Forgot to call disconnect()!";
synchronized(this) {
if (_link != null) {
disconnect(Status.Alert);
}
}
}
}

View File

@ -42,8 +42,9 @@ import com.cloud.utils.concurrency.NamedThreadFactory;
public class DirectAgentAttache extends AgentAttache {
private final static Logger s_logger = Logger.getLogger(DirectAgentAttache.class);
static ScheduledExecutorService s_executor = Executors.newScheduledThreadPool(100, new NamedThreadFactory("DirectAgent"));
ServerResource _resource;
static ScheduledExecutorService _executor = Executors.newScheduledThreadPool(100, new NamedThreadFactory("DirectAgent"));
List<ScheduledFuture<?>> _futures = new ArrayList<ScheduledFuture<?>>();
AgentManagerImpl _mgr;
long _seq = 0;
@ -77,7 +78,7 @@ public class DirectAgentAttache extends AgentAttache {
if (!(obj instanceof DirectAgentAttache)) {
return false;
}
return super.equals(obj) && _executor == ((DirectAgentAttache)obj)._executor;
return super.equals(obj);
}
@Override
@ -97,15 +98,15 @@ public class DirectAgentAttache extends AgentAttache {
if (answers != null && answers[0] instanceof StartupAnswer) {
StartupAnswer startup = (StartupAnswer)answers[0];
int interval = startup.getPingInterval();
_futures.add(_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
_futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
}
} else {
Command[] cmds = req.getCommands();
if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) {
_executor.execute(new Task(req));
s_executor.execute(new Task(req));
} else {
CronCommand cmd = (CronCommand)cmds[0];
_futures.add(_executor.scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
_futures.add(s_executor.scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
}
}
}
@ -116,10 +117,20 @@ public class DirectAgentAttache extends AgentAttache {
StartupAnswer startup = (StartupAnswer)answers[0];
int interval = startup.getPingInterval();
s_logger.info("StartupAnswer received " + startup.getHostId() + " Interval = " + interval );
_futures.add(_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
_futures.add(s_executor.scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
}
}
@Override
protected void finalize() {
assert _resource == null : "Come on now....If you're going to dabble in agent code, you better know how to close out our resources. Ever considered why there's a method called disconnect()?";
synchronized(this) {
if (_resource != null) {
disconnect(Status.Alert);
}
}
}
protected class PingTask implements Runnable {
@Override
public synchronized void run() {

View File

@ -28,7 +28,7 @@ public class StringUtils {
public static String join(Iterable<? extends Object> iterable, String delim) {
StringBuilder sb = new StringBuilder();
if (iterable != null) {
Iterator iter = iterable.iterator();
Iterator<? extends Object> iter = iterable.iterator();
if (iter.hasNext()) {
Object next = iter.next();
sb.append(next.toString());