diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java index ff35255c7db..9c87812dcc9 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java +++ b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.log4j.Logger; @@ -107,7 +108,8 @@ public abstract class AgentAttache { protected Long _currentSequence; protected Status _status = Status.Connecting; protected boolean _maintenance; - protected long _nextSequence; + protected long _nextSequence; + protected AtomicInteger _outstandingTaskCount; protected AgentManagerImpl _agentMgr; @@ -131,6 +133,7 @@ public abstract class AgentAttache { _requests = new LinkedList(); _agentMgr = agentMgr; _nextSequence = s_rand.nextInt(Short.MAX_VALUE) << 48; + _outstandingTaskCount = new AtomicInteger(0); } public synchronized long getNextSequence() { diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java index 3e684cc9fd4..39d470213a9 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -159,24 +159,28 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected ScheduledExecutorService _directAgentExecutor; protected ScheduledExecutorService _monitorExecutor; + private int _directAgentThreadCap; + protected StateMachine2 _statusStateMachine = Status.getStateMachine(); private final Map _pingMap = new ConcurrentHashMap(10007); @Inject ResourceManager _resourceMgr; - protected final ConfigKey Workers = new ConfigKey(Integer.class, "workers", "Advance", "5", + protected final ConfigKey Workers = new ConfigKey(Integer.class, "workers", "Advanced", "5", "Number of worker threads handling remote agent connections.", false); - protected final ConfigKey Port = new ConfigKey(Integer.class, "port", "Advance", "8250", "Port to listen on for remote agent connections.", false); - protected final ConfigKey PingInterval = new ConfigKey(Integer.class, "ping.interval", "Advance", "60", + protected final ConfigKey Port = new ConfigKey(Integer.class, "port", "Advanced", "8250", "Port to listen on for remote agent connections.", false); + protected final ConfigKey PingInterval = new ConfigKey(Integer.class, "ping.interval", "Advanced", "60", "Interval to send application level pings to make sure the connection is still working", false); - protected final ConfigKey PingTimeout = new ConfigKey(Float.class, "ping.timeout", "Advance", "2.5", + protected final ConfigKey PingTimeout = new ConfigKey(Float.class, "ping.timeout", "Advanced", "2.5", "Multiplier to ping.interval before announcing an agent has timed out", true); - protected final ConfigKey AlertWait = new ConfigKey(Integer.class, "alert.wait", "Advance", "1800", + protected final ConfigKey AlertWait = new ConfigKey(Integer.class, "alert.wait", "Advanced", "1800", "Seconds to wait before alerting on a disconnected agent", true); - protected final ConfigKey DirectAgentLoadSize = new ConfigKey(Integer.class, "direct.agent.load.size", "Advance", "16", + protected final ConfigKey DirectAgentLoadSize = new ConfigKey(Integer.class, "direct.agent.load.size", "Advanced", "16", "The number of direct agents to load each time", false); - protected final ConfigKey DirectAgentPoolSize = new ConfigKey(Integer.class, "direct.agent.pool.size", "Advance", "500", + protected final ConfigKey DirectAgentPoolSize = new ConfigKey(Integer.class, "direct.agent.pool.size", "Advanced", "500", "Default size for DirectAgentPool", false); + protected final ConfigKey DirectAgentThreadCap = new ConfigKey(Float.class, "direct.agent.thread.cap", "Advanced", "0.1", + "Percentage (as a value between 0 and 1) of direct.agent.pool.size to be used as upper thread cap for a single direct agent to process requests", false); @Override public boolean configure(final String name, final Map params) throws ConfigurationException { @@ -202,10 +206,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this); s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers"); - _directAgentExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgent")); s_logger.debug("Created DirectAgentAttache pool with size: " + DirectAgentPoolSize.value()); - + _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0 + _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); return true; @@ -1422,6 +1426,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl return _directAgentExecutor; } + public int getDirectAgentThreadCap() { + return _directAgentThreadCap; + } + public Long getAgentPingTime(long agentId) { return _pingMap.get(agentId); } @@ -1568,7 +1576,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl @Override public ConfigKey[] getConfigKeys() { - return new ConfigKey[] {Workers, Port, PingInterval, PingTimeout, Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize}; + return new ConfigKey[] {Workers, Port, PingInterval, PingTimeout, Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize, DirectAgentThreadCap}; } } diff --git a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java index 7d3f7659639..0b6a0116214 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java +++ b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java @@ -132,6 +132,11 @@ public class DirectAgentAttache extends AgentAttache { @Override protected synchronized void runInContext() { try { + if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) { + s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out"); + return; + } + ServerResource resource = _resource; if (resource != null) { @@ -156,6 +161,8 @@ public class DirectAgentAttache extends AgentAttache { } } catch (Exception e) { s_logger.warn("Unable to complete the ping task", e); + } finally { + _outstandingTaskCount.decrementAndGet(); } } } @@ -168,10 +175,32 @@ public class DirectAgentAttache extends AgentAttache { _req = req; } + private void bailout() { + long seq = _req.getSequence(); + try { + Command[] cmds = _req.getCommands(); + ArrayList answers = new ArrayList(cmds.length); + for (Command cmd : cmds) { + Answer answer = new Answer(cmd, false, "Bailed out as maximum oustanding task limit reached"); + answers.add(answer); + } + Response resp = new Response(_req, answers.toArray(new Answer[answers.size()])); + processAnswers(seq, resp); + } catch (Exception e) { + s_logger.warn(log(seq, "Exception caught in bailout "), e); + } + } + @Override protected void runInContext() { long seq = _req.getSequence(); try { + if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) { + s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out"); + bailout(); + return; + } + ServerResource resource = _resource; Command[] cmds = _req.getCommands(); boolean stopOnError = _req.stopOnError(); @@ -186,7 +215,7 @@ public class DirectAgentAttache extends AgentAttache { if (resource != null) { answer = resource.executeRequest(cmds[i]); if(answer == null) { - s_logger.warn("Resource returned null answer!"); + s_logger.warn("Resource returned null answer!"); answer = new Answer(cmds[i], false, "Resource returned null answer"); } } else { @@ -213,6 +242,8 @@ public class DirectAgentAttache extends AgentAttache { processAnswers(seq, resp); } catch (Exception e) { s_logger.warn(log(seq, "Exception caught "), e); + } finally { + _outstandingTaskCount.decrementAndGet(); } } }