From 269a4ef11ee151fa408a7dd1f2e69cd1f7f05191 Mon Sep 17 00:00:00 2001 From: Koushik Das Date: Wed, 30 Oct 2013 15:32:01 +0530 Subject: [PATCH] CLOUDSTACK-4855: Throttle based on the # of outstanding requests to the directly managed HV host (direct agents) CloudStack sends requests to directly managed HV hosts (direct agents) using the direct agent thread pool. The size of the pool is determined by the global config direct.agent.pool.size, which defaults to 500. Currently there is no restriction on the number of threads a direct agent can use from this shared thread pool to send requests to the host. This is fine as long as the host is responding to requests in a reasonable amount of time. But if there is a considerable delay in getting a response, the thread remains blocked for that much time. As more commands are sent to the slow host, more threads keep getting blocked. This can eventually lead to a situation where requests to healthy hosts cannot be processed because there are not enough free threads. The problem being addressed here is to localize the impact of a few bad hosts, so that the entire management server is not affected. One such way is to throttle based on the # of outstanding requests on a per-host basis. The outstanding requests to a host will be limited to a % of the direct agent pool size. This is configurable via direct.agent.thread.cap. The default value is 0.1 (10%); a value of 1 would mean the old behavior where there is no upper cap. This will ensure that the impacted host is bound by an upper cap on the number of threads it can use to process requests, rather than being able to consume the entire pool. 
--- .../com/cloud/agent/manager/AgentAttache.java | 5 ++- .../cloud/agent/manager/AgentManagerImpl.java | 28 ++++++++++------ .../agent/manager/DirectAgentAttache.java | 33 ++++++++++++++++++- 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java index ff35255c7db..9c87812dcc9 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java +++ b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.log4j.Logger; @@ -107,7 +108,8 @@ public abstract class AgentAttache { protected Long _currentSequence; protected Status _status = Status.Connecting; protected boolean _maintenance; - protected long _nextSequence; + protected long _nextSequence; + protected AtomicInteger _outstandingTaskCount; protected AgentManagerImpl _agentMgr; @@ -131,6 +133,7 @@ public abstract class AgentAttache { _requests = new LinkedList(); _agentMgr = agentMgr; _nextSequence = s_rand.nextInt(Short.MAX_VALUE) << 48; + _outstandingTaskCount = new AtomicInteger(0); } public synchronized long getNextSequence() { diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java index 3e684cc9fd4..39d470213a9 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -159,24 +159,28 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected ScheduledExecutorService 
_directAgentExecutor; protected ScheduledExecutorService _monitorExecutor; + private int _directAgentThreadCap; + protected StateMachine2 _statusStateMachine = Status.getStateMachine(); private final Map _pingMap = new ConcurrentHashMap(10007); @Inject ResourceManager _resourceMgr; - protected final ConfigKey Workers = new ConfigKey(Integer.class, "workers", "Advance", "5", + protected final ConfigKey Workers = new ConfigKey(Integer.class, "workers", "Advanced", "5", "Number of worker threads handling remote agent connections.", false); - protected final ConfigKey Port = new ConfigKey(Integer.class, "port", "Advance", "8250", "Port to listen on for remote agent connections.", false); - protected final ConfigKey PingInterval = new ConfigKey(Integer.class, "ping.interval", "Advance", "60", + protected final ConfigKey Port = new ConfigKey(Integer.class, "port", "Advanced", "8250", "Port to listen on for remote agent connections.", false); + protected final ConfigKey PingInterval = new ConfigKey(Integer.class, "ping.interval", "Advanced", "60", "Interval to send application level pings to make sure the connection is still working", false); - protected final ConfigKey PingTimeout = new ConfigKey(Float.class, "ping.timeout", "Advance", "2.5", + protected final ConfigKey PingTimeout = new ConfigKey(Float.class, "ping.timeout", "Advanced", "2.5", "Multiplier to ping.interval before announcing an agent has timed out", true); - protected final ConfigKey AlertWait = new ConfigKey(Integer.class, "alert.wait", "Advance", "1800", + protected final ConfigKey AlertWait = new ConfigKey(Integer.class, "alert.wait", "Advanced", "1800", "Seconds to wait before alerting on a disconnected agent", true); - protected final ConfigKey DirectAgentLoadSize = new ConfigKey(Integer.class, "direct.agent.load.size", "Advance", "16", + protected final ConfigKey DirectAgentLoadSize = new ConfigKey(Integer.class, "direct.agent.load.size", "Advanced", "16", "The number of direct agents to load each 
time", false); - protected final ConfigKey DirectAgentPoolSize = new ConfigKey(Integer.class, "direct.agent.pool.size", "Advance", "500", + protected final ConfigKey DirectAgentPoolSize = new ConfigKey(Integer.class, "direct.agent.pool.size", "Advanced", "500", "Default size for DirectAgentPool", false); + protected final ConfigKey DirectAgentThreadCap = new ConfigKey(Float.class, "direct.agent.thread.cap", "Advanced", "0.1", + "Percentage (as a value between 0 and 1) of direct.agent.pool.size to be used as upper thread cap for a single direct agent to process requests", false); @Override public boolean configure(final String name, final Map params) throws ConfigurationException { @@ -202,10 +206,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this); s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers"); - _directAgentExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgent")); s_logger.debug("Created DirectAgentAttache pool with size: " + DirectAgentPoolSize.value()); - + _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0 + _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); return true; @@ -1422,6 +1426,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl return _directAgentExecutor; } + public int getDirectAgentThreadCap() { + return _directAgentThreadCap; + } + public Long getAgentPingTime(long agentId) { return _pingMap.get(agentId); } @@ -1568,7 +1576,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl @Override public ConfigKey[] getConfigKeys() { - return new ConfigKey[] {Workers, Port, PingInterval, PingTimeout, Wait, AlertWait, DirectAgentLoadSize, 
DirectAgentPoolSize}; + return new ConfigKey[] {Workers, Port, PingInterval, PingTimeout, Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize, DirectAgentThreadCap}; } } diff --git a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java index 7d3f7659639..0b6a0116214 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java +++ b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java @@ -132,6 +132,11 @@ public class DirectAgentAttache extends AgentAttache { @Override protected synchronized void runInContext() { try { + if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) { + s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out"); + return; + } + ServerResource resource = _resource; if (resource != null) { @@ -156,6 +161,8 @@ public class DirectAgentAttache extends AgentAttache { } } catch (Exception e) { s_logger.warn("Unable to complete the ping task", e); + } finally { + _outstandingTaskCount.decrementAndGet(); } } } @@ -168,10 +175,32 @@ public class DirectAgentAttache extends AgentAttache { _req = req; } + private void bailout() { + long seq = _req.getSequence(); + try { + Command[] cmds = _req.getCommands(); + ArrayList answers = new ArrayList(cmds.length); + for (Command cmd : cmds) { + Answer answer = new Answer(cmd, false, "Bailed out as maximum oustanding task limit reached"); + answers.add(answer); + } + Response resp = new Response(_req, answers.toArray(new Answer[answers.size()])); + processAnswers(seq, resp); + } catch (Exception e) { + s_logger.warn(log(seq, "Exception caught in bailout "), e); + } + } + @Override protected void runInContext() { long seq = _req.getSequence(); try { + if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap()) { + 
s_logger.warn("Task execution for direct attache(" + _id + ") has reached maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out"); + bailout(); + return; + } + ServerResource resource = _resource; Command[] cmds = _req.getCommands(); boolean stopOnError = _req.stopOnError(); @@ -186,7 +215,7 @@ public class DirectAgentAttache extends AgentAttache { if (resource != null) { answer = resource.executeRequest(cmds[i]); if(answer == null) { - s_logger.warn("Resource returned null answer!"); + s_logger.warn("Resource returned null answer!"); answer = new Answer(cmds[i], false, "Resource returned null answer"); } } else { @@ -213,6 +242,8 @@ public class DirectAgentAttache extends AgentAttache { processAnswers(seq, resp); } catch (Exception e) { s_logger.warn(log(seq, "Exception caught "), e); + } finally { + _outstandingTaskCount.decrementAndGet(); } } }