diff --git a/agent/src/main/java/com/cloud/agent/Agent.java b/agent/src/main/java/com/cloud/agent/Agent.java index f7268ee7bee..c0920860bf1 100644 --- a/agent/src/main/java/com/cloud/agent/Agent.java +++ b/agent/src/main/java/com/cloud/agent/Agent.java @@ -40,10 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.naming.ConfigurationException; -import com.cloud.resource.AgentStatusUpdater; -import com.cloud.resource.ResourceStatusUpdater; -import com.cloud.agent.api.PingAnswer; -import com.cloud.utils.NumbersUtil; import org.apache.cloudstack.agent.lb.SetupMSListAnswer; import org.apache.cloudstack.agent.lb.SetupMSListCommand; import org.apache.cloudstack.ca.PostCertificateRenewalCommand; @@ -66,6 +62,7 @@ import com.cloud.agent.api.Command; import com.cloud.agent.api.CronCommand; import com.cloud.agent.api.MaintainAnswer; import com.cloud.agent.api.MaintainCommand; +import com.cloud.agent.api.PingAnswer; import com.cloud.agent.api.PingCommand; import com.cloud.agent.api.ReadyCommand; import com.cloud.agent.api.ShutdownCommand; @@ -75,7 +72,10 @@ import com.cloud.agent.transport.Request; import com.cloud.agent.transport.Response; import com.cloud.exception.AgentControlChannelException; import com.cloud.host.Host; +import com.cloud.resource.AgentStatusUpdater; +import com.cloud.resource.ResourceStatusUpdater; import com.cloud.resource.ServerResource; +import com.cloud.utils.NumbersUtil; import com.cloud.utils.PropertiesUtil; import com.cloud.utils.backoff.BackoffAlgorithm; import com.cloud.utils.concurrency.NamedThreadFactory; @@ -553,14 +553,14 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater s_logger.warn("Fail to clean up old connection. " + e); } - while (_connection.isStartup()) { + do { _shell.getBackoffAlgorithm().waitBeforeRetry(); - } + } while (_connection.isStartup()); do { final String host = _shell.getNextHost(); _connection = new NioClient("Agent", host, _shell.getPort(), _shell.getWorkers(), _shell.getSslHandshakeTimeout(), this); - s_logger.info("Reconnecting to host:" + host); + s_logger.info(String.format("Reconnecting to host: %s", host)); try { _connection.start(); } catch (final NioConnectionException e) { diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java index e5ce6836830..44ad4780ec3 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java @@ -35,74 +35,94 @@ import org.apache.log4j.Logger; public class NioClient extends NioConnection { private static final Logger s_logger = Logger.getLogger(NioClient.class); - protected String _host; - protected SocketChannel _clientConnection; + protected String host; + protected SocketChannel clientConnection; public NioClient(final String name, final String host, final int port, final int workers, final Integer sslHandshakeTimeout, final HandlerFactory factory) { super(name, port, workers, 1, 2, factory); - _host = host; + setSslHandshakeTimeout(sslHandshakeTimeout); + this.host = host; + } + + protected void closeChannelAndSelector() { + try { + if (clientConnection != null && clientConnection.isOpen()) { + clientConnection.close(); + clientConnection = null; + } + } catch (IOException e) { + s_logger.error("Failed to close SocketChannel", e); + } + try { + if (_selector != null && _selector.isOpen()) { + _selector.close(); + _selector = null; + } + } catch (IOException e) { + s_logger.error("Failed to close Selector", e); + } } @Override protected void init() throws IOException { - _selector = Selector.open(); - Task task = null; - + Task task; + String hostLog = host + ":" + _port; try { - _clientConnection = SocketChannel.open(); - - s_logger.info("Connecting to " + _host + ":" + _port); - final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); - _clientConnection.connect(peerAddr); - _clientConnection.configureBlocking(false); + s_logger.info(String.format("Connecting to %s", hostLog)); + _selector = Selector.open(); + clientConnection = SocketChannel.open(); + final InetSocketAddress serverAddress = new InetSocketAddress(host, _port); + clientConnection.connect(serverAddress); + s_logger.info(String.format("Connected to %s", hostLog)); + clientConnection.configureBlocking(false); final SSLContext sslContext = Link.initClientSSLContext(); - SSLEngine sslEngine = sslContext.createSSLEngine(_host, _port); + SSLEngine sslEngine = sslContext.createSSLEngine(host, _port); sslEngine.setUseClientMode(true); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); sslEngine.beginHandshake(); - if (!Link.doHandshake(_clientConnection, sslEngine, getSslHandshakeTimeout())) { - s_logger.error("SSL Handshake failed while connecting to host: " + _host + " port: " + _port); - _selector.close(); - throw new IOException("SSL Handshake failed while connecting to host: " + _host + " port: " + _port); + if (!Link.doHandshake(clientConnection, sslEngine, getSslHandshakeTimeout())) { + throw new IOException(String.format("SSL Handshake failed while connecting to host: %s", hostLog)); } s_logger.info("SSL: Handshake done"); - s_logger.info("Connected to " + _host + ":" + _port); - final Link link = new Link(peerAddr, this); + final Link link = new Link(serverAddress, this); link.setSSLEngine(sslEngine); - final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ); + final SelectionKey key = clientConnection.register(_selector, SelectionKey.OP_READ); link.setKey(key); key.attach(link); // Notice we've already connected due to the handshake, so let's get the // remaining task done task = _factory.create(Task.Type.CONNECT, link, null); } catch (final GeneralSecurityException e) { - _selector.close(); + closeChannelAndSelector(); throw new IOException("Failed to initialise security", e); } catch (final IOException e) { - s_logger.error("IOException", e); - _selector.close(); + closeChannelAndSelector(); + s_logger.error(String.format("IOException while connecting to %s", hostLog), e); throw e; } - _executor.submit(task); + if (task != null) { + _executor.submit(task); + } } @Override - protected void registerLink(final InetSocketAddress saddr, final Link link) { + protected void registerLink(final InetSocketAddress address, final Link link) { // don't do anything. } @Override - protected void unregisterLink(final InetSocketAddress saddr) { + protected void unregisterLink(final InetSocketAddress address) { // don't do anything. } @Override public void cleanUp() throws IOException { super.cleanUp(); - if (_clientConnection != null) { - _clientConnection.close(); + if (clientConnection != null && clientConnection.isOpen()) { + clientConnection.close(); + clientConnection = null; } s_logger.info("NioClient connection closed"); } diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java index 0ff01631a67..66b42b8310a 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java @@ -63,7 +63,8 @@ import com.cloud.utils.exception.NioConnectionException; * provides that. */ public abstract class NioConnection implements Callable { - private static final Logger s_logger = Logger.getLogger(NioConnection.class);; + private static final Logger s_logger = Logger.getLogger(NioConnection.class); + public static final String SERVER_BUSY_MESSAGE = "Server is busy."; protected Selector _selector; protected ExecutorService _threadExecutor; @@ -91,12 +92,13 @@ public abstract class NioConnection implements Callable { _selector = null; _port = port; _factory = factory; - this.sslHandshakeMaxWorkers = sslHandshakeMaxWorkers; + int sslMinWorkers = Math.max(sslHandshakeMinWorkers, 1); + this.sslHandshakeMaxWorkers = Math.max(sslHandshakeMaxWorkers, sslMinWorkers); workerQueue = new LinkedBlockingQueue<>(5 * workers); _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, workerQueue, new NamedThreadFactory(name + "-Handler"), new ThreadPoolExecutor.AbortPolicy()); sslHandshakeQueue = new SynchronousQueue<>(); - _sslHandshakeExecutor = new ThreadPoolExecutor(sslHandshakeMinWorkers, sslHandshakeMaxWorkers, 30, + _sslHandshakeExecutor = new ThreadPoolExecutor(sslMinWorkers, this.sslHandshakeMaxWorkers, 30, TimeUnit.MINUTES, sslHandshakeQueue, new NamedThreadFactory(name + "-SSLHandshakeHandler"), new ThreadPoolExecutor.AbortPolicy()); } @@ -111,8 +113,8 @@ public abstract class NioConnection implements Callable { try { init(); } catch (final ConnectException e) { - s_logger.warn("Unable to connect to remote: is there a server running on port" + _port); - return; + s_logger.warn("Unable to connect to remote: is there a server running on port" + _port, e); + throw new NioConnectionException(e.getMessage(), e); } catch (final IOException e) { s_logger.error("Unable to initialize the threads.", e); throw new NioConnectionException(e.getMessage(), e); @@ -129,6 +131,7 @@ public abstract class NioConnection implements Callable { public void stop() { _executor.shutdown(); + _sslHandshakeExecutor.shutdown(); _isRunning = false; if (_threadExecutor != null) { _futureTask.cancel(false); @@ -206,16 +209,23 @@ public abstract class NioConnection implements Callable { abstract void unregisterLink(InetSocketAddress saddr); + protected boolean rejectConnectionIfBusy(final SocketChannel socketChannel) throws IOException { + if (activeAcceptConnections.get() < sslHandshakeMaxWorkers) { + return false; + } + // Reject new connection if the server is busy + s_logger.warn(String.format("%s Rejecting new connection. %d active connections currently", + SERVER_BUSY_MESSAGE, sslHandshakeMaxWorkers)); + socketChannel.close(); + _selector.wakeup(); + return true; + } + + protected void accept(final SelectionKey key) throws IOException { final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); final SocketChannel socketChannel = serverSocketChannel.accept(); - if (activeAcceptConnections.get() >= sslHandshakeMaxWorkers) { - // Reject new connection if the server is busy - s_logger.warn("Server is busy. Rejecting new connection."); - if (socketChannel != null) { - socketChannel.close(); - } - _selector.wakeup(); + if (rejectConnectionIfBusy(socketChannel)) { return; } socketChannel.configureBlocking(false); @@ -520,7 +530,8 @@ public abstract class NioConnection implements Callable { /* Release the resource used by the instance */ public void cleanUp() throws IOException { - if (_selector != null) { + if (_selector != null && _selector.isOpen()) { + _selector.wakeup(); _selector.close(); } } diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java index 59f90a4a8ef..29f6aedecfc 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java @@ -33,10 +33,10 @@ import org.apache.log4j.Logger; public class NioServer extends NioConnection { private final static Logger s_logger = Logger.getLogger(NioServer.class); - protected InetSocketAddress _localAddr; - private ServerSocketChannel _serverSocket; + protected InetSocketAddress localAddress; + private ServerSocketChannel serverSocket; - protected WeakHashMap _links; + protected WeakHashMap links; public NioServer(final String name, final int port, final int workers, final int sslHandshakeMinWorkers, final int sslHandshakeMaxWorkers, final HandlerFactory factory, final CAService caService, @@ -44,46 +44,46 @@ public class NioServer extends NioConnection { super(name, port, workers, sslHandshakeMinWorkers, sslHandshakeMaxWorkers, factory); setCAService(caService); setSslHandshakeTimeout(sslHandShakeTimeout); - _localAddr = null; - _links = new WeakHashMap(1024); + localAddress = null; + links = new WeakHashMap<>(1024); } public int getPort() { - return _serverSocket.socket().getLocalPort(); + return serverSocket.socket().getLocalPort(); } @Override protected void init() throws IOException { _selector = SelectorProvider.provider().openSelector(); - _serverSocket = ServerSocketChannel.open(); - _serverSocket.configureBlocking(false); + serverSocket = ServerSocketChannel.open(); + serverSocket.configureBlocking(false); - _localAddr = new InetSocketAddress(_port); - _serverSocket.socket().bind(_localAddr); + localAddress = new InetSocketAddress(_port); + serverSocket.socket().bind(localAddress); - _serverSocket.register(_selector, SelectionKey.OP_ACCEPT, null); + serverSocket.register(_selector, SelectionKey.OP_ACCEPT, null); - s_logger.info("NioServer started and listening on " + _serverSocket.socket().getLocalSocketAddress()); + s_logger.info("NioServer started and listening on " + serverSocket.socket().getLocalSocketAddress()); } @Override public void cleanUp() throws IOException { super.cleanUp(); - if (_serverSocket != null) { - _serverSocket.close(); + if (serverSocket != null && serverSocket.isOpen()) { + serverSocket.close(); } - s_logger.info("NioConnection stopped on " + _localAddr.toString()); + s_logger.info("NioConnection stopped on " + localAddress.toString()); } @Override protected void registerLink(final InetSocketAddress addr, final Link link) { - _links.put(addr, link); + links.put(addr, link); } @Override protected void unregisterLink(final InetSocketAddress saddr) { - _links.remove(saddr); + links.remove(saddr); } /** @@ -96,7 +96,7 @@ public class NioServer extends NioConnection { * @return null if not sent. attach object in link if sent. */ public Object send(final InetSocketAddress saddr, final byte[] data) throws ClosedChannelException { - final Link link = _links.get(saddr); + final Link link = links.get(saddr); if (link == null) { return null; } diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java index 446158b2018..fbdfdc0a833 100644 --- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java +++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java @@ -186,10 +186,10 @@ public class NioTest { protected void init() throws IOException { _selector = Selector.open(); try { - _clientConnection = SocketChannel.open(); - LOGGER.info("Connecting to " + _host + ":" + _port); - final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); - _clientConnection.connect(peerAddr); + clientConnection = SocketChannel.open(); + LOGGER.info("Connecting to " + host + ":" + _port); + final InetSocketAddress peerAddr = new InetSocketAddress(host, _port); + clientConnection.connect(peerAddr); // This is done on purpose, the malicious client would connect // to the server and then do nothing, hence using a large sleep value Thread.sleep(Long.MAX_VALUE);