From 1652086ff9d777f04fa09658e33e5545e69367b1 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Wed, 25 Sep 2024 18:33:55 +0530 Subject: [PATCH] wip changes for agent reconnection Signed-off-by: Abhishek Kumar --- .../src/main/java/com/cloud/agent/Agent.java | 16 ++- .../cloud/agent/manager/AgentManagerImpl.java | 16 ++- .../main/java/com/cloud/utils/nio/Link.java | 12 +- .../java/com/cloud/utils/nio/NioClient.java | 11 +- .../com/cloud/utils/nio/NioConnection.java | 113 +++++++++++------- .../java/com/cloud/utils/nio/NioServer.java | 3 +- .../com/cloud/utils/testcase/NioTest.java | 2 +- 7 files changed, 108 insertions(+), 65 deletions(-) diff --git a/agent/src/main/java/com/cloud/agent/Agent.java b/agent/src/main/java/com/cloud/agent/Agent.java index 9e0ee746c03..eb584aaf2d9 100644 --- a/agent/src/main/java/com/cloud/agent/Agent.java +++ b/agent/src/main/java/com/cloud/agent/Agent.java @@ -154,6 +154,14 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater private String _keystoreSetupPath; private String _keystoreCertImportPath; + protected String getConnectionName() { + return "Agent-" + _resource.getName(); + } + + protected String getConnectionNameLog() { + return String.format("%s:: ", getConnectionName()); + } + // for simulator use only public Agent(final IAgentShell shell) { _shell = shell; @@ -195,7 +203,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater } final String host = _shell.getNextHost(); - _connection = new NioClient("Agent", host, _shell.getPort(), _shell.getWorkers(), this); + _connection = new NioClient(getConnectionName(), host, _shell.getPort(), _shell.getWorkers(), this); // ((NioClient)_connection).setBindAddress(_shell.getPrivateIp()); @@ -536,7 +544,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater _resource.disconnected(); - s_logger.info("Lost connection to host: " + _shell.getConnectedHost() + ". 
Attempting reconnection while we still have " + _inProgress.get() + " commands in progress."); + s_logger.info(getConnectionNameLog() + "Lost connection to host: " + _shell.getConnectedHost() + ". Attempting reconnection while we still have " + _inProgress.get() + " commands in progress."); _connection.stop(); @@ -553,7 +561,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater do { final String host = _shell.getNextHost(); - _connection = new NioClient("Agent", host, _shell.getPort(), _shell.getWorkers(), this); - s_logger.info("Reconnecting to host:" + host); + _connection = new NioClient(getConnectionName(), host, _shell.getPort(), _shell.getWorkers(), this); + s_logger.info(getConnectionNameLog() + "Reconnecting to host:" + host); try { _connection.start(); } catch (final NioConnectionException e) { diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index 6f9db09693c..8d0e0b09780 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@ -38,9 +38,6 @@ import java.util.concurrent.locks.ReentrantLock; import javax.inject.Inject; import javax.naming.ConfigurationException; -import com.cloud.configuration.Config; -import com.cloud.utils.NumbersUtil; -import com.cloud.utils.db.GlobalLock; import org.apache.cloudstack.agent.lb.IndirectAgentLB; import org.apache.cloudstack.ca.CAManager; import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService; @@ -53,6 +50,7 @@ import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.outofbandmanagement.dao.OutOfBandManagementDao; import org.apache.cloudstack.utils.identity.ManagementServerNode; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import 
org.apache.log4j.Logger; import org.apache.log4j.MDC; @@ -81,6 +79,7 @@ import
com.cloud.agent.api.UnsupportedAnswer; import com.cloud.agent.transport.Request; import com.cloud.agent.transport.Response; import com.cloud.alert.AlertManager; +import com.cloud.configuration.Config; import com.cloud.configuration.ManagementServiceConfiguration; import com.cloud.dc.ClusterVO; import com.cloud.dc.DataCenterVO; @@ -104,11 +103,13 @@ import com.cloud.resource.Discoverer; import com.cloud.resource.ResourceManager; import com.cloud.resource.ResourceState; import com.cloud.resource.ServerResource; +import com.cloud.utils.NumbersUtil; import com.cloud.utils.Pair; import com.cloud.utils.component.ManagerBase; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; import com.cloud.utils.db.EntityManager; +import com.cloud.utils.db.GlobalLock; import com.cloud.utils.db.QueryBuilder; import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.TransactionLegacy; @@ -123,7 +124,6 @@ import com.cloud.utils.nio.Link; import com.cloud.utils.nio.NioServer; import com.cloud.utils.nio.Task; import com.cloud.utils.time.InaccurateClock; -import org.apache.commons.lang3.StringUtils; /** * Implementation of the Agent Manager. This class controls the connection to the agents. 
@@ -197,6 +197,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected final ConfigKey Workers = new ConfigKey("Advanced", Integer.class, "workers", "5", "Number of worker threads handling remote agent connections.", false); protected final ConfigKey Port = new ConfigKey("Advanced", Integer.class, "port", "8250", "Port to listen on for remote agent connections.", false); + protected final ConfigKey RemoteAgentSslHandshakeTimeout = new ConfigKey<>("Advanced", Integer.class, "agent.ssl.handshake.timeout", "30", + "Seconds after which SSL handshake times out during remote agent connections.", false); protected final ConfigKey AlertWait = new ConfigKey("Advanced", Integer.class, "alert.wait", "1800", "Seconds to wait before alerting on a disconnected agent", true); protected final ConfigKey DirectAgentLoadSize = new ConfigKey("Advanced", Integer.class, "direct.agent.load.size", "16", @@ -231,7 +233,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl // allow core threads to time out even when there are no items in the queue _connectExecutor.allowCoreThreadTimeOut(true); - _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this, caService); + _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this, caService, RemoteAgentSslHandshakeTimeout.value()); s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers"); // executes all agent commands other than cron and ping @@ -1838,7 +1840,9 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl @Override public ConfigKey[] getConfigKeys() { return new ConfigKey[] { CheckTxnBeforeSending, Workers, Port, Wait, AlertWait, DirectAgentLoadSize, - DirectAgentPoolSize, DirectAgentThreadCap, EnableKVMAutoEnableDisable, ReadyCommandWait }; + DirectAgentPoolSize, DirectAgentThreadCap, EnableKVMAutoEnableDisable, ReadyCommandWait, + 
RemoteAgentSslHandshakeTimeout + }; } protected class SetHostParamsListener implements Listener { diff --git a/utils/src/main/java/com/cloud/utils/nio/Link.java b/utils/src/main/java/com/cloud/utils/nio/Link.java index 5040c8306c0..832c77d2cc7 100644 --- a/utils/src/main/java/com/cloud/utils/nio/Link.java +++ b/utils/src/main/java/com/cloud/utils/nio/Link.java @@ -48,6 +48,7 @@ import javax.net.ssl.TrustManagerFactory; import org.apache.cloudstack.framework.ca.CAService; import org.apache.cloudstack.utils.security.KeyStoreUtils; import org.apache.cloudstack.utils.security.SSLUtils; +import org.apache.commons.lang3.ObjectUtils; import org.apache.log4j.Logger; import com.cloud.utils.PropertiesUtil; @@ -591,6 +592,10 @@ public class Link { } public static boolean doHandshake(final SocketChannel socketChannel, final SSLEngine sslEngine) throws IOException { + return doHandshake(socketChannel, sslEngine, null); + } + + public static boolean doHandshake(final SocketChannel socketChannel, final SSLEngine sslEngine, Integer timeout) throws IOException { if (socketChannel == null || sslEngine == null) { return false; } @@ -605,12 +610,13 @@ public class Link { final long startTimeMills = System.currentTimeMillis(); HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus(); + long timeoutMillis = ObjectUtils.defaultIfNull(timeout, 30) * 1000L; while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) { final long timeTaken = System.currentTimeMillis() - startTimeMills; - if (timeTaken > 30000L) { - s_logger.warn("SSL Handshake has taken more than 30s to connect to: " + socketChannel.getRemoteAddress() + - ". Please investigate this connection."); + if (timeTaken > timeoutMillis) { + s_logger.warn(String.format("SSL Handshake has taken more than %ds to connect to: %s" + + " while status: %s. 
Please investigate this connection.", timeoutMillis / 1000L, socketChannel.getRemoteAddress(), handshakeStatus)); return false; } switch (handshakeStatus) { diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java index 0eb58a59f62..603a6cb03e3 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java @@ -51,7 +51,7 @@ public class NioClient extends NioConnection { try { _clientConnection = SocketChannel.open(); - s_logger.info("Connecting to " + _host + ":" + _port); + s_logger.info(getConnectionName() + "Connecting to " + _host + ":" + _port); final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); _clientConnection.connect(peerAddr); _clientConnection.configureBlocking(false); @@ -61,13 +61,13 @@ public class NioClient extends NioConnection { sslEngine.setUseClientMode(true); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); sslEngine.beginHandshake(); - if (!Link.doHandshake(_clientConnection, sslEngine)) { - s_logger.error("SSL Handshake failed while connecting to host: " + _host + " port: " + _port); + if (!Link.doHandshake(_clientConnection, sslEngine, getSslHandshakeTimeout())) { + s_logger.error(getConnectionName() + "SSL Handshake failed while connecting to host: " + _host + " port: " + _port); _selector.close(); throw new IOException("SSL Handshake failed while connecting to host: " + _host + " port: " + _port); } - s_logger.info("SSL: Handshake done"); - s_logger.info("Connected to " + _host + ":" + _port); + s_logger.info(getConnectionName() + "SSL: Handshake done"); + s_logger.info(getConnectionName() + "Connected to " + _host + ":" + _port); final Link link = new Link(peerAddr, this); link.setSSLEngine(sslEngine); @@ -81,6 +81,7 @@ public class NioClient extends NioConnection { _selector.close(); throw new IOException("Failed to initialise security", e); } 
catch (final
IOException e) { + s_logger.error(getConnectionName() + "IOException", e); _selector.close(); throw e; } diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java index 9a5bf7e4153..5e17fe05bf0 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java @@ -32,17 +32,21 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLEngine; @@ -73,6 +77,13 @@ public abstract class NioConnection implements Callable { protected ExecutorService _executor; protected ExecutorService _sslHandshakeExecutor; protected CAService caService; + protected Integer sslHandshakeTimeout = null; + private final AtomicInteger activeAcceptConnections = new AtomicInteger(0); + private BlockingQueue sslHandshakeQueue; + + public String getConnectionName() { + return String.format("%s:: ", _name); + } public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) { _name = name; @@ -80,8 +91,12 @@ public abstract class NioConnection implements Callable { _selector = null; _port = port; _factory = factory; - _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new 
LinkedBlockingQueue(), new NamedThreadFactory(name + "-Handler")); - _sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(name + "-SSLHandshakeHandler")); + _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, + new LinkedBlockingQueue<>(), new NamedThreadFactory(name + "-Handler")); + sslHandshakeQueue = new LinkedBlockingQueue<>(5 * workers); + _sslHandshakeExecutor = new ThreadPoolExecutor(workers, 5 * workers, 90, + TimeUnit.SECONDS, sslHandshakeQueue, + new NamedThreadFactory(name + "-SSLHandshakeHandler"), new ThreadPoolExecutor.AbortPolicy()); } public void setCAService(final CAService caService) { @@ -94,13 +109,13 @@ public abstract class NioConnection implements Callable { try { init(); } catch (final ConnectException e) { - s_logger.warn("Unable to connect to remote: is there a server running on port " + _port); + s_logger.warn(getConnectionName() + "Unable to connect to remote: is there a server running on port " + _port); return; } catch (final IOException e) { - s_logger.error("Unable to initialize the threads.", e); + s_logger.error(getConnectionName() + "Unable to initialize the threads.", e); throw new NioConnectionException(e.getMessage(), e); } catch (final Exception e) { - s_logger.error("Unable to initialize the threads due to unknown exception.", e); + s_logger.error(getConnectionName() + "Unable to initialize the threads due to unknown exception.", e); throw new NioConnectionException(e.getMessage(), e); } _isStartup = true; @@ -190,6 +205,8 @@ public abstract class NioConnection implements Callable { abstract void unregisterLink(InetSocketAddress saddr); protected void accept(final SelectionKey key) throws IOException { + s_logger.debug(String.format("%s------------------active accept connections: %d, queue: %d", + getConnectionName(), activeAcceptConnections.get(), sslHandshakeQueue.size())); final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); final 
SocketChannel socketChannel
= serverSocketChannel.accept(); socketChannel.configureBlocking(false); @@ -198,51 +215,49 @@ public abstract class NioConnection implements Callable { socket.setKeepAlive(true); if (s_logger.isTraceEnabled()) { - s_logger.trace("Connection accepted for " + socket); + s_logger.trace(getConnectionName() + "Connection accepted for " + socket); } - final SSLEngine sslEngine; try { - sslEngine = Link.initServerSSLEngine(caService, socketChannel.getRemoteAddress().toString()); - sslEngine.setUseClientMode(false); - sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); final NioConnection nioConnection = this; - _sslHandshakeExecutor.submit(new Runnable() { - @Override - public void run() { - _selector.wakeup(); - try { - sslEngine.beginHandshake(); - if (!Link.doHandshake(socketChannel, sslEngine)) { - throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress()); - } - if (s_logger.isTraceEnabled()) { - s_logger.trace("SSL: Handshake done"); - } - final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); - final Link link = new Link(saddr, nioConnection); - link.setSSLEngine(sslEngine); - link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); - final Task task = _factory.create(Task.Type.CONNECT, link, null); - registerLink(saddr, link); - _executor.submit(task); - } catch (IOException e) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Connection closed due to failure: " + e.getMessage()); - } - closeAutoCloseable(socket, "accepting socket"); - closeAutoCloseable(socketChannel, "accepting socketChannel"); - } finally { - _selector.wakeup(); + _sslHandshakeExecutor.submit(() -> { + activeAcceptConnections.incrementAndGet(); + _selector.wakeup(); + try { + final SSLEngine sslEngine = Link.initServerSSLEngine(caService, socketChannel.getRemoteAddress().toString()); + sslEngine.setUseClientMode(false); + 
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); + sslEngine.beginHandshake(); + if (!Link.doHandshake(socketChannel, sslEngine, getSslHandshakeTimeout())) { + throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress()); } + if (s_logger.isTraceEnabled()) { + s_logger.trace("SSL: Handshake done"); + } + final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); + final Link link = new Link(saddr, nioConnection); + link.setSSLEngine(sslEngine); + link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); + final Task task = _factory.create(Task.Type.CONNECT, link, null); + registerLink(saddr, link); + _executor.submit(task); + } catch (final GeneralSecurityException | IOException e) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Connection closed due to failure: " + e.getMessage()); + } + closeAutoCloseable(socket, "accepting socket"); + closeAutoCloseable(socketChannel, "accepting socketChannel"); + } finally { + activeAcceptConnections.decrementAndGet(); + _selector.wakeup(); } }); - } catch (final Exception e) { + } catch (final RejectedExecutionException e) { if (s_logger.isTraceEnabled()) { - s_logger.trace("Connection closed due to failure: " + e.getMessage()); + s_logger.trace("Task rejected: " + e.getMessage()); } - closeAutoCloseable(socket, "accepting socket"); - closeAutoCloseable(socketChannel, "accepting socketChannel"); + closeAutoCloseable(socket, "Rejecting connection - accepting socket"); + closeAutoCloseable(socketChannel, "Rejecting connection - accepting socketChannel"); } finally { _selector.wakeup(); } @@ -259,7 +274,7 @@ public abstract class NioConnection implements Callable { try { _executor.submit(task); } catch (final Exception e) { - s_logger.warn("Exception occurred when submitting the task", e); + s_logger.warn(getConnectionName() + "Exception occurred when submitting the task", e); } } } @@ -283,7 
+298,7 @@ public abstract class
NioConnection implements Callable { try { _executor.submit(task); } catch (final Exception e) { - s_logger.warn("Exception occurred when submitting the task", e); + s_logger.warn(getConnectionName() + "Exception occurred when submitting the task", e); } } catch (final Exception e) { logDebug(e, key, 1); @@ -315,7 +330,7 @@ public abstract class NioConnection implements Callable { } } - s_logger.debug("Location " + loc + ": Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage()); + s_logger.debug(getConnectionName() + "Location " + loc + ": Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage()); } } @@ -412,7 +427,7 @@ public abstract class NioConnection implements Callable { try { _executor.submit(task); } catch (final Exception e) { - s_logger.warn("Exception occurred when submitting the task", e); + s_logger.warn(String.format("Exception occurred when submitting the task for connect: %s", socket), e); } } catch (final IOException e) { logTrace(e, key, 2); @@ -512,4 +527,12 @@ public abstract class NioConnection implements Callable { this.att = att; } } + + public Integer getSslHandshakeTimeout() { + return sslHandshakeTimeout; + } + + public void setSslHandshakeTimeout(Integer sslHandshakeTimeout) { + this.sslHandshakeTimeout = sslHandshakeTimeout; + } } diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java index 0f83eda57b5..3fc3ec156bd 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java @@ -38,9 +38,10 @@ public class NioServer extends NioConnection { protected WeakHashMap _links; - public NioServer(final String name, final int port, final int workers, final HandlerFactory factory, final CAService caService) { + public NioServer(final String name, final int port, final int workers, final HandlerFactory factory, final CAService caService, final Integer 
sslHandShakeTimeout) { super(name, port, workers, factory); setCAService(caService); + setSslHandshakeTimeout(sslHandShakeTimeout); _localAddr = null; _links = new WeakHashMap(1024); } diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java index 0a9deea1a9d..692278d20ce 100644 --- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java +++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java @@ -98,7 +98,7 @@ public class NioTest { testBytes = new byte[1000000]; randomGenerator.nextBytes(testBytes); - server = new NioServer("NioTestServer", 0, 1, new NioTestServer(), null); + server = new NioServer("NioTestServer", 0, 1, new NioTestServer(), null, null); try { server.start(); } catch (final NioConnectionException e) {