CS-16592: process handleConnectedAgent in a separate thread pool

This commit is contained in:
Alena Prokharchyk 2012-10-24 09:54:38 -07:00
parent 3948d7d7c5
commit a5077968db
1 changed files with 43 additions and 14 deletions

View File

@ -219,6 +219,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
protected AgentMonitor _monitor = null;
protected ExecutorService _executor;
protected ThreadPoolExecutor _connectExecutor;
protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = Status.getStateMachine();
@ -274,7 +275,12 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
registerForHostEvents(_monitor, true, true, false);
_executor = new ThreadPoolExecutor(threads, threads, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentTaskPool"));
_connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentConnectTaskPool"));
//allow core threads to time out even when there are no items in the queue
_connectExecutor.allowCoreThreadTimeOut(true);
_connection = new NioServer("AgentManager", _port, workers + 10, this);
s_logger.info("Listening on " + _port + " with " + workers + " workers");
@ -828,6 +834,8 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
}
}
_connectExecutor.shutdownNow();
return true;
}
@ -1206,6 +1214,35 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
}
}
protected class HandleAgentConnectTask implements Runnable {
Link _link;
Command[] _cmds;
Request _request;
HandleAgentConnectTask(Link link, final Command[] cmds, final Request request) {
_link = link;
_cmds = cmds;
_request = request;
}
@Override
public void run() {
_request.logD("Processing the first command ");
StartupCommand[] startups = new StartupCommand[_cmds.length];
for (int i = 0; i < _cmds.length; i++) {
startups[i] = (StartupCommand) _cmds[i];
}
AgentAttache attache = handleConnectedAgent(_link, startups, _request);
if (attache == null) {
s_logger.warn("Unable to create attache for agent: " + _request);
}
}
}
protected void connectAgent(Link link, final Command[] cmds, final Request request) {
_connectExecutor.execute(new HandleAgentConnectTask(link, cmds, request));
}
public class AgentHandler extends Task {
public AgentHandler(Task.Type type, Link link, byte[] data) {
@ -1218,21 +1255,13 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
Command cmd = cmds[0];
boolean logD = true;
Response response = null;
if (attache == null) {
request.logD("Processing the first command ");
if (!(cmd instanceof StartupCommand)) {
s_logger.warn("Throwing away a request because it came through as the first command on a connect: " + request);
return;
}
StartupCommand[] startups = new StartupCommand[cmds.length];
for (int i = 0; i < cmds.length; i++) {
startups[i] = (StartupCommand) cmds[i];
}
attache = handleConnectedAgent(link, startups, request);
if (attache == null) {
s_logger.warn("Unable to create attache for agent: " + request);
} else {
//submit the task for execution
request.logD("Scheduling the first command ");
connectAgent(link, cmds, request);
}
return;
}
@ -1331,7 +1360,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
answers[i] = answer;
}
response = new Response(request, answers, _nodeId, attache.getId());
Response response = new Response(request, answers, _nodeId, attache.getId());
if (s_logger.isDebugEnabled()) {
if (logD) {
s_logger.debug("SeqA " + attache.getId() + "-" + response.getSequence() + ": Sending " + response);