wip changes for agent reconnection

Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-09-25 18:33:55 +05:30
parent 0151f125aa
commit 1652086ff9
7 changed files with 107 additions and 64 deletions

View File

@ -154,6 +154,14 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
private String _keystoreSetupPath;
private String _keystoreCertImportPath;
/**
 * Builds the name used to identify this agent's connection, e.g. when constructing
 * the {@code NioClient} and when prefixing log messages.
 *
 * @return {@code "Agent-"} followed by the resource name, or plain {@code "Agent"}
 *         when no resource is attached
 */
protected String getConnectionName() {
    // NOTE(review): the simulator-only constructor does not visibly assign _resource;
    // fall back to the previous fixed name instead of failing with a NullPointerException.
    if (_resource == null) {
        return "Agent";
    }
    return "Agent-" + _resource.getName();
}
/**
 * Returns the connection name followed by a {@code ":: "} separator, ready to be
 * prepended to a log message.
 *
 * @return the log prefix for this agent's connection
 */
protected String getConnectionNameLog() {
    final String prefixFormat = "%s:: ";
    final String connectionName = getConnectionName();
    return String.format(prefixFormat, connectionName);
}
// for simulator use only
public Agent(final IAgentShell shell) {
_shell = shell;
@ -195,7 +203,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
}
final String host = _shell.getNextHost();
_connection = new NioClient("Agent", host, _shell.getPort(), _shell.getWorkers(), this);
_connection = new NioClient(getConnectionName(), host, _shell.getPort(), _shell.getWorkers(), this);
// ((NioClient)_connection).setBindAddress(_shell.getPrivateIp());
@ -536,7 +544,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
_resource.disconnected();
s_logger.info("Lost connection to host: " + _shell.getConnectedHost() + ". Attempting reconnection while we still have " + _inProgress.get() + " commands in progress.");
s_logger.info(getConnectionNameLog() + "Lost connection to host: " + _shell.getConnectedHost() + ". Attempting reconnection while we still have " + _inProgress.get() + " commands in progress.");
_connection.stop();
@ -553,7 +561,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
do {
final String host = _shell.getNextHost();
// Keep the connection name consistent with the one used for the initial connection,
// so log prefixes and handler thread names do not change after a reconnect.
_connection = new NioClient(getConnectionName(), host, _shell.getPort(), _shell.getWorkers(), this);
s_logger.info("Reconnecting to host:" + host);
s_logger.info(getConnectionNameLog() + "Reconnecting to host:" + host);
try {
_connection.start();
} catch (final NioConnectionException e) {

View File

@ -38,9 +38,6 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import com.cloud.configuration.Config;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.db.GlobalLock;
import org.apache.cloudstack.agent.lb.IndirectAgentLB;
import org.apache.cloudstack.ca.CAManager;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
@ -53,6 +50,7 @@ import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.outofbandmanagement.dao.OutOfBandManagementDao;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.log4j.MDC;
@ -81,6 +79,7 @@ import com.cloud.agent.api.UnsupportedAnswer;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.alert.AlertManager;
import com.cloud.configuration.Config;
import com.cloud.configuration.ManagementServiceConfiguration;
import com.cloud.dc.ClusterVO;
import com.cloud.dc.DataCenterVO;
@ -104,11 +103,13 @@ import com.cloud.resource.Discoverer;
import com.cloud.resource.ResourceManager;
import com.cloud.resource.ResourceState;
import com.cloud.resource.ServerResource;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Pair;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.EntityManager;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.TransactionLegacy;
@ -123,7 +124,6 @@ import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.NioServer;
import com.cloud.utils.nio.Task;
import com.cloud.utils.time.InaccurateClock;
import org.apache.commons.lang3.StringUtils;
/**
* Implementation of the Agent Manager. This class controls the connection to the agents.
@ -197,6 +197,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>("Advanced", Integer.class, "workers", "5",
"Number of worker threads handling remote agent connections.", false);
protected final ConfigKey<Integer> Port = new ConfigKey<Integer>("Advanced", Integer.class, "port", "8250", "Port to listen on for remote agent connections.", false);
protected final ConfigKey<Integer> RemoteAgentSslHandshakeTimeout = new ConfigKey<>("Advanced", Integer.class, "agent.ssl.handshake.timeout", "30",
"Seconds after which SSL handshake times out during remote agent connections.", false);
protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>("Advanced", Integer.class, "alert.wait", "1800",
"Seconds to wait before alerting on a disconnected agent", true);
protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.load.size", "16",
@ -231,7 +233,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
// allow core threads to time out even when there are no items in the queue
_connectExecutor.allowCoreThreadTimeOut(true);
_connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this, caService);
_connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this, caService, RemoteAgentSslHandshakeTimeout.value());
s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers");
// executes all agent commands other than cron and ping
@ -1838,7 +1840,9 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[] { CheckTxnBeforeSending, Workers, Port, Wait, AlertWait, DirectAgentLoadSize,
DirectAgentPoolSize, DirectAgentThreadCap, EnableKVMAutoEnableDisable, ReadyCommandWait };
DirectAgentPoolSize, DirectAgentThreadCap, EnableKVMAutoEnableDisable, ReadyCommandWait,
RemoteAgentSslHandshakeTimeout
};
}
protected class SetHostParamsListener implements Listener {

View File

@ -48,6 +48,7 @@ import javax.net.ssl.TrustManagerFactory;
import org.apache.cloudstack.framework.ca.CAService;
import org.apache.cloudstack.utils.security.KeyStoreUtils;
import org.apache.cloudstack.utils.security.SSLUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.log4j.Logger;
import com.cloud.utils.PropertiesUtil;
@ -591,6 +592,10 @@ public class Link {
}
/**
 * Runs the SSL handshake on the given channel without an explicit timeout.
 * Delegates to the three-argument overload, passing {@code null} as the timeout.
 *
 * @param socketChannel channel the handshake is performed on
 * @param sslEngine engine driving the handshake
 * @return the result of the delegated handshake call
 * @throws IOException if the delegated handshake call throws it
 */
public static boolean doHandshake(final SocketChannel socketChannel, final SSLEngine sslEngine) throws IOException {
    final Integer noExplicitTimeout = null;
    return doHandshake(socketChannel, sslEngine, noExplicitTimeout);
}
public static boolean doHandshake(final SocketChannel socketChannel, final SSLEngine sslEngine, Integer timeout) throws IOException {
if (socketChannel == null || sslEngine == null) {
return false;
}
@ -605,12 +610,13 @@ public class Link {
final long startTimeMills = System.currentTimeMillis();
HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
long timeoutMillis = ObjectUtils.defaultIfNull(timeout, 30) * 1000L;
while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED
&& handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
final long timeTaken = System.currentTimeMillis() - startTimeMills;
if (timeTaken > 30000L) {
s_logger.warn("SSL Handshake has taken more than 30s to connect to: " + socketChannel.getRemoteAddress() +
". Please investigate this connection.");
if (timeTaken > timeoutMillis) {
// Report the effective limit instead of a hard-coded "30s": the timeout is now caller-supplied.
s_logger.warn(String.format("SSL Handshake has taken more than %ds to connect to: %s" +
        " while status: %s. Please investigate this connection.",
        timeoutMillis / 1000L, socketChannel.getRemoteAddress(), handshakeStatus));
return false;
}
switch (handshakeStatus) {

View File

@ -51,7 +51,7 @@ public class NioClient extends NioConnection {
try {
_clientConnection = SocketChannel.open();
s_logger.info("Connecting to " + _host + ":" + _port);
s_logger.info(getConnectionName() + "Connecting to " + _host + ":" + _port);
final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
_clientConnection.connect(peerAddr);
_clientConnection.configureBlocking(false);
@ -61,13 +61,13 @@ public class NioClient extends NioConnection {
sslEngine.setUseClientMode(true);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
sslEngine.beginHandshake();
if (!Link.doHandshake(_clientConnection, sslEngine)) {
s_logger.error("SSL Handshake failed while connecting to host: " + _host + " port: " + _port);
if (!Link.doHandshake(_clientConnection, sslEngine, getSslHandshakeTimeout())) {
s_logger.error(getConnectionName() + "SSL Handshake failed while connecting to host: " + _host + " port: " + _port);
_selector.close();
throw new IOException("SSL Handshake failed while connecting to host: " + _host + " port: " + _port);
}
s_logger.info("SSL: Handshake done");
s_logger.info("Connected to " + _host + ":" + _port);
s_logger.info(getConnectionName() + "SSL: Handshake done");
s_logger.info(getConnectionName() + "Connected to " + _host + ":" + _port);
final Link link = new Link(peerAddr, this);
link.setSSLEngine(sslEngine);
@ -81,6 +81,7 @@ public class NioClient extends NioConnection {
_selector.close();
throw new IOException("Failed to initialise security", e);
} catch (final IOException e) {
s_logger.error(getConnectionName() + "IOException", e);
_selector.close();
throw e;
}

View File

@ -32,17 +32,21 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
@ -73,6 +77,13 @@ public abstract class NioConnection implements Callable<Boolean> {
protected ExecutorService _executor;
protected ExecutorService _sslHandshakeExecutor;
protected CAService caService;
protected Integer sslHandshakeTimeout = null;
private final AtomicInteger activeAcceptConnections = new AtomicInteger(0);
private BlockingQueue<Runnable> sslHandshakeQueue;
/**
 * Returns this connection's name followed by a {@code ":: "} separator, used as a
 * prefix on log messages.
 *
 * @return the log prefix for this connection
 */
public String getConnectionName() {
    final String logPrefixFormat = "%s:: ";
    return String.format(logPrefixFormat, _name);
}
public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) {
_name = name;
@ -80,8 +91,12 @@ public abstract class NioConnection implements Callable<Boolean> {
_selector = null;
_port = port;
_factory = factory;
_executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name + "-Handler"));
_sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(name + "-SSLHandshakeHandler"));
_executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(name + "-Handler"));
sslHandshakeQueue = new LinkedBlockingQueue<>(5 * workers);
// Handshake threads get their own name suffix (as the previous cached pool had) so they
// can be told apart from the "-Handler" worker threads in thread dumps and logs.
// New handshakes are rejected (AbortPolicy) once the bounded queue and pool are full.
_sslHandshakeExecutor = new ThreadPoolExecutor(workers, 5 * workers, 90,
        TimeUnit.SECONDS, sslHandshakeQueue,
        new NamedThreadFactory(name + "-SSLHandshakeHandler"), new ThreadPoolExecutor.AbortPolicy());
}
public void setCAService(final CAService caService) {
@ -94,13 +109,13 @@ public abstract class NioConnection implements Callable<Boolean> {
try {
init();
} catch (final ConnectException e) {
s_logger.warn("Unable to connect to remote: is there a server running on port " + _port);
// Keep the space before the port number so the message does not read "...on port8250".
s_logger.warn(getConnectionName() + "Unable to connect to remote: is there a server running on port " + _port);
return;
} catch (final IOException e) {
s_logger.error("Unable to initialize the threads.", e);
s_logger.error(getConnectionName() + "Unable to initialize the threads.", e);
throw new NioConnectionException(e.getMessage(), e);
} catch (final Exception e) {
s_logger.error("Unable to initialize the threads due to unknown exception.", e);
s_logger.error(getConnectionName() + "Unable to initialize the threads due to unknown exception.", e);
throw new NioConnectionException(e.getMessage(), e);
}
_isStartup = true;
@ -190,6 +205,8 @@ public abstract class NioConnection implements Callable<Boolean> {
abstract void unregisterLink(InetSocketAddress saddr);
protected void accept(final SelectionKey key) throws IOException {
s_logger.info(String.format("%s------------------active accept connections: %d, queue: %d",
getConnectionName(), activeAcceptConnections.get(), sslHandshakeQueue.size()));
final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
final SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
@ -198,51 +215,49 @@ public abstract class NioConnection implements Callable<Boolean> {
socket.setKeepAlive(true);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Connection accepted for " + socket);
s_logger.trace(getConnectionName() + "Connection accepted for " + socket);
}
final SSLEngine sslEngine;
try {
sslEngine = Link.initServerSSLEngine(caService, socketChannel.getRemoteAddress().toString());
sslEngine.setUseClientMode(false);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
final NioConnection nioConnection = this;
_sslHandshakeExecutor.submit(new Runnable() {
@Override
public void run() {
_selector.wakeup();
try {
sslEngine.beginHandshake();
if (!Link.doHandshake(socketChannel, sslEngine)) {
throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress());
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("SSL: Handshake done");
}
final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
final Link link = new Link(saddr, nioConnection);
link.setSSLEngine(sslEngine);
link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link));
final Task task = _factory.create(Task.Type.CONNECT, link, null);
registerLink(saddr, link);
_executor.submit(task);
} catch (IOException e) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Connection closed due to failure: " + e.getMessage());
}
closeAutoCloseable(socket, "accepting socket");
closeAutoCloseable(socketChannel, "accepting socketChannel");
} finally {
_selector.wakeup();
_sslHandshakeExecutor.submit(() -> {
activeAcceptConnections.incrementAndGet();
_selector.wakeup();
try {
final SSLEngine sslEngine = Link.initServerSSLEngine(caService, socketChannel.getRemoteAddress().toString());
sslEngine.setUseClientMode(false);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
sslEngine.beginHandshake();
// Apply the handshake timeout configured on this connection; the two-argument overload
// would ignore it and always use the built-in default.
if (!Link.doHandshake(socketChannel, sslEngine, getSslHandshakeTimeout())) {
    throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress());
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("SSL: Handshake done");
}
final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
final Link link = new Link(saddr, nioConnection);
link.setSSLEngine(sslEngine);
link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link));
final Task task = _factory.create(Task.Type.CONNECT, link, null);
registerLink(saddr, link);
_executor.submit(task);
} catch (final GeneralSecurityException | IOException e) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Connection closed due to failure: " + e.getMessage());
}
closeAutoCloseable(socket, "accepting socket");
closeAutoCloseable(socketChannel, "accepting socketChannel");
} finally {
activeAcceptConnections.decrementAndGet();
_selector.wakeup();
}
});
} catch (final Exception e) {
} catch (final RejectedExecutionException e) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Connection closed due to failure: " + e.getMessage());
s_logger.trace("Task rejected: " + e.getMessage());
}
closeAutoCloseable(socket, "accepting socket");
closeAutoCloseable(socketChannel, "accepting socketChannel");
closeAutoCloseable(socket, "Rejecting connection - accepting socket");
closeAutoCloseable(socketChannel, "Rejecting connection - accepting socketChannel");
} finally {
_selector.wakeup();
}
@ -259,7 +274,7 @@ public abstract class NioConnection implements Callable<Boolean> {
try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
s_logger.warn(getConnectionName() + "Exception occurred when submitting the task", e);
}
}
}
@ -283,7 +298,7 @@ public abstract class NioConnection implements Callable<Boolean> {
try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
s_logger.warn(getConnectionName() + "Exception occurred when submitting the task", e);
}
} catch (final Exception e) {
logDebug(e, key, 1);
@ -315,7 +330,7 @@ public abstract class NioConnection implements Callable<Boolean> {
}
}
s_logger.debug("Location " + loc + ": Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage());
s_logger.debug(getConnectionName() + "Location " + loc + ": Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage());
}
}
@ -412,7 +427,7 @@ public abstract class NioConnection implements Callable<Boolean> {
try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
s_logger.warn(String.format("Exception occurred when submitting the task for connect: %s", socket), e);
}
} catch (final IOException e) {
logTrace(e, key, 2);
@ -512,4 +527,12 @@ public abstract class NioConnection implements Callable<Boolean> {
this.att = att;
}
}
/**
 * Returns the SSL handshake timeout configured for this connection.
 *
 * @return the configured timeout, or {@code null} when none has been set
 */
public Integer getSslHandshakeTimeout() {
    return this.sslHandshakeTimeout;
}
/**
 * Stores the SSL handshake timeout for this connection.
 *
 * @param sslHandshakeTimeout the timeout to store; {@code null} is stored as-is
 */
public void setSslHandshakeTimeout(final Integer sslHandshakeTimeout) {
    this.sslHandshakeTimeout = sslHandshakeTimeout;
}
}

View File

@ -38,9 +38,10 @@ public class NioServer extends NioConnection {
protected WeakHashMap<InetSocketAddress, Link> _links;
public NioServer(final String name, final int port, final int workers, final HandlerFactory factory, final CAService caService) {
public NioServer(final String name, final int port, final int workers, final HandlerFactory factory, final CAService caService, final Integer sslHandShakeTimeout) {
super(name, port, workers, factory);
setCAService(caService);
setSslHandshakeTimeout(sslHandShakeTimeout);
_localAddr = null;
_links = new WeakHashMap<InetSocketAddress, Link>(1024);
}

View File

@ -98,7 +98,7 @@ public class NioTest {
testBytes = new byte[1000000];
randomGenerator.nextBytes(testBytes);
server = new NioServer("NioTestServer", 0, 1, new NioTestServer(), null);
server = new NioServer("NioTestServer", 0, 1, new NioTestServer(), null, null);
try {
server.start();
} catch (final NioConnectionException e) {