mirror of https://github.com/apache/cloudstack.git
The agent disconnected because so many FenceCommands were sent to it that it could not receive the PingAnswer from the management server; the agent then treated this as a ping timeout and reconnected to the management server.
The fix is to add a separate thread pool for all requests from the management server (which may be time-consuming), so that the PingAnswer (which is a response) is processed in a different thread pool. This makes sure the PingAnswer gets processed immediately. Reviewed-by: Alex
This commit is contained in:
parent
d54175234a
commit
4eab8c4dbd
|
|
@ -29,6 +29,8 @@ import java.util.Map;
|
|||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -124,6 +126,7 @@ public class Agent implements HandlerFactory, IAgentControl {
|
|||
boolean _reconnectAllowed = true;
|
||||
// For time-sensitive tasks, e.g. PingTask
|
||||
private ThreadPoolExecutor _ugentTaskPool;
|
||||
ExecutorService _executor;
|
||||
|
||||
// for simulator use only
|
||||
public Agent(IAgentShell shell) {
|
||||
|
|
@ -137,6 +140,8 @@ public class Agent implements HandlerFactory, IAgentControl {
|
|||
_ugentTaskPool = new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES,
|
||||
new SynchronousQueue<Runnable>(), new NamedThreadFactory("UgentTask")
|
||||
);
|
||||
|
||||
_executor = new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("agentRequest-Handler"));
|
||||
}
|
||||
|
||||
public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException {
|
||||
|
|
@ -172,6 +177,8 @@ public class Agent implements HandlerFactory, IAgentControl {
|
|||
new SynchronousQueue<Runnable>(), new NamedThreadFactory("UgentTask")
|
||||
);
|
||||
|
||||
_executor = new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("agentRequest-Handler"));
|
||||
|
||||
s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() + " : workers = "
|
||||
+ _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort());
|
||||
}
|
||||
|
|
@ -829,6 +836,20 @@ public class Agent implements HandlerFactory, IAgentControl {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Task wrapper around a single {@link Request}; when run, it forwards the
 * request to {@code processRequest} unless the payload is a {@code Response}.
 */
public class AgentRequestHandler extends Task {
|
||||
/**
 * @param type task type, passed through to the {@code Task} constructor
 * @param link link passed through to the {@code Task} constructor
 * @param req  the request this task carries
 */
public AgentRequestHandler(Task.Type type, Link link, Request req) {
|
||||
super(type, link, req);
|
||||
}
|
||||
|
||||
/**
 * Handles the carried request.
 *
 * @param task the task whose link is handed to {@code processRequest}
 * @throws Exception declared by the overridden {@code Task#doTask}
 */
@Override
|
||||
protected void doTask(Task task) throws Exception {
|
||||
// NOTE(review): presumably get() returns the Request given to the constructor - confirm against Task.
Request req = (Request)this.get();
|
||||
// Response instances are skipped here; only non-Response requests are processed.
if (!(req instanceof Response)) {
|
||||
processRequest(req, task.getLink());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class ServerHandler extends Task {
|
||||
public ServerHandler(Task.Type type, Link link, byte[] data) {
|
||||
|
|
@ -850,9 +871,12 @@ public class Agent implements HandlerFactory, IAgentControl {
|
|||
try {
|
||||
request = Request.parse(task.getData());
|
||||
if (request instanceof Response) {
|
||||
// Responses (e.g. PingAnswer) should be processed immediately.
|
||||
processResponse((Response) request, task.getLink());
|
||||
} else {
|
||||
processRequest(request, task.getLink());
|
||||
//put the requests from mgt server into another thread pool, as the request may take a longer time to finish. Don't block the NIO main thread pool
|
||||
//processRequest(request, task.getLink());
|
||||
_executor.execute(new AgentRequestHandler(this.getType(), this.getLink(), request));
|
||||
}
|
||||
} catch (final ClassNotFoundException e) {
|
||||
s_logger.error("Unable to find this request ");
|
||||
|
|
|
|||
|
|
@ -84,7 +84,6 @@ public class KVMFencer implements FenceBuilder {
|
|||
|
||||
List<HostVO> hosts = _hostDao.listByCluster(host.getClusterId());
|
||||
FenceCommand fence = new FenceCommand(vm, host);
|
||||
fence.setSeq(true);
|
||||
|
||||
for (HostVO h : hosts) {
|
||||
if (h.getHypervisorType() == HypervisorType.KVM) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue