mirror of https://github.com/apache/cloudstack.git
879 lines
33 KiB
Java
879 lines
33 KiB
Java
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
package com.cloud.agent;
|
|
|
|
import java.io.IOException;
|
|
import java.io.PrintWriter;
|
|
import java.io.StringWriter;
|
|
import java.net.InetAddress;
|
|
import java.net.UnknownHostException;
|
|
import java.nio.channels.ClosedChannelException;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Timer;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.SynchronousQueue;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
import javax.naming.ConfigurationException;
|
|
|
|
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
|
|
import org.apache.log4j.Logger;
|
|
import org.slf4j.MDC;
|
|
|
|
import com.cloud.agent.api.AgentControlAnswer;
|
|
import com.cloud.agent.api.AgentControlCommand;
|
|
import com.cloud.agent.api.Answer;
|
|
import com.cloud.agent.api.Command;
|
|
import com.cloud.agent.api.CronCommand;
|
|
import com.cloud.agent.api.MaintainAnswer;
|
|
import com.cloud.agent.api.MaintainCommand;
|
|
import com.cloud.agent.api.PingCommand;
|
|
import com.cloud.agent.api.ReadyCommand;
|
|
import com.cloud.agent.api.ShutdownCommand;
|
|
import com.cloud.agent.api.StartupAnswer;
|
|
import com.cloud.agent.api.StartupCommand;
|
|
import com.cloud.agent.transport.Request;
|
|
import com.cloud.agent.transport.Response;
|
|
import com.cloud.exception.AgentControlChannelException;
|
|
import com.cloud.resource.ServerResource;
|
|
import com.cloud.utils.PropertiesUtil;
|
|
import com.cloud.utils.backoff.BackoffAlgorithm;
|
|
import com.cloud.utils.concurrency.NamedThreadFactory;
|
|
import com.cloud.utils.exception.CloudRuntimeException;
|
|
import com.cloud.utils.exception.NioConnectionException;
|
|
import com.cloud.utils.exception.TaskExecutionException;
|
|
import com.cloud.utils.nio.HandlerFactory;
|
|
import com.cloud.utils.nio.Link;
|
|
import com.cloud.utils.nio.NioClient;
|
|
import com.cloud.utils.nio.NioConnection;
|
|
import com.cloud.utils.nio.Task;
|
|
import com.cloud.utils.script.OutputInterpreter;
|
|
import com.cloud.utils.script.Script;
|
|
|
|
/**
|
|
* @config
|
|
* {@table
|
|
* || Param Name | Description | Values | Default ||
|
|
* || type | Type of server | Storage / Computing / Routing | No Default ||
|
|
* || workers | # of workers to process the requests | int | 1 ||
|
|
* || host | host to connect to | ip address | localhost ||
|
|
* || port | port to connect to | port number | 8250 ||
|
|
* || instance | Used to allow multiple agents running on the same host | String | none ||
* }
|
|
*
|
|
* For more configuration options, see the individual types.
|
|
*
|
|
**/
|
|
public class Agent implements HandlerFactory, IAgentControl {
|
|
private static final Logger s_logger = Logger.getLogger(Agent.class.getName());
|
|
|
|
/**
 * Exit statuses of the agent, each carrying the numeric value returned by {@link #value()}.
 */
public enum ExitStatus {
    /** Normal status = 0. */
    Normal(0),
    /** Exiting for upgrade. */
    Upgrade(65),
    /** Exiting due to configuration problems. */
    Configuration(66),
    /** Exiting because of error. */
    Error(67);

    int value;

    ExitStatus(final int code) {
        value = code;
    }

    /**
     * @return the numeric value attached to this status
     */
    public int value() {
        return this.value;
    }
}
|
|
|
|
List<IAgentControlListener> _controlListeners = new ArrayList<IAgentControlListener>();
|
|
|
|
IAgentShell _shell;
|
|
NioConnection _connection;
|
|
ServerResource _resource;
|
|
Link _link;
|
|
Long _id;
|
|
|
|
Timer _timer = new Timer("Agent Timer");
|
|
|
|
List<WatchTask> _watchList = new ArrayList<WatchTask>();
|
|
long _sequence = 0;
|
|
long _lastPingResponseTime = 0;
|
|
long _pingInterval = 0;
|
|
AtomicInteger _inProgress = new AtomicInteger();
|
|
|
|
StartupTask _startup = null;
|
|
long _startupWaitDefault = 180000;
|
|
long _startupWait = _startupWaitDefault;
|
|
boolean _reconnectAllowed = true;
|
|
// For time-sensitive tasks, e.g. PingTask
|
|
private final ThreadPoolExecutor _ugentTaskPool;
|
|
ExecutorService _executor;
|
|
|
|
// for simulator use only
|
|
public Agent(final IAgentShell shell) {
|
|
_shell = shell;
|
|
_link = null;
|
|
|
|
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
|
|
|
|
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
|
|
|
|
_ugentTaskPool =
|
|
new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
|
|
"UgentTask"));
|
|
|
|
_executor =
|
|
new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
|
|
"agentRequest-Handler"));
|
|
}
|
|
|
|
/**
 * Creates an agent bound to a server resource.
 *
 * Restores the persisted agent id (if any), configures the resource with the shell properties
 * overlaid by the command line properties, then creates the NIO client, the shutdown hook and
 * the two thread pools.
 *
 * @param shell shell supplying configuration and persistent properties
 * @param localAgentId not read by this constructor
 * @param resource resource this agent drives; it receives this agent as its agent control
 * @throws ConfigurationException if the resource reports that it could not be configured
 */
public Agent(final IAgentShell shell, final int localAgentId, final ServerResource resource) throws ConfigurationException {
    _shell = shell;
    _resource = resource;
    _link = null;

    resource.setAgentControl(this);

    final String persistedId = _shell.getPersistentProperty(getResourceName(), "id");
    if (persistedId == null) {
        _id = null;
    } else {
        _id = Long.parseLong(persistedId);
    }
    s_logger.info("id is " + (_id != null ? _id : ""));

    final Map<String, Object> params = PropertiesUtil.toMap(_shell.getProperties());

    // Command line properties win over file properties so the resource can see them.
    params.putAll(_shell.getCmdLineProperties());

    final boolean configured = _resource.configure(getResourceName(), params);
    if (!configured) {
        throw new ConfigurationException("Unable to configure " + _resource.getName());
    }

    _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);

    // ((NioClient)_connection).setBindAddress(_shell.getPrivateIp());

    s_logger.debug("Adding shutdown hook");
    final Runtime runtime = Runtime.getRuntime();
    runtime.addShutdownHook(new ShutdownThread(this));

    final int urgentCore = shell.getPingRetries();
    final int urgentMax = 2 * shell.getPingRetries();
    _ugentTaskPool = new ThreadPoolExecutor(urgentCore, urgentMax, 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory("UgentTask"));

    final int handlerCore = _shell.getWorkers();
    final int handlerMax = 5 * _shell.getWorkers();
    _executor = new ThreadPoolExecutor(handlerCore, handlerMax, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("agentRequest-Handler"));

    s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() +
        " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort());
}
|
|
|
|
public String getVersion() {
|
|
return _shell.getVersion();
|
|
}
|
|
|
|
public String getResourceGuid() {
|
|
final String guid = _shell.getGuid();
|
|
return guid + "-" + getResourceName();
|
|
}
|
|
|
|
public String getZone() {
|
|
return _shell.getZone();
|
|
}
|
|
|
|
public String getPod() {
|
|
return _shell.getPod();
|
|
}
|
|
|
|
protected void setLink(final Link link) {
|
|
_link = link;
|
|
}
|
|
|
|
public ServerResource getResource() {
|
|
return _resource;
|
|
}
|
|
|
|
public BackoffAlgorithm getBackoffAlgorithm() {
|
|
return _shell.getBackoffAlgorithm();
|
|
}
|
|
|
|
public String getResourceName() {
|
|
return _resource.getClass().getSimpleName();
|
|
}
|
|
|
|
public void start() {
|
|
if (!_resource.start()) {
|
|
s_logger.error("Unable to start the resource: " + _resource.getName());
|
|
throw new CloudRuntimeException("Unable to start the resource: " + _resource.getName());
|
|
}
|
|
|
|
try {
|
|
_connection.start();
|
|
} catch (final NioConnectionException e) {
|
|
s_logger.warn("NIO Connection Exception " + e);
|
|
s_logger.info("Attempted to connect to the server, but received an unexpected exception, trying again...");
|
|
}
|
|
while (!_connection.isStartup()) {
|
|
_shell.getBackoffAlgorithm().waitBeforeRetry();
|
|
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
|
|
try {
|
|
_connection.start();
|
|
} catch (final NioConnectionException e) {
|
|
s_logger.warn("NIO Connection Exception " + e);
|
|
s_logger.info("Attempted to connect to the server, but received an unexpected exception, trying again...");
|
|
}
|
|
}
|
|
}
|
|
|
|
public void stop(final String reason, final String detail) {
|
|
s_logger.info("Stopping the agent: Reason = " + reason + (detail != null ? ": Detail = " + detail : ""));
|
|
if (_connection != null) {
|
|
final ShutdownCommand cmd = new ShutdownCommand(reason, detail);
|
|
try {
|
|
if (_link != null) {
|
|
final Request req = new Request(_id != null ? _id : -1, -1, cmd, false);
|
|
_link.send(req.toBytes());
|
|
}
|
|
} catch (final ClosedChannelException e) {
|
|
s_logger.warn("Unable to send: " + cmd.toString());
|
|
} catch (final Exception e) {
|
|
s_logger.warn("Unable to send: " + cmd.toString() + " due to exception: ", e);
|
|
}
|
|
s_logger.debug("Sending shutdown to management server");
|
|
try {
|
|
Thread.sleep(1000);
|
|
} catch (final InterruptedException e) {
|
|
s_logger.debug("Who the heck interrupted me here?");
|
|
}
|
|
_connection.stop();
|
|
_connection = null;
|
|
}
|
|
|
|
if (_resource != null) {
|
|
_resource.stop();
|
|
_resource = null;
|
|
}
|
|
|
|
_ugentTaskPool.shutdownNow();
|
|
}
|
|
|
|
public Long getId() {
|
|
return _id;
|
|
}
|
|
|
|
public void setId(final Long id) {
|
|
s_logger.info("Set agent id " + id);
|
|
_id = id;
|
|
_shell.setPersistentProperty(getResourceName(), "id", Long.toString(id));
|
|
}
|
|
|
|
public void scheduleWatch(final Link link, final Request request, final long delay, final long period) {
|
|
synchronized (_watchList) {
|
|
if (s_logger.isDebugEnabled()) {
|
|
s_logger.debug("Adding a watch list");
|
|
}
|
|
final WatchTask task = new WatchTask(link, request, this);
|
|
_timer.schedule(task, 0, period);
|
|
_watchList.add(task);
|
|
}
|
|
}
|
|
|
|
protected void cancelTasks() {
|
|
synchronized (_watchList) {
|
|
for (final WatchTask task : _watchList) {
|
|
task.cancel();
|
|
}
|
|
if (s_logger.isDebugEnabled()) {
|
|
s_logger.debug("Clearing watch list: " + _watchList.size());
|
|
}
|
|
_watchList.clear();
|
|
}
|
|
}
|
|
public synchronized void lockStartupTask(final Link link)
|
|
{
|
|
_startup = new StartupTask(link);
|
|
_timer.schedule(_startup, _startupWait);
|
|
}
|
|
|
|
public void sendStartup(final Link link) {
|
|
final StartupCommand[] startup = _resource.initialize();
|
|
if (startup != null) {
|
|
final Command[] commands = new Command[startup.length];
|
|
for (int i = 0; i < startup.length; i++) {
|
|
setupStartupCommand(startup[i]);
|
|
commands[i] = startup[i];
|
|
}
|
|
final Request request = new Request(_id != null ? _id : -1, -1, commands, false, false);
|
|
request.setSequence(getNextSequence());
|
|
|
|
if (s_logger.isDebugEnabled()) {
|
|
s_logger.debug("Sending Startup: " + request.toString());
|
|
}
|
|
lockStartupTask(link);
|
|
try {
|
|
link.send(request.toBytes());
|
|
} catch (final ClosedChannelException e) {
|
|
s_logger.warn("Unable to send reques: " + request.toString());
|
|
}
|
|
}
|
|
}
|
|
|
|
protected void setupStartupCommand(final StartupCommand startup) {
|
|
InetAddress addr;
|
|
try {
|
|
addr = InetAddress.getLocalHost();
|
|
} catch (final UnknownHostException e) {
|
|
s_logger.warn("unknow host? ", e);
|
|
throw new CloudRuntimeException("Cannot get local IP address");
|
|
}
|
|
|
|
final Script command = new Script("hostname", 500, s_logger);
|
|
final OutputInterpreter.OneLineParser parser = new OutputInterpreter.OneLineParser();
|
|
final String result = command.execute(parser);
|
|
final String hostname = result == null ? parser.getLine() : addr.toString();
|
|
|
|
startup.setId(getId());
|
|
if (startup.getName() == null) {
|
|
startup.setName(hostname);
|
|
}
|
|
startup.setDataCenter(getZone());
|
|
startup.setPod(getPod());
|
|
startup.setGuid(getResourceGuid());
|
|
startup.setResourceName(getResourceName());
|
|
startup.setVersion(getVersion());
|
|
}
|
|
|
|
@Override
|
|
public Task create(final Task.Type type, final Link link, final byte[] data) {
|
|
return new ServerHandler(type, link, data);
|
|
}
|
|
|
|
protected void reconnect(final Link link) {
|
|
if (!_reconnectAllowed) {
|
|
return;
|
|
}
|
|
synchronized (this) {
|
|
if (_startup != null) {
|
|
_startup.cancel();
|
|
_startup = null;
|
|
}
|
|
}
|
|
|
|
link.close();
|
|
link.terminated();
|
|
|
|
setLink(null);
|
|
cancelTasks();
|
|
|
|
_resource.disconnected();
|
|
|
|
int inProgress = 0;
|
|
do {
|
|
_shell.getBackoffAlgorithm().waitBeforeRetry();
|
|
|
|
s_logger.info("Lost connection to the server. Dealing with the remaining commands...");
|
|
|
|
inProgress = _inProgress.get();
|
|
if (inProgress > 0) {
|
|
s_logger.info("Cannot connect because we still have " + inProgress + " commands in progress.");
|
|
}
|
|
} while (inProgress > 0);
|
|
|
|
_connection.stop();
|
|
|
|
try {
|
|
_connection.cleanUp();
|
|
} catch (final IOException e) {
|
|
s_logger.warn("Fail to clean up old connection. " + e);
|
|
}
|
|
|
|
while (_connection.isStartup()) {
|
|
_shell.getBackoffAlgorithm().waitBeforeRetry();
|
|
}
|
|
|
|
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
|
|
do {
|
|
s_logger.info("Reconnecting...");
|
|
try {
|
|
_connection.start();
|
|
} catch (final NioConnectionException e) {
|
|
s_logger.warn("NIO Connection Exception " + e);
|
|
s_logger.info("Attempted to connect to the server, but received an unexpected exception, trying again...");
|
|
}
|
|
_shell.getBackoffAlgorithm().waitBeforeRetry();
|
|
} while (!_connection.isStartup());
|
|
s_logger.info("Connected to the server");
|
|
}
|
|
|
|
public void processStartupAnswer(final Answer answer, final Response response, final Link link) {
|
|
boolean cancelled = false;
|
|
synchronized (this) {
|
|
if (_startup != null) {
|
|
_startup.cancel();
|
|
_startup = null;
|
|
} else {
|
|
cancelled = true;
|
|
}
|
|
}
|
|
final StartupAnswer startup = (StartupAnswer)answer;
|
|
if (!startup.getResult()) {
|
|
s_logger.error("Not allowed to connect to the server: " + answer.getDetails());
|
|
System.exit(1);
|
|
}
|
|
if (cancelled) {
|
|
s_logger.warn("Threw away a startup answer because we're reconnecting.");
|
|
return;
|
|
}
|
|
|
|
s_logger.info("Proccess agent startup answer, agent id = " + startup.getHostId());
|
|
|
|
setId(startup.getHostId());
|
|
_pingInterval = (long)startup.getPingInterval() * 1000; // change to ms.
|
|
|
|
setLastPingResponseTime();
|
|
scheduleWatch(link, response, _pingInterval, _pingInterval);
|
|
|
|
_ugentTaskPool.setKeepAliveTime(2 * _pingInterval, TimeUnit.MILLISECONDS);
|
|
|
|
s_logger.info("Startup Response Received: agent id = " + getId());
|
|
}
|
|
|
|
protected void processRequest(final Request request, final Link link) {
|
|
boolean requestLogged = false;
|
|
Response response = null;
|
|
try {
|
|
final Command[] cmds = request.getCommands();
|
|
final Answer[] answers = new Answer[cmds.length];
|
|
|
|
for (int i = 0; i < cmds.length; i++) {
|
|
final Command cmd = cmds[i];
|
|
Answer answer;
|
|
try {
|
|
if (cmd.getContextParam("logid") != null) {
|
|
MDC.put("logcontextid", cmd.getContextParam("logid"));
|
|
}
|
|
if (s_logger.isDebugEnabled()) {
|
|
if (!requestLogged) // ensures request is logged only once per method call
|
|
{
|
|
final String requestMsg = request.toString();
|
|
if (requestMsg != null) {
|
|
s_logger.debug("Request:" + requestMsg);
|
|
}
|
|
requestLogged = true;
|
|
}
|
|
s_logger.debug("Processing command: " + cmd.toString());
|
|
}
|
|
|
|
if (cmd instanceof CronCommand) {
|
|
final CronCommand watch = (CronCommand)cmd;
|
|
scheduleWatch(link, request, (long)watch.getInterval() * 1000, watch.getInterval() * 1000);
|
|
answer = new Answer(cmd, true, null);
|
|
} else if (cmd instanceof ShutdownCommand) {
|
|
final ShutdownCommand shutdown = (ShutdownCommand)cmd;
|
|
s_logger.debug("Received shutdownCommand, due to: " + shutdown.getReason());
|
|
cancelTasks();
|
|
_reconnectAllowed = false;
|
|
answer = new Answer(cmd, true, null);
|
|
} else if (cmd instanceof ReadyCommand && ((ReadyCommand)cmd).getDetails() != null) {
|
|
s_logger.debug("Not ready to connect to mgt server: " + ((ReadyCommand)cmd).getDetails());
|
|
System.exit(1);
|
|
return;
|
|
} else if (cmd instanceof MaintainCommand) {
|
|
s_logger.debug("Received maintainCommand");
|
|
cancelTasks();
|
|
_reconnectAllowed = false;
|
|
answer = new MaintainAnswer((MaintainCommand)cmd);
|
|
} else if (cmd instanceof AgentControlCommand) {
|
|
answer = null;
|
|
synchronized (_controlListeners) {
|
|
for (final IAgentControlListener listener : _controlListeners) {
|
|
answer = listener.processControlRequest(request, (AgentControlCommand)cmd);
|
|
if (answer != null) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (answer == null) {
|
|
s_logger.warn("No handler found to process cmd: " + cmd.toString());
|
|
answer = new AgentControlAnswer(cmd);
|
|
}
|
|
|
|
} else {
|
|
if (cmd instanceof ReadyCommand) {
|
|
processReadyCommand(cmd);
|
|
}
|
|
_inProgress.incrementAndGet();
|
|
try {
|
|
answer = _resource.executeRequest(cmd);
|
|
} finally {
|
|
_inProgress.decrementAndGet();
|
|
}
|
|
if (answer == null) {
|
|
s_logger.debug("Response: unsupported command" + cmd.toString());
|
|
answer = Answer.createUnsupportedCommandAnswer(cmd);
|
|
}
|
|
}
|
|
} catch (final Throwable th) {
|
|
s_logger.warn("Caught: ", th);
|
|
final StringWriter writer = new StringWriter();
|
|
th.printStackTrace(new PrintWriter(writer));
|
|
answer = new Answer(cmd, false, writer.toString());
|
|
}
|
|
|
|
answers[i] = answer;
|
|
if (!answer.getResult() && request.stopOnError()) {
|
|
for (i++; i < cmds.length; i++) {
|
|
answers[i] = new Answer(cmds[i], false, "Stopped by previous failure");
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
response = new Response(request, answers);
|
|
} finally {
|
|
if (s_logger.isDebugEnabled()) {
|
|
final String responseMsg = response.toString();
|
|
if (responseMsg != null) {
|
|
s_logger.debug(response.toString());
|
|
}
|
|
}
|
|
|
|
if (response != null) {
|
|
try {
|
|
link.send(response.toBytes());
|
|
} catch (final ClosedChannelException e) {
|
|
s_logger.warn("Unable to send response: " + response.toString());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
public void processResponse(final Response response, final Link link) {
|
|
final Answer answer = response.getAnswer();
|
|
if (s_logger.isDebugEnabled()) {
|
|
s_logger.debug("Received response: " + response.toString());
|
|
}
|
|
if (answer instanceof StartupAnswer) {
|
|
processStartupAnswer(answer, response, link);
|
|
} else if (answer instanceof AgentControlAnswer) {
|
|
// Notice, we are doing callback while holding a lock!
|
|
synchronized (_controlListeners) {
|
|
for (final IAgentControlListener listener : _controlListeners) {
|
|
listener.processControlResponse(response, (AgentControlAnswer)answer);
|
|
}
|
|
}
|
|
} else {
|
|
setLastPingResponseTime();
|
|
}
|
|
}
|
|
|
|
public void processReadyCommand(final Command cmd) {
|
|
|
|
final ReadyCommand ready = (ReadyCommand)cmd;
|
|
|
|
s_logger.info("Proccess agent ready command, agent id = " + ready.getHostId());
|
|
if (ready.getHostId() != null) {
|
|
setId(ready.getHostId());
|
|
}
|
|
s_logger.info("Ready command is processed: agent id = " + getId());
|
|
|
|
}
|
|
|
|
public void processOtherTask(final Task task) {
|
|
final Object obj = task.get();
|
|
if (obj instanceof Response) {
|
|
if (System.currentTimeMillis() - _lastPingResponseTime > _pingInterval * _shell.getPingRetries()) {
|
|
s_logger.error("Ping Interval has gone past " + _pingInterval * _shell.getPingRetries() + ". Won't reconnect to mgt server, as connection is still alive");
|
|
return;
|
|
}
|
|
|
|
final PingCommand ping = _resource.getCurrentStatus(getId());
|
|
final Request request = new Request(_id, -1, ping, false);
|
|
request.setSequence(getNextSequence());
|
|
if (s_logger.isDebugEnabled()) {
|
|
s_logger.debug("Sending ping: " + request.toString());
|
|
}
|
|
|
|
try {
|
|
task.getLink().send(request.toBytes());
|
|
//if i can send pingcommand out, means the link is ok
|
|
setLastPingResponseTime();
|
|
} catch (final ClosedChannelException e) {
|
|
s_logger.warn("Unable to send request: " + request.toString());
|
|
}
|
|
|
|
} else if (obj instanceof Request) {
|
|
final Request req = (Request)obj;
|
|
final Command command = req.getCommand();
|
|
if (command.getContextParam("logid") != null) {
|
|
MDC.put("logcontextid", command.getContextParam("logid"));
|
|
}
|
|
Answer answer = null;
|
|
_inProgress.incrementAndGet();
|
|
try {
|
|
answer = _resource.executeRequest(command);
|
|
} finally {
|
|
_inProgress.decrementAndGet();
|
|
}
|
|
if (answer != null) {
|
|
final Response response = new Response(req, answer);
|
|
|
|
if (s_logger.isDebugEnabled()) {
|
|
s_logger.debug("Watch Sent: " + response.toString());
|
|
}
|
|
try {
|
|
task.getLink().send(response.toBytes());
|
|
} catch (final ClosedChannelException e) {
|
|
s_logger.warn("Unable to send response: " + response.toString());
|
|
}
|
|
}
|
|
} else {
|
|
s_logger.warn("Ignoring an unknown task");
|
|
}
|
|
}
|
|
|
|
public synchronized void setLastPingResponseTime() {
|
|
_lastPingResponseTime = System.currentTimeMillis();
|
|
}
|
|
|
|
protected synchronized long getNextSequence() {
|
|
return _sequence++;
|
|
}
|
|
|
|
@Override
|
|
public void registerControlListener(final IAgentControlListener listener) {
|
|
synchronized (_controlListeners) {
|
|
_controlListeners.add(listener);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void unregisterControlListener(final IAgentControlListener listener) {
|
|
synchronized (_controlListeners) {
|
|
_controlListeners.remove(listener);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public AgentControlAnswer sendRequest(final AgentControlCommand cmd, final int timeoutInMilliseconds) throws AgentControlChannelException {
|
|
final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false);
|
|
request.setSequence(getNextSequence());
|
|
|
|
final AgentControlListener listener = new AgentControlListener(request);
|
|
|
|
registerControlListener(listener);
|
|
try {
|
|
postRequest(request);
|
|
synchronized (listener) {
|
|
try {
|
|
listener.wait(timeoutInMilliseconds);
|
|
} catch (final InterruptedException e) {
|
|
s_logger.warn("sendRequest is interrupted, exit waiting");
|
|
}
|
|
}
|
|
|
|
return listener.getAnswer();
|
|
} finally {
|
|
unregisterControlListener(listener);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void postRequest(final AgentControlCommand cmd) throws AgentControlChannelException {
|
|
final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false);
|
|
request.setSequence(getNextSequence());
|
|
postRequest(request);
|
|
}
|
|
|
|
private void postRequest(final Request request) throws AgentControlChannelException {
|
|
if (_link != null) {
|
|
try {
|
|
_link.send(request.toBytes());
|
|
} catch (final ClosedChannelException e) {
|
|
s_logger.warn("Unable to post agent control reques: " + request.toString());
|
|
throw new AgentControlChannelException("Unable to post agent control request due to " + e.getMessage());
|
|
}
|
|
} else {
|
|
throw new AgentControlChannelException("Unable to post agent control request as link is not available");
|
|
}
|
|
}
|
|
|
|
public class AgentControlListener implements IAgentControlListener {
|
|
private AgentControlAnswer _answer;
|
|
private final Request _request;
|
|
|
|
public AgentControlListener(final Request request) {
|
|
_request = request;
|
|
}
|
|
|
|
public AgentControlAnswer getAnswer() {
|
|
return _answer;
|
|
}
|
|
|
|
@Override
|
|
public Answer processControlRequest(final Request request, final AgentControlCommand cmd) {
|
|
return null;
|
|
}
|
|
|
|
@Override
|
|
public void processControlResponse(final Response response, final AgentControlAnswer answer) {
|
|
if (_request.getSequence() == response.getSequence()) {
|
|
_answer = answer;
|
|
synchronized (this) {
|
|
notifyAll();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
protected class ShutdownThread extends Thread {
|
|
Agent _agent;
|
|
|
|
public ShutdownThread(final Agent agent) {
|
|
super("AgentShutdownThread");
|
|
_agent = agent;
|
|
}
|
|
|
|
@Override
|
|
public void run() {
|
|
_agent.stop(ShutdownCommand.Requested, null);
|
|
}
|
|
}
|
|
|
|
public class WatchTask extends ManagedContextTimerTask {
|
|
protected Request _request;
|
|
protected Agent _agent;
|
|
protected Link _link;
|
|
|
|
public WatchTask(final Link link, final Request request, final Agent agent) {
|
|
super();
|
|
_request = request;
|
|
_link = link;
|
|
_agent = agent;
|
|
}
|
|
|
|
@Override
|
|
protected void runInContext() {
|
|
if (s_logger.isTraceEnabled()) {
|
|
s_logger.trace("Scheduling " + (_request instanceof Response ? "Ping" : "Watch Task"));
|
|
}
|
|
try {
|
|
if (_request instanceof Response) {
|
|
_ugentTaskPool.submit(new ServerHandler(Task.Type.OTHER, _link, _request));
|
|
} else {
|
|
_link.schedule(new ServerHandler(Task.Type.OTHER, _link, _request));
|
|
}
|
|
} catch (final ClosedChannelException e) {
|
|
s_logger.warn("Unable to schedule task because channel is closed");
|
|
}
|
|
}
|
|
}
|
|
|
|
public class StartupTask extends ManagedContextTimerTask {
|
|
protected Link _link;
|
|
protected volatile boolean cancelled = false;
|
|
|
|
public StartupTask(final Link link) {
|
|
s_logger.debug("Startup task created");
|
|
_link = link;
|
|
}
|
|
|
|
@Override
|
|
public synchronized boolean cancel() {
|
|
// TimerTask.cancel may fail depends on the calling context
|
|
if (!cancelled) {
|
|
cancelled = true;
|
|
_startupWait = _startupWaitDefault;
|
|
s_logger.debug("Startup task cancelled");
|
|
return super.cancel();
|
|
}
|
|
return true;
|
|
}
|
|
|
|
@Override
|
|
protected synchronized void runInContext() {
|
|
if (!cancelled) {
|
|
if (s_logger.isInfoEnabled()) {
|
|
s_logger.info("The startup command is now cancelled");
|
|
}
|
|
cancelled = true;
|
|
_startup = null;
|
|
_startupWait = _startupWaitDefault * 2;
|
|
reconnect(_link);
|
|
}
|
|
}
|
|
}
|
|
|
|
public class AgentRequestHandler extends Task {
|
|
public AgentRequestHandler(final Task.Type type, final Link link, final Request req) {
|
|
super(type, link, req);
|
|
}
|
|
|
|
@Override
|
|
protected void doTask(final Task task) throws TaskExecutionException {
|
|
final Request req = (Request)get();
|
|
if (!(req instanceof Response)) {
|
|
processRequest(req, task.getLink());
|
|
}
|
|
}
|
|
}
|
|
|
|
public class ServerHandler extends Task {
|
|
public ServerHandler(final Task.Type type, final Link link, final byte[] data) {
|
|
super(type, link, data);
|
|
}
|
|
|
|
public ServerHandler(final Task.Type type, final Link link, final Request req) {
|
|
super(type, link, req);
|
|
}
|
|
|
|
@Override
|
|
public void doTask(final Task task) throws TaskExecutionException {
|
|
if (task.getType() == Task.Type.CONNECT) {
|
|
_shell.getBackoffAlgorithm().reset();
|
|
setLink(task.getLink());
|
|
sendStartup(task.getLink());
|
|
} else if (task.getType() == Task.Type.DATA) {
|
|
Request request;
|
|
try {
|
|
request = Request.parse(task.getData());
|
|
if (request instanceof Response) {
|
|
//It's for pinganswer etc, should be processed immediately.
|
|
processResponse((Response)request, task.getLink());
|
|
} else {
|
|
//put the requests from mgt server into another thread pool, as the request may take a longer time to finish. Don't block the NIO main thread pool
|
|
//processRequest(request, task.getLink());
|
|
_executor.submit(new AgentRequestHandler(getType(), getLink(), request));
|
|
}
|
|
} catch (final ClassNotFoundException e) {
|
|
s_logger.error("Unable to find this request ");
|
|
} catch (final Exception e) {
|
|
s_logger.error("Error parsing task", e);
|
|
}
|
|
} else if (task.getType() == Task.Type.DISCONNECT) {
|
|
reconnect(task.getLink());
|
|
return;
|
|
} else if (task.getType() == Task.Type.OTHER) {
|
|
processOtherTask(task);
|
|
}
|
|
}
|
|
}
|
|
}
|