diff --git a/agent/src/com/cloud/agent/Agent.java b/agent/src/com/cloud/agent/Agent.java index 734639476e9..a3a0c368f2b 100755 --- a/agent/src/com/cloud/agent/Agent.java +++ b/agent/src/com/cloud/agent/Agent.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -124,6 +126,7 @@ public class Agent implements HandlerFactory, IAgentControl { boolean _reconnectAllowed = true; //For time sentitive task, e.g. PingTask private ThreadPoolExecutor _ugentTaskPool; + ExecutorService _executor; // for simulator use only public Agent(IAgentShell shell) { @@ -137,6 +140,8 @@ public class Agent implements HandlerFactory, IAgentControl { _ugentTaskPool = new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue(), new NamedThreadFactory("UgentTask") ); + + _executor = new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue(), new NamedThreadFactory("agentRequest-Handler")); } public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException { @@ -172,6 +177,8 @@ public class Agent implements HandlerFactory, IAgentControl { new SynchronousQueue(), new NamedThreadFactory("UgentTask") ); + _executor = new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue(), new NamedThreadFactory("agentRequest-Handler")); + s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() + " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort()); } @@ -829,6 +836,20 @@ public class Agent implements HandlerFactory, IAgentControl { } } } + + public class AgentRequestHandler extends Task { + public AgentRequestHandler(Task.Type type, Link link, Request req) { + super(type, link, req); + } + + @Override + protected void doTask(Task task) throws Exception { + Request req = (Request)this.get(); + if (!(req instanceof Response)) { + processRequest(req, task.getLink()); + } + } + } public class ServerHandler extends Task { public ServerHandler(Task.Type type, Link link, byte[] data) { @@ -850,9 +871,12 @@ public class Agent implements HandlerFactory, IAgentControl { try { request = Request.parse(task.getData()); if (request instanceof Response) { + //It's for pinganswer etc, should be processed immediately. processResponse((Response) request, task.getLink()); } else { - processRequest(request, task.getLink()); + //put the requests from mgt server into another thread pool, as the request may take a longer time to finish. Don't block the NIO main thread pool + //processRequest(request, task.getLink()); + _executor.execute(new AgentRequestHandler(this.getType(), this.getLink(), request)); } } catch (final ClassNotFoundException e) { s_logger.error("Unable to find this request "); diff --git a/server/src/com/cloud/ha/KVMFencer.java b/server/src/com/cloud/ha/KVMFencer.java index be2dabc6a72..25f9967bbde 100644 --- a/server/src/com/cloud/ha/KVMFencer.java +++ b/server/src/com/cloud/ha/KVMFencer.java @@ -84,7 +84,6 @@ public class KVMFencer implements FenceBuilder { List hosts = _hostDao.listByCluster(host.getClusterId()); FenceCommand fence = new FenceCommand(vm, host); - fence.setSeq(true); for (HostVO h : hosts) { if (h.getHypervisorType() == HypervisorType.KVM) {