diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java index 892e4055cfe..d7edd45fcd1 100755 --- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -219,6 +219,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { protected AgentMonitor _monitor = null; protected ExecutorService _executor; + protected ThreadPoolExecutor _connectExecutor; protected StateMachine2 _statusStateMachine = Status.getStateMachine(); @@ -274,7 +275,12 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { registerForHostEvents(_monitor, true, true, false); _executor = new ThreadPoolExecutor(threads, threads, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory("AgentTaskPool")); - + + _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS, + new LinkedBlockingQueue(), new NamedThreadFactory("AgentConnectTaskPool")); + //allow core threads to time out even when there are no items in the queue + _connectExecutor.allowCoreThreadTimeOut(true); + _connection = new NioServer("AgentManager", _port, workers + 10, this); s_logger.info("Listening on " + _port + " with " + workers + " workers"); @@ -828,6 +834,8 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } } } + + _connectExecutor.shutdownNow(); return true; } @@ -1206,6 +1214,35 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } } } + + protected class HandleAgentConnectTask implements Runnable { + Link _link; + Command[] _cmds; + Request _request; + + HandleAgentConnectTask(Link link, final Command[] cmds, final Request request) { + _link = link; + _cmds = cmds; + _request = request; + } + + @Override + public void run() { + _request.logD("Processing the first command "); + StartupCommand[] startups = new StartupCommand[_cmds.length]; + for (int i = 0; i < _cmds.length; i++) { + startups[i] = (StartupCommand) _cmds[i]; + } + AgentAttache attache = handleConnectedAgent(_link, startups, _request); + if (attache == null) { + s_logger.warn("Unable to create attache for agent: " + _request); + } + } + } + + protected void connectAgent(Link link, final Command[] cmds, final Request request) { + _connectExecutor.execute(new HandleAgentConnectTask(link, cmds, request)); + } public class AgentHandler extends Task { public AgentHandler(Task.Type type, Link link, byte[] data) { @@ -1218,21 +1255,13 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { Command cmd = cmds[0]; boolean logD = true; - Response response = null; if (attache == null) { - request.logD("Processing the first command "); if (!(cmd instanceof StartupCommand)) { s_logger.warn("Throwing away a request because it came through as the first command on a connect: " + request); - return; - } - - StartupCommand[] startups = new StartupCommand[cmds.length]; - for (int i = 0; i < cmds.length; i++) { - startups[i] = (StartupCommand) cmds[i]; - } - attache = handleConnectedAgent(link, startups, request); - if (attache == null) { - s_logger.warn("Unable to create attache for agent: " + request); + } else { + //submit the task for execution + request.logD("Scheduling the first command "); + connectAgent(link, cmds, request); } return; } @@ -1331,7 +1360,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { answers[i] = answer; } - response = new Response(request, answers, _nodeId, attache.getId()); + Response response = new Response(request, answers, _nodeId, attache.getId()); if (s_logger.isDebugEnabled()) { if (logD) { s_logger.debug("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response);