diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java index ebf9366eb2c..f11f69f3c53 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java +++ b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.log4j.Logger; @@ -106,7 +105,6 @@ public abstract class AgentAttache { protected Status _status = Status.Connecting; protected boolean _maintenance; protected long _nextSequence; - protected AtomicInteger _outstandingTaskCount; protected AgentManagerImpl _agentMgr; @@ -129,7 +127,6 @@ public abstract class AgentAttache { _requests = new LinkedList(); _agentMgr = agentMgr; _nextSequence = new Long(s_rand.nextInt(Short.MAX_VALUE)) << 48; - _outstandingTaskCount = new AtomicInteger(0); } public synchronized long getNextSequence() { diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java index c9a35dcced8..c0a87adcf25 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -162,6 +162,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected ExecutorService _executor; protected ThreadPoolExecutor _connectExecutor; protected ScheduledExecutorService _directAgentExecutor; + protected ScheduledExecutorService _cronJobExecutor; protected ScheduledExecutorService _monitorExecutor; private int _directAgentThreadCap; @@ -219,7 +220,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this); s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers"); + // executes all agent commands other than cron and ping _directAgentExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgent")); + // executes cron and ping agent commands + _cronJobExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgentCronJob")); s_logger.debug("Created DirectAgentAttache pool with size: " + DirectAgentPoolSize.value()); _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0 @@ -1452,6 +1456,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl return _directAgentExecutor; } + public ScheduledExecutorService getCronJobPool() { + return _cronJobExecutor; + } + public int getDirectAgentThreadCap() { return _directAgentThreadCap; } diff --git a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java index 9874ee41932..7ca6929686a 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java +++ b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java @@ -17,12 +17,13 @@ package com.cloud.agent.manager; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; - import org.apache.cloudstack.managed.context.ManagedContextRunnable; import com.cloud.agent.api.Answer; @@ -43,11 +44,16 @@ public class DirectAgentAttache extends AgentAttache { List> _futures = new ArrayList>(); AgentManagerImpl _mgr; long _seq = 0; + LinkedList tasks = new LinkedList(); + AtomicInteger _outstandingTaskCount; + AtomicInteger _outstandingCronTaskCount; public DirectAgentAttache(AgentManagerImpl agentMgr, long id, String name, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) { super(agentMgr, id, name, maintenance); _resource = resource; _mgr = mgr; + _outstandingTaskCount = new AtomicInteger(0); + _outstandingCronTaskCount = new AtomicInteger(0); } @Override @@ -90,15 +96,16 @@ public class DirectAgentAttache extends AgentAttache { if (answers != null && answers[0] instanceof StartupAnswer) { StartupAnswer startup = (StartupAnswer)answers[0]; int interval = startup.getPingInterval(); - _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); + _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); } } else { Command[] cmds = req.getCommands(); if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) { - _agentMgr.getDirectAgentPool().execute(new Task(req)); + queueTask(new Task(req)); + scheduleFromQueue(); } else { CronCommand cmd = (CronCommand)cmds[0]; - _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS)); + _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new CronTask(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS)); } } } @@ -109,7 +116,7 @@ public class DirectAgentAttache extends AgentAttache { StartupAnswer startup = (StartupAnswer)answers[0]; int interval = startup.getPingInterval(); s_logger.info("StartupAnswer received " + startup.getHostId() + " Interval = " + interval); - _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); + _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS)); } } @@ -128,13 +135,26 @@ public class DirectAgentAttache extends AgentAttache { } } + private synchronized void queueTask(Task task) { + tasks.add(task); + } + + private synchronized void scheduleFromQueue() { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Agent attache=" + _id + ", task queue size=" + tasks.size() + ", outstanding tasks=" + _outstandingTaskCount.get()); + } + while (!tasks.isEmpty() && _outstandingTaskCount.get() < _agentMgr.getDirectAgentThreadCap()) { + _outstandingTaskCount.incrementAndGet(); + _agentMgr.getDirectAgentPool().execute(tasks.remove()); + } + } + protected class PingTask extends ManagedContextRunnable { @Override protected synchronized void runInContext() { try { - if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) { - s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + - "), bailing out"); + if (_outstandingCronTaskCount.incrementAndGet() >= _agentMgr.getDirectAgentThreadCap()) { + s_logger.warn("PingTask execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out"); return; } @@ -162,15 +182,15 @@ public class DirectAgentAttache extends AgentAttache { } catch (Exception e) { s_logger.warn("Unable to complete the ping task", e); } finally { - _outstandingTaskCount.decrementAndGet(); + _outstandingCronTaskCount.decrementAndGet(); } } } - protected class Task extends ManagedContextRunnable { + protected class CronTask extends ManagedContextRunnable { Request _req; - public Task(Request req) { + public CronTask(Request req) { _req = req; } @@ -194,9 +214,8 @@ public class DirectAgentAttache extends AgentAttache { protected void runInContext() { long seq = _req.getSequence(); try { - if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) { - s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + - "), bailing out"); + if (_outstandingCronTaskCount.incrementAndGet() >= _agentMgr.getDirectAgentThreadCap()) { + s_logger.warn("CronTask execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out"); bailout(); return; } @@ -243,9 +262,67 @@ public class DirectAgentAttache extends AgentAttache { } catch (Exception e) { s_logger.warn(log(seq, "Exception caught "), e); } finally { - _outstandingTaskCount.decrementAndGet(); + _outstandingCronTaskCount.decrementAndGet(); } } } + protected class Task extends ManagedContextRunnable { + Request _req; + + public Task(Request req) { + _req = req; + } + + @Override + protected void runInContext() { + long seq = _req.getSequence(); + try { + ServerResource resource = _resource; + Command[] cmds = _req.getCommands(); + boolean stopOnError = _req.stopOnError(); + + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Executing request")); + } + ArrayList answers = new ArrayList(cmds.length); + for (int i = 0; i < cmds.length; i++) { + Answer answer = null; + try { + if (resource != null) { + answer = resource.executeRequest(cmds[i]); + if (answer == null) { + s_logger.warn("Resource returned null answer!"); + answer = new Answer(cmds[i], false, "Resource returned null answer"); + } + } else { + answer = new Answer(cmds[i], false, "Agent is disconnected"); + } + } catch (Exception e) { + s_logger.warn(log(seq, "Exception Caught while executing command"), e); + answer = new Answer(cmds[i], false, e.toString()); + } + answers.add(answer); + if (!answer.getResult() && stopOnError) { + if (i < cmds.length - 1 && s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Cancelling because one of the answers is false and it is stop on error.")); + } + break; + } + } + + Response resp = new Response(_req, answers.toArray(new Answer[answers.size()])); + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Response Received: ")); + } + + processAnswers(seq, resp); + } catch (Exception e) { + s_logger.warn(log(seq, "Exception caught "), e); + } finally { + _outstandingTaskCount.decrementAndGet(); + scheduleFromQueue(); + } + } + } }