Merge pull request #805 from ekholabs/improvement/callable_CLOUDSTACK-8822

CLOUDSTACK-8822 - Replacing Runnable by CallableThat's the first part of the refactor, which will touch the ManagedContextRunnable and all its subclasses. However, in order to reduce impact, the first part comprises the Agent and Nio related classes (NioConnection, NioServer and NioClient).

* All the sub-classes were also updated according to the changes in the super-classes
* Improved exception handling
* There were also code formatting changes

Changes were structural and the NioTest covered them without need to modify the unit test.

This PR is quite extensive. Please, wait for the Test Report in order to proceed with further review.

ping @remibergsma @miguelaferreira @bhaisaab @karuturi @wido @DaanHoogland @borisroman @K0zka

Cheers,
Wilder

* pr/805:
  CLOUDSTACK-8822 - Replacing Runnable by Callable in the Taks and NioConnection classes

Signed-off-by: wilderrodrigues <wrodrigues@schubergphilis.com>
This commit is contained in:
wilderrodrigues 2015-09-11 14:52:35 +02:00
commit 68bf049106
11 changed files with 587 additions and 433 deletions

View File

@ -35,9 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.log4j.Logger;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
@ -59,6 +58,8 @@ import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.backoff.BackoffAlgorithm;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.NioConnectionException;
import com.cloud.utils.exception.TaskExecutionException;
import com.cloud.utils.nio.HandlerFactory;
import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.NioClient;
@ -121,11 +122,11 @@ public class Agent implements HandlerFactory, IAgentControl {
long _startupWait = _startupWaitDefault;
boolean _reconnectAllowed = true;
//For time sentitive task, e.g. PingTask
private ThreadPoolExecutor _ugentTaskPool;
private final ThreadPoolExecutor _ugentTaskPool;
ExecutorService _executor;
// for simulator use only
public Agent(IAgentShell shell) {
public Agent(final IAgentShell shell) {
_shell = shell;
_link = null;
@ -134,29 +135,29 @@ public class Agent implements HandlerFactory, IAgentControl {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
_ugentTaskPool =
new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
"UgentTask"));
new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
"UgentTask"));
_executor =
new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
"agentRequest-Handler"));
new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
"agentRequest-Handler"));
}
public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException {
public Agent(final IAgentShell shell, final int localAgentId, final ServerResource resource) throws ConfigurationException {
_shell = shell;
_resource = resource;
_link = null;
resource.setAgentControl(this);
String value = _shell.getPersistentProperty(getResourceName(), "id");
final String value = _shell.getPersistentProperty(getResourceName(), "id");
_id = value != null ? Long.parseLong(value) : null;
s_logger.info("id is " + ((_id != null) ? _id : ""));
s_logger.info("id is " + (_id != null ? _id : ""));
final Map<String, Object> params = PropertiesUtil.toMap(_shell.getProperties());
// merge with properties from command line to let resource access command line parameters
for (Map.Entry<String, Object> cmdLineProp : _shell.getCmdLineProperties().entrySet()) {
for (final Map.Entry<String, Object> cmdLineProp : _shell.getCmdLineProperties().entrySet()) {
params.put(cmdLineProp.getKey(), cmdLineProp.getValue());
}
@ -172,15 +173,15 @@ public class Agent implements HandlerFactory, IAgentControl {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
_ugentTaskPool =
new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
"UgentTask"));
new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
"UgentTask"));
_executor =
new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
"agentRequest-Handler"));
new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
"agentRequest-Handler"));
s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() +
" : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort());
" : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort());
}
public String getVersion() {
@ -188,7 +189,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
public String getResourceGuid() {
String guid = _shell.getGuid();
final String guid = _shell.getGuid();
return guid + "-" + getResourceName();
}
@ -222,11 +223,19 @@ public class Agent implements HandlerFactory, IAgentControl {
throw new CloudRuntimeException("Unable to start the resource: " + _resource.getName());
}
_connection.start();
try {
_connection.start();
} catch (final NioConnectionException e) {
throw new CloudRuntimeException("Unable to start the connection!", e);
}
while (!_connection.isStartup()) {
_shell.getBackoffAlgorithm().waitBeforeRetry();
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
_connection.start();
try {
_connection.start();
} catch (final NioConnectionException e) {
throw new CloudRuntimeException("Unable to start the connection!", e);
}
}
}
@ -236,12 +245,12 @@ public class Agent implements HandlerFactory, IAgentControl {
final ShutdownCommand cmd = new ShutdownCommand(reason, detail);
try {
if (_link != null) {
Request req = new Request((_id != null ? _id : -1), -1, cmd, false);
final Request req = new Request(_id != null ? _id : -1, -1, cmd, false);
_link.send(req.toBytes());
}
} catch (final ClosedChannelException e) {
s_logger.warn("Unable to send: " + cmd.toString());
} catch (Exception e) {
} catch (final Exception e) {
s_logger.warn("Unable to send: " + cmd.toString() + " due to exception: ", e);
}
s_logger.debug("Sending shutdown to management server");
@ -294,13 +303,13 @@ public class Agent implements HandlerFactory, IAgentControl {
_watchList.clear();
}
}
public synchronized void lockStartupTask(Link link)
public synchronized void lockStartupTask(final Link link)
{
_startup = new StartupTask(link);
_timer.schedule(_startup, _startupWait);
}
public void sendStartup(Link link) {
public void sendStartup(final Link link) {
final StartupCommand[] startup = _resource.initialize();
if (startup != null) {
final Command[] commands = new Command[startup.length];
@ -323,7 +332,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
}
protected void setupStartupCommand(StartupCommand startup) {
protected void setupStartupCommand(final StartupCommand startup) {
InetAddress addr;
try {
addr = InetAddress.getLocalHost();
@ -349,7 +358,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
public Task create(Task.Type type, Link link, byte[] data) {
public Task create(final Task.Type type, final Link link, final byte[] data) {
return new ServerHandler(type, link, data);
}
@ -391,19 +400,23 @@ public class Agent implements HandlerFactory, IAgentControl {
try {
_connection.cleanUp();
} catch (IOException e) {
} catch (final IOException e) {
s_logger.warn("Fail to clean up old connection. " + e);
}
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
do {
s_logger.info("Reconnecting...");
_connection.start();
try {
_connection.start();
} catch (final NioConnectionException e) {
throw new CloudRuntimeException("Unable to start the connection!", e);
}
_shell.getBackoffAlgorithm().waitBeforeRetry();
} while (!_connection.isStartup());
s_logger.info("Connected to the server");
}
public void processStartupAnswer(Answer answer, Response response, Link link) {
public void processStartupAnswer(final Answer answer, final Response response, final Link link) {
boolean cancelled = false;
synchronized (this) {
if (_startup != null) {
@ -450,7 +463,7 @@ public class Agent implements HandlerFactory, IAgentControl {
if (s_logger.isDebugEnabled()) {
if (!requestLogged) // ensures request is logged only once per method call
{
String requestMsg = request.toString();
final String requestMsg = request.toString();
if (requestMsg != null) {
s_logger.debug("Request:" + requestMsg);
}
@ -464,7 +477,7 @@ public class Agent implements HandlerFactory, IAgentControl {
scheduleWatch(link, request, (long)watch.getInterval() * 1000, watch.getInterval() * 1000);
answer = new Answer(cmd, true, null);
} else if (cmd instanceof ShutdownCommand) {
ShutdownCommand shutdown = (ShutdownCommand)cmd;
final ShutdownCommand shutdown = (ShutdownCommand)cmd;
s_logger.debug("Received shutdownCommand, due to: " + shutdown.getReason());
cancelTasks();
_reconnectAllowed = false;
@ -481,7 +494,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} else if (cmd instanceof AgentControlCommand) {
answer = null;
synchronized (_controlListeners) {
for (IAgentControlListener listener : _controlListeners) {
for (final IAgentControlListener listener : _controlListeners) {
answer = listener.processControlRequest(request, (AgentControlCommand)cmd);
if (answer != null) {
break;
@ -527,7 +540,7 @@ public class Agent implements HandlerFactory, IAgentControl {
response = new Response(request, answers);
} finally {
if (s_logger.isDebugEnabled()) {
String responseMsg = response.toString();
final String responseMsg = response.toString();
if (responseMsg != null) {
s_logger.debug(response.toString());
}
@ -553,7 +566,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} else if (answer instanceof AgentControlAnswer) {
// Notice, we are doing callback while holding a lock!
synchronized (_controlListeners) {
for (IAgentControlListener listener : _controlListeners) {
for (final IAgentControlListener listener : _controlListeners) {
listener.processControlResponse(response, (AgentControlAnswer)answer);
}
}
@ -562,7 +575,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
}
public void processReadyCommand(Command cmd) {
public void processReadyCommand(final Command cmd) {
final ReadyCommand ready = (ReadyCommand)cmd;
@ -574,10 +587,10 @@ public class Agent implements HandlerFactory, IAgentControl {
}
public void processOtherTask(Task task) {
public void processOtherTask(final Task task) {
final Object obj = task.get();
if (obj instanceof Response) {
if ((System.currentTimeMillis() - _lastPingResponseTime) > _pingInterval * _shell.getPingRetries()) {
if (System.currentTimeMillis() - _lastPingResponseTime > _pingInterval * _shell.getPingRetries()) {
s_logger.error("Ping Interval has gone past " + _pingInterval * _shell.getPingRetries() + ". Won't reconnect to mgt server, as connection is still alive");
return;
}
@ -633,25 +646,25 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
public void registerControlListener(IAgentControlListener listener) {
public void registerControlListener(final IAgentControlListener listener) {
synchronized (_controlListeners) {
_controlListeners.add(listener);
}
}
@Override
public void unregisterControlListener(IAgentControlListener listener) {
public void unregisterControlListener(final IAgentControlListener listener) {
synchronized (_controlListeners) {
_controlListeners.remove(listener);
}
}
@Override
public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException {
Request request = new Request(this.getId(), -1, new Command[] {cmd}, true, false);
public AgentControlAnswer sendRequest(final AgentControlCommand cmd, final int timeoutInMilliseconds) throws AgentControlChannelException {
final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false);
request.setSequence(getNextSequence());
AgentControlListener listener = new AgentControlListener(request);
final AgentControlListener listener = new AgentControlListener(request);
registerControlListener(listener);
try {
@ -659,7 +672,7 @@ public class Agent implements HandlerFactory, IAgentControl {
synchronized (listener) {
try {
listener.wait(timeoutInMilliseconds);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
s_logger.warn("sendRequest is interrupted, exit waiting");
}
}
@ -671,13 +684,13 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException {
Request request = new Request(this.getId(), -1, new Command[] {cmd}, true, false);
public void postRequest(final AgentControlCommand cmd) throws AgentControlChannelException {
final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false);
request.setSequence(getNextSequence());
postRequest(request);
}
private void postRequest(Request request) throws AgentControlChannelException {
private void postRequest(final Request request) throws AgentControlChannelException {
if (_link != null) {
try {
_link.send(request.toBytes());
@ -694,7 +707,7 @@ public class Agent implements HandlerFactory, IAgentControl {
private AgentControlAnswer _answer;
private final Request _request;
public AgentControlListener(Request request) {
public AgentControlListener(final Request request) {
_request = request;
}
@ -703,12 +716,12 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
public Answer processControlRequest(Request request, AgentControlCommand cmd) {
public Answer processControlRequest(final Request request, final AgentControlCommand cmd) {
return null;
}
@Override
public void processControlResponse(Response response, AgentControlAnswer answer) {
public void processControlResponse(final Response response, final AgentControlAnswer answer) {
if (_request.getSequence() == response.getSequence()) {
_answer = answer;
synchronized (this) {
@ -797,13 +810,13 @@ public class Agent implements HandlerFactory, IAgentControl {
}
public class AgentRequestHandler extends Task {
public AgentRequestHandler(Task.Type type, Link link, Request req) {
public AgentRequestHandler(final Task.Type type, final Link link, final Request req) {
super(type, link, req);
}
@Override
protected void doTask(Task task) throws Exception {
Request req = (Request)this.get();
protected void doTask(final Task task) throws TaskExecutionException {
final Request req = (Request)get();
if (!(req instanceof Response)) {
processRequest(req, task.getLink());
}
@ -811,16 +824,16 @@ public class Agent implements HandlerFactory, IAgentControl {
}
public class ServerHandler extends Task {
public ServerHandler(Task.Type type, Link link, byte[] data) {
public ServerHandler(final Task.Type type, final Link link, final byte[] data) {
super(type, link, data);
}
public ServerHandler(Task.Type type, Link link, Request req) {
public ServerHandler(final Task.Type type, final Link link, final Request req) {
super(type, link, req);
}
@Override
public void doTask(final Task task) {
public void doTask(final Task task) throws TaskExecutionException {
if (task.getType() == Task.Type.CONNECT) {
_shell.getBackoffAlgorithm().reset();
setLink(task.getLink());
@ -835,7 +848,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} else {
//put the requests from mgt server into another thread pool, as the request may take a longer time to finish. Don't block the NIO main thread pool
//processRequest(request, task.getLink());
_executor.execute(new AgentRequestHandler(this.getType(), this.getLink(), request));
_executor.submit(new AgentRequestHandler(getType(), getLink(), request));
}
} catch (final ClassNotFoundException e) {
s_logger.error("Unable to find this request ");

View File

@ -103,6 +103,8 @@ import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.HypervisorVersionChangedException;
import com.cloud.utils.exception.NioConnectionException;
import com.cloud.utils.exception.TaskExecutionException;
import com.cloud.utils.fsm.NoTransitionException;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.utils.nio.HandlerFactory;
@ -593,7 +595,11 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
startDirectlyConnectedHosts();
if (_connection != null) {
_connection.start();
try {
_connection.start();
} catch (final NioConnectionException e) {
s_logger.error("Error when connecting to the NioServer!", e);
}
}
_monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), PingInterval.value(), PingInterval.value(), TimeUnit.SECONDS);
@ -827,7 +833,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
Status determinedState = investigate(attache);
// if state cannot be determined do nothing and bail out
if (determinedState == null) {
if (((System.currentTimeMillis() >> 10) - host.getLastPinged()) > AlertWait.value()) {
if ((System.currentTimeMillis() >> 10) - host.getLastPinged() > AlertWait.value()) {
s_logger.warn("Agent " + hostId + " state cannot be determined for more than " + AlertWait + "(" + AlertWait.value() + ") seconds, will go to Alert state");
determinedState = Status.Alert;
} else {
@ -840,7 +846,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
s_logger.info("The agent " + hostId + " state determined is " + determinedState);
if (determinedState == Status.Down) {
String message = "Host is down: " + host.getId() + "-" + host.getName() + ". Starting HA on the VMs";
final String message = "Host is down: " + host.getId() + "-" + host.getName() + ". Starting HA on the VMs";
s_logger.error(message);
if (host.getType() != Host.Type.SecondaryStorage && host.getType() != Host.Type.ConsoleProxy) {
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host down, " + host.getId(), message);
@ -1299,7 +1305,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
}
@Override
protected void doTask(final Task task) throws Exception {
protected void doTask(final Task task) throws TaskExecutionException {
final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
try {
final Type type = task.getType();
@ -1315,6 +1321,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
} catch (final UnsupportedVersionException e) {
s_logger.warn(e.getMessage());
// upgradeAgent(task.getLink(), data, e.getReason());
} catch (final ClassNotFoundException e) {
final String message = String.format("Exception occured when executing taks! Error '%s'", e.getMessage());
s_logger.error(message);
throw new TaskExecutionException(message, e);
}
} else if (type == Task.Type.CONNECT) {
} else if (type == Task.Type.DISCONNECT) {

View File

@ -66,4 +66,6 @@ public interface SerialVersionUID {
public static final long UnableDeleteHostException = Base | 0x29;
public static final long AffinityConflictException = Base | 0x2a;
public static final long JobCancellationException = Base | 0x2b;
public static final long NioConnectionException = Base | 0x2c;
public static final long TaskExecutionException = Base | 0x2d;
}

View File

@ -0,0 +1,48 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.utils.exception;
import com.cloud.utils.SerialVersionUID;
/**
* Used by the NioConnection class to wrap-up its exceptions.
*/
public class NioConnectionException extends Exception {
private static final long serialVersionUID = SerialVersionUID.NioConnectionException;
protected int csErrorCode;
public NioConnectionException(final String msg, final Throwable cause) {
super(msg, cause);
setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName()));
}
public NioConnectionException(final String msg) {
super(msg);
}
public void setCSErrorCode(final int cserrcode) {
csErrorCode = cserrcode;
}
public int getCSErrorCode() {
return csErrorCode;
}
}

View File

@ -0,0 +1,48 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.utils.exception;
import com.cloud.utils.SerialVersionUID;
/**
* Used by the Task class to wrap-up its exceptions.
*/
public class TaskExecutionException extends Exception {
private static final long serialVersionUID = SerialVersionUID.NioConnectionException;
protected int csErrorCode;
public TaskExecutionException(final String msg, final Throwable cause) {
super(msg, cause);
setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName()));
}
public TaskExecutionException(final String msg) {
super(msg);
}
public void setCSErrorCode(final int cserrcode) {
csErrorCode = cserrcode;
}
public int getCSErrorCode() {
return csErrorCode;
}
}

View File

@ -29,9 +29,8 @@ import java.security.GeneralSecurityException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.log4j.Logger;
import org.apache.cloudstack.utils.security.SSLUtils;
import org.apache.log4j.Logger;
public class NioClient extends NioConnection {
private static final Logger s_logger = Logger.getLogger(NioClient.class);
@ -40,12 +39,12 @@ public class NioClient extends NioConnection {
protected String _bindAddress;
protected SocketChannel _clientConnection;
public NioClient(String name, String host, int port, int workers, HandlerFactory factory) {
public NioClient(final String name, final String host, final int port, final int workers, final HandlerFactory factory) {
super(name, port, workers, factory);
_host = host;
}
public void setBindAddress(String ipAddress) {
public void setBindAddress(final String ipAddress) {
_bindAddress = ipAddress;
}
@ -62,18 +61,18 @@ public class NioClient extends NioConnection {
if (_bindAddress != null) {
s_logger.info("Binding outbound interface at " + _bindAddress);
InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0);
final InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0);
_clientConnection.socket().bind(bindAddr);
}
InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
_clientConnection.connect(peerAddr);
SSLEngine sslEngine = null;
// Begin SSL handshake in BLOCKING mode
_clientConnection.configureBlocking(true);
SSLContext sslContext = Link.initSSLContext(true);
final SSLContext sslContext = Link.initSSLContext(true);
sslEngine = sslContext.createSSLEngine(_host, _port);
sslEngine.setUseClientMode(true);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
@ -83,32 +82,31 @@ public class NioClient extends NioConnection {
s_logger.info("Connected to " + _host + ":" + _port);
_clientConnection.configureBlocking(false);
Link link = new Link(peerAddr, this);
final Link link = new Link(peerAddr, this);
link.setSSLEngine(sslEngine);
SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ);
final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ);
link.setKey(key);
key.attach(link);
// Notice we've already connected due to the handshake, so let's get the
// remaining task done
task = _factory.create(Task.Type.CONNECT, link, null);
} catch (GeneralSecurityException e) {
} catch (final GeneralSecurityException e) {
_selector.close();
throw new IOException("Failed to initialise security", e);
} catch (IOException e) {
} catch (final IOException e) {
_selector.close();
throw e;
}
_executor.execute(task);
_executor.submit(task);
}
@Override
protected void registerLink(InetSocketAddress saddr, Link link) {
protected void registerLink(final InetSocketAddress saddr, final Link link) {
// don't do anything.
}
@Override
protected void unregisterLink(InetSocketAddress saddr) {
protected void unregisterLink(final InetSocketAddress saddr) {
// don't do anything.
}
@ -119,7 +117,5 @@ public class NioClient extends NioConnection {
_clientConnection.close();
}
s_logger.info("NioClient connection closed");
}
}
}

View File

@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@ -35,7 +36,10 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -43,21 +47,23 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.cloudstack.utils.security.SSLUtils;
import org.apache.log4j.Logger;
import org.apache.cloudstack.utils.security.SSLUtils;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.exception.NioConnectionException;
/**
* NioConnection abstracts the NIO socket operations. The Java implementation
* provides that.
*/
public abstract class NioConnection implements Runnable {
public abstract class NioConnection implements Callable<Boolean> {
private static final Logger s_logger = Logger.getLogger(NioConnection.class);;
protected Selector _selector;
protected Thread _thread;
protected ExecutorService _threadExecutor;
protected Future<Boolean> _futureTask;
protected boolean _isRunning;
protected boolean _isStartup;
protected int _port;
@ -66,42 +72,48 @@ public abstract class NioConnection implements Runnable {
protected String _name;
protected ExecutorService _executor;
public NioConnection(String name, int port, int workers, HandlerFactory factory) {
public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) {
_name = name;
_isRunning = false;
_thread = null;
_selector = null;
_port = port;
_factory = factory;
_executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name + "-Handler"));
}
public void start() {
public void start() throws NioConnectionException {
_todos = new ArrayList<ChangeRequest>();
_thread = new Thread(this, _name + "-Selector");
_isRunning = true;
_thread.start();
// Wait until we got init() done
synchronized (_thread) {
try {
_thread.wait();
} catch (InterruptedException e) {
s_logger.warn("Interrupted start thread ", e);
}
try {
init();
} catch (final ConnectException e) {
s_logger.warn("Unable to connect to remote: is there a server running on port " + _port);
} catch (final IOException e) {
s_logger.error("Unable to initialize the threads.", e);
throw new NioConnectionException(e.getMessage(), e);
} catch (final Exception e) {
s_logger.error("Unable to initialize the threads due to unknown exception.", e);
throw new NioConnectionException(e.getMessage(), e);
}
_isStartup = true;
_threadExecutor = Executors.newSingleThreadExecutor();
_futureTask = _threadExecutor.submit(this);
_isRunning = true;
}
public void stop() {
_executor.shutdown();
_isRunning = false;
if (_thread != null) {
_thread.interrupt();
if (_threadExecutor != null) {
_futureTask.cancel(false);
_threadExecutor.shutdown();
}
}
public boolean isRunning() {
return _thread.isAlive();
return !_futureTask.isDone();
}
public boolean isStartup() {
@ -109,45 +121,28 @@ public abstract class NioConnection implements Runnable {
}
@Override
public void run() {
synchronized (_thread) {
try {
init();
} catch (ConnectException e) {
s_logger.warn("Unable to connect to remote: is there a server running on port " + _port);
return;
} catch (IOException e) {
s_logger.error("Unable to initialize the threads.", e);
return;
} catch (Exception e) {
s_logger.error("Unable to initialize the threads due to unknown exception.", e);
return;
}
_isStartup = true;
_thread.notifyAll();
}
public Boolean call() throws NioConnectionException {
while (_isRunning) {
try {
_selector.select();
// Someone is ready for I/O, get the ready keys
Set<SelectionKey> readyKeys = _selector.selectedKeys();
Iterator<SelectionKey> i = readyKeys.iterator();
final Set<SelectionKey> readyKeys = _selector.selectedKeys();
final Iterator<SelectionKey> i = readyKeys.iterator();
if (s_logger.isTraceEnabled()) {
s_logger.trace("Keys Processing: " + readyKeys.size());
}
// Walk through the ready keys collection.
while (i.hasNext()) {
SelectionKey sk = i.next();
final SelectionKey sk = i.next();
i.remove();
if (!sk.isValid()) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Selection Key is invalid: " + sk.toString());
}
Link link = (Link)sk.attachment();
final Link link = (Link)sk.attachment();
if (link != null) {
link.terminated();
} else {
@ -167,13 +162,18 @@ public abstract class NioConnection implements Runnable {
s_logger.trace("Keys Done Processing.");
processTodos();
} catch (Throwable e) {
s_logger.warn("Caught an exception but continuing on.", e);
} catch (final ClosedSelectorException e) {
/*
* Exception occurred when calling java.nio.channels.Selector.selectedKeys() method. It means the connection has not yet been established. Let's continue trying
* We do not log it here otherwise we will fill the disk with messages.
*/
} catch (final IOException e) {
s_logger.error("Agent will die due to this IOException!", e);
throw new NioConnectionException(e.getMessage(), e);
}
}
synchronized (_thread) {
_isStartup = false;
}
_isStartup = false;
return true;
}
abstract void init() throws IOException;
@ -182,11 +182,11 @@ public abstract class NioConnection implements Runnable {
abstract void unregisterLink(InetSocketAddress saddr);
protected void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
protected void accept(final SelectionKey key) throws IOException {
final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
Socket socket = socketChannel.socket();
final SocketChannel socketChannel = serverSocketChannel.accept();
final Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
if (s_logger.isTraceEnabled()) {
@ -198,7 +198,7 @@ public abstract class NioConnection implements Runnable {
SSLEngine sslEngine = null;
try {
SSLContext sslContext = Link.initSSLContext(false);
final SSLContext sslContext = Link.initSSLContext(false);
sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);
sslEngine.setNeedClientAuth(false);
@ -206,7 +206,7 @@ public abstract class NioConnection implements Runnable {
Link.doHandshake(socketChannel, sslEngine, false);
} catch (Exception e) {
} catch (final Exception e) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage());
}
@ -219,53 +219,68 @@ public abstract class NioConnection implements Runnable {
s_logger.trace("SSL: Handshake done");
}
socketChannel.configureBlocking(false);
InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
Link link = new Link(saddr, this);
final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
final Link link = new Link(saddr, this);
link.setSSLEngine(sslEngine);
link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link));
Task task = _factory.create(Task.Type.CONNECT, link, null);
final Task task = _factory.create(Task.Type.CONNECT, link, null);
registerLink(saddr, link);
_executor.execute(task);
}
protected void terminate(SelectionKey key) {
Link link = (Link)key.attachment();
closeConnection(key);
if (link != null) {
link.terminated();
Task task = _factory.create(Task.Type.DISCONNECT, link, null);
unregisterLink(link.getSocketAddress());
_executor.execute(task);
try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
}
}
protected void read(SelectionKey key) throws IOException {
Link link = (Link)key.attachment();
protected void terminate(final SelectionKey key) {
final Link link = (Link)key.attachment();
closeConnection(key);
if (link != null) {
link.terminated();
final Task task = _factory.create(Task.Type.DISCONNECT, link, null);
unregisterLink(link.getSocketAddress());
try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
}
}
}
protected void read(final SelectionKey key) throws IOException {
final Link link = (Link)key.attachment();
try {
SocketChannel socketChannel = (SocketChannel)key.channel();
final SocketChannel socketChannel = (SocketChannel)key.channel();
if (s_logger.isTraceEnabled()) {
s_logger.trace("Reading from: " + socketChannel.socket().toString());
}
byte[] data = link.read(socketChannel);
final byte[] data = link.read(socketChannel);
if (data == null) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Packet is incomplete. Waiting for more.");
}
return;
}
Task task = _factory.create(Task.Type.DATA, link, data);
_executor.execute(task);
} catch (Exception e) {
final Task task = _factory.create(Task.Type.DATA, link, data);
try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
}
} catch (final Exception e) {
logDebug(e, key, 1);
terminate(key);
}
}
protected void logTrace(Exception e, SelectionKey key, int loc) {
protected void logTrace(final Exception e, final SelectionKey key, final int loc) {
if (s_logger.isTraceEnabled()) {
Socket socket = null;
if (key != null) {
SocketChannel ch = (SocketChannel)key.channel();
final SocketChannel ch = (SocketChannel)key.channel();
if (ch != null) {
socket = ch.socket();
}
@ -275,11 +290,11 @@ public abstract class NioConnection implements Runnable {
}
}
protected void logDebug(Exception e, SelectionKey key, int loc) {
protected void logDebug(final Exception e, final SelectionKey key, final int loc) {
if (s_logger.isDebugEnabled()) {
Socket socket = null;
if (key != null) {
SocketChannel ch = (SocketChannel)key.channel();
final SocketChannel ch = (SocketChannel)key.channel();
if (ch != null) {
socket = ch.socket();
}
@ -304,113 +319,122 @@ public abstract class NioConnection implements Runnable {
s_logger.trace("Todos Processing: " + todos.size());
}
SelectionKey key;
for (ChangeRequest todo : todos) {
for (final ChangeRequest todo : todos) {
switch (todo.type) {
case ChangeRequest.CHANGEOPS:
try {
key = (SelectionKey)todo.key;
if (key != null && key.isValid()) {
if (todo.att != null) {
key.attach(todo.att);
Link link = (Link)todo.att;
link.setKey(key);
}
key.interestOps(todo.ops);
}
} catch (CancelledKeyException e) {
s_logger.debug("key has been cancelled");
}
break;
case ChangeRequest.REGISTER:
try {
key = ((SocketChannel)(todo.key)).register(_selector, todo.ops, todo.att);
case ChangeRequest.CHANGEOPS:
try {
key = (SelectionKey)todo.key;
if (key != null && key.isValid()) {
if (todo.att != null) {
Link link = (Link)todo.att;
key.attach(todo.att);
final Link link = (Link)todo.att;
link.setKey(key);
}
} catch (ClosedChannelException e) {
s_logger.warn("Couldn't register socket: " + todo.key);
try {
((SocketChannel)(todo.key)).close();
} catch (IOException ignore) {
s_logger.info("[ignored] socket channel");
} finally {
Link link = (Link)todo.att;
link.terminated();
}
key.interestOps(todo.ops);
}
break;
case ChangeRequest.CLOSE:
if (s_logger.isTraceEnabled()) {
s_logger.trace("Trying to close " + todo.key);
} catch (final CancelledKeyException e) {
s_logger.debug("key has been cancelled");
}
break;
case ChangeRequest.REGISTER:
try {
key = ((SocketChannel)todo.key).register(_selector, todo.ops, todo.att);
if (todo.att != null) {
final Link link = (Link)todo.att;
link.setKey(key);
}
key = (SelectionKey)todo.key;
closeConnection(key);
if (key != null) {
Link link = (Link)key.attachment();
if (link != null) {
link.terminated();
}
} catch (final ClosedChannelException e) {
s_logger.warn("Couldn't register socket: " + todo.key);
try {
((SocketChannel)todo.key).close();
} catch (final IOException ignore) {
s_logger.info("[ignored] socket channel");
} finally {
final Link link = (Link)todo.att;
link.terminated();
}
break;
default:
s_logger.warn("Shouldn't be here");
throw new RuntimeException("Shouldn't be here");
}
break;
case ChangeRequest.CLOSE:
if (s_logger.isTraceEnabled()) {
s_logger.trace("Trying to close " + todo.key);
}
key = (SelectionKey)todo.key;
closeConnection(key);
if (key != null) {
final Link link = (Link)key.attachment();
if (link != null) {
link.terminated();
}
}
break;
default:
s_logger.warn("Shouldn't be here");
throw new RuntimeException("Shouldn't be here");
}
}
s_logger.trace("Todos Done processing");
}
protected void connect(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel)key.channel();
protected void connect(final SelectionKey key) throws IOException {
final SocketChannel socketChannel = (SocketChannel)key.channel();
try {
socketChannel.finishConnect();
key.interestOps(SelectionKey.OP_READ);
Socket socket = socketChannel.socket();
final Socket socket = socketChannel.socket();
if (!socket.getKeepAlive()) {
socket.setKeepAlive(true);
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Connected to " + socket);
}
Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this);
final Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this);
link.setKey(key);
key.attach(link);
Task task = _factory.create(Task.Type.CONNECT, link, null);
_executor.execute(task);
} catch (IOException e) {
final Task task = _factory.create(Task.Type.CONNECT, link, null);
try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
}
} catch (final IOException e) {
logTrace(e, key, 2);
terminate(key);
}
}
protected void scheduleTask(Task task) {
_executor.execute(task);
protected void scheduleTask(final Task task) {
try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
}
}
protected void write(SelectionKey key) throws IOException {
Link link = (Link)key.attachment();
protected void write(final SelectionKey key) throws IOException {
final Link link = (Link)key.attachment();
try {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Writing to " + link.getSocketAddress().toString());
}
boolean close = link.write((SocketChannel)key.channel());
final boolean close = link.write((SocketChannel)key.channel());
if (close) {
closeConnection(key);
link.terminated();
} else {
key.interestOps(SelectionKey.OP_READ);
}
} catch (Exception e) {
} catch (final Exception e) {
logDebug(e, key, 3);
terminate(key);
}
}
protected void closeConnection(SelectionKey key) {
protected void closeConnection(final SelectionKey key) {
if (key != null) {
SocketChannel channel = (SocketChannel)key.channel();
final SocketChannel channel = (SocketChannel)key.channel();
key.cancel();
try {
if (channel != null) {
@ -419,30 +443,30 @@ public abstract class NioConnection implements Runnable {
}
channel.close();
}
} catch (IOException ignore) {
} catch (final IOException ignore) {
s_logger.info("[ignored] channel");
}
}
}
public void register(int ops, SocketChannel key, Object att) {
ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att);
public void register(final int ops, final SocketChannel key, final Object att) {
final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att);
synchronized (this) {
_todos.add(todo);
}
_selector.wakeup();
}
public void change(int ops, SelectionKey key, Object att) {
ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att);
public void change(final int ops, final SelectionKey key, final Object att) {
final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att);
synchronized (this) {
_todos.add(todo);
}
_selector.wakeup();
}
public void close(SelectionKey key) {
ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null);
public void close(final SelectionKey key) {
final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null);
synchronized (this) {
_todos.add(todo);
}
@ -466,7 +490,7 @@ public abstract class NioConnection implements Runnable {
public int ops;
public Object att;
public ChangeRequest(Object key, int type, int ops, Object att) {
public ChangeRequest(final Object key, final int type, final int ops, final Object att) {
this.key = key;
this.type = type;
this.ops = ops;

View File

@ -37,7 +37,7 @@ public class NioServer extends NioConnection {
protected WeakHashMap<InetSocketAddress, Link> _links;
public NioServer(String name, int port, int workers, HandlerFactory factory) {
public NioServer(final String name, final int port, final int workers, final HandlerFactory factory) {
super(name, port, workers, factory);
_localAddr = null;
_links = new WeakHashMap<InetSocketAddress, Link>(1024);
@ -68,12 +68,12 @@ public class NioServer extends NioConnection {
}
@Override
protected void registerLink(InetSocketAddress addr, Link link) {
protected void registerLink(final InetSocketAddress addr, final Link link) {
_links.put(addr, link);
}
@Override
protected void unregisterLink(InetSocketAddress saddr) {
protected void unregisterLink(final InetSocketAddress saddr) {
_links.remove(saddr);
}
@ -86,8 +86,8 @@ public class NioServer extends NioConnection {
* @param data
* @return null if not sent. attach object in link if sent.
*/
public Object send(InetSocketAddress saddr, byte[] data) throws ClosedChannelException {
Link link = _links.get(saddr);
public Object send(final InetSocketAddress saddr, final byte[] data) throws ClosedChannelException {
final Link link = _links.get(saddr);
if (link == null) {
return null;
}

View File

@ -19,14 +19,14 @@
package com.cloud.utils.nio;
import org.apache.log4j.Logger;
import java.util.concurrent.Callable;
import com.cloud.utils.exception.TaskExecutionException;
/**
* Task represents one todo item for the AgentManager or the AgentManager
*
*/
public abstract class Task implements Runnable {
private static final Logger s_logger = Logger.getLogger(Task.class);
public abstract class Task implements Callable<Boolean> {
public enum Type {
CONNECT, // Process a new connection.
@ -40,13 +40,13 @@ public abstract class Task implements Runnable {
Type _type;
Link _link;
public Task(Type type, Link link, byte[] data) {
public Task(final Type type, final Link link, final byte[] data) {
_data = data;
_type = type;
_link = link;
}
public Task(Type type, Link link, Object data) {
public Task(final Type type, final Link link, final Object data) {
_data = data;
_type = type;
_link = link;
@ -76,14 +76,11 @@ public abstract class Task implements Runnable {
return _type.toString();
}
abstract protected void doTask(Task task) throws Exception;
abstract protected void doTask(Task task) throws TaskExecutionException;
@Override
public final void run() {
try {
doTask(this);
} catch (Throwable e) {
s_logger.warn("Caught the following exception but pushing on", e);
}
public Boolean call() throws TaskExecutionException {
doTask(this);
return true;
}
}
}

View File

@ -27,6 +27,7 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.junit.Assert;
import com.cloud.utils.exception.NioConnectionException;
import com.cloud.utils.nio.HandlerFactory;
import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.NioClient;
@ -56,7 +57,7 @@ public class NioTest extends TestCase {
private boolean isTestsDone() {
boolean result;
synchronized (this) {
result = (_testCount == _completedCount);
result = _testCount == _completedCount;
}
return result;
}
@ -81,16 +82,24 @@ public class NioTest extends TestCase {
_completedCount = 0;
_server = new NioServer("NioTestServer", 7777, 5, new NioTestServer());
_server.start();
try {
_server.start();
} catch (final NioConnectionException e) {
fail(e.getMessage());
}
_client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient());
_client.start();
try {
_client.start();
} catch (final NioConnectionException e) {
fail(e.getMessage());
}
while (_clientLink == null) {
try {
s_logger.debug("Link is not up! Waiting ...");
Thread.sleep(1000);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
@ -101,9 +110,9 @@ public class NioTest extends TestCase {
public void tearDown() {
while (!isTestsDone()) {
try {
s_logger.debug(this._completedCount + "/" + this._testCount + " tests done. Waiting for completion");
s_logger.debug(_completedCount + "/" + _testCount + " tests done. Waiting for completion");
Thread.sleep(1000);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
@ -122,7 +131,7 @@ public class NioTest extends TestCase {
s_logger.info("Server stopped.");
}
protected void setClientLink(Link link) {
protected void setClientLink(final Link link) {
_clientLink = link;
}
@ -140,13 +149,13 @@ public class NioTest extends TestCase {
getOneMoreTest();
_clientLink.send(_testBytes);
s_logger.info("Client: Data sent");
} catch (ClosedChannelException e) {
} catch (final ClosedChannelException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
protected void doServerProcess(byte[] data) {
protected void doServerProcess(final byte[] data) {
oneMoreTestDone();
Assert.assertArrayEquals(_testBytes, data);
s_logger.info("Verify done.");
@ -155,13 +164,13 @@ public class NioTest extends TestCase {
public class NioTestClient implements HandlerFactory {
@Override
public Task create(Type type, Link link, byte[] data) {
public Task create(final Type type, final Link link, final byte[] data) {
return new NioTestClientHandler(type, link, data);
}
public class NioTestClientHandler extends Task {
public NioTestClientHandler(Type type, Link link, byte[] data) {
public NioTestClientHandler(final Type type, final Link link, final byte[] data) {
super(type, link, data);
}
@ -186,13 +195,13 @@ public class NioTest extends TestCase {
public class NioTestServer implements HandlerFactory {
@Override
public Task create(Type type, Link link, byte[] data) {
public Task create(final Type type, final Link link, final byte[] data) {
return new NioTestServerHandler(type, link, data);
}
public class NioTestServerHandler extends Task {
public NioTestServerHandler(Type type, Link link, byte[] data) {
public NioTestServerHandler(final Type type, final Link link, final byte[] data) {
super(type, link, data);
}