Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-10-15 18:53:46 +05:30
parent 79c257c454
commit 053a19f551
5 changed files with 101 additions and 70 deletions

View File

@ -40,10 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.ConfigurationException;
import com.cloud.resource.AgentStatusUpdater;
import com.cloud.resource.ResourceStatusUpdater;
import com.cloud.agent.api.PingAnswer;
import com.cloud.utils.NumbersUtil;
import org.apache.cloudstack.agent.lb.SetupMSListAnswer;
import org.apache.cloudstack.agent.lb.SetupMSListCommand;
import org.apache.cloudstack.ca.PostCertificateRenewalCommand;
@ -66,6 +62,7 @@ import com.cloud.agent.api.Command;
import com.cloud.agent.api.CronCommand;
import com.cloud.agent.api.MaintainAnswer;
import com.cloud.agent.api.MaintainCommand;
import com.cloud.agent.api.PingAnswer;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.ReadyCommand;
import com.cloud.agent.api.ShutdownCommand;
@ -75,7 +72,10 @@ import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.exception.AgentControlChannelException;
import com.cloud.host.Host;
import com.cloud.resource.AgentStatusUpdater;
import com.cloud.resource.ResourceStatusUpdater;
import com.cloud.resource.ServerResource;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.backoff.BackoffAlgorithm;
import com.cloud.utils.concurrency.NamedThreadFactory;
@ -553,14 +553,14 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
s_logger.warn("Fail to clean up old connection. " + e);
}
while (_connection.isStartup()) {
do {
_shell.getBackoffAlgorithm().waitBeforeRetry();
}
} while (_connection.isStartup());
do {
final String host = _shell.getNextHost();
_connection = new NioClient("Agent", host, _shell.getPort(), _shell.getWorkers(), _shell.getSslHandshakeTimeout(), this);
s_logger.info("Reconnecting to host:" + host);
s_logger.info(String.format("Reconnecting to host: %s", host));
try {
_connection.start();
} catch (final NioConnectionException e) {

View File

@ -35,74 +35,94 @@ import org.apache.log4j.Logger;
public class NioClient extends NioConnection {
private static final Logger s_logger = Logger.getLogger(NioClient.class);
protected String _host;
protected SocketChannel _clientConnection;
protected String host;
protected SocketChannel clientConnection;
public NioClient(final String name, final String host, final int port, final int workers, final Integer sslHandshakeTimeout, final HandlerFactory factory) {
super(name, port, workers, 1, 2, factory);
_host = host;
setSslHandshakeTimeout(sslHandshakeTimeout);
this.host = host;
}
protected void closeChannelAndSelector() {
try {
if (clientConnection != null && clientConnection.isOpen()) {
clientConnection.close();
clientConnection = null;
}
} catch (IOException e) {
s_logger.error("Failed to close SocketChannel", e);
}
try {
if (_selector != null && _selector.isOpen()) {
_selector.close();
_selector = null;
}
} catch (IOException e) {
s_logger.error("Failed to close Selector", e);
}
}
@Override
protected void init() throws IOException {
_selector = Selector.open();
Task task = null;
Task task;
String hostLog = host + ":" + _port;
try {
_clientConnection = SocketChannel.open();
s_logger.info("Connecting to " + _host + ":" + _port);
final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
_clientConnection.connect(peerAddr);
_clientConnection.configureBlocking(false);
s_logger.info(String.format("Connecting to %s", hostLog));
_selector = Selector.open();
clientConnection = SocketChannel.open();
final InetSocketAddress serverAddress = new InetSocketAddress(host, _port);
clientConnection.connect(serverAddress);
s_logger.info(String.format("Connected to %s", hostLog));
clientConnection.configureBlocking(false);
final SSLContext sslContext = Link.initClientSSLContext();
SSLEngine sslEngine = sslContext.createSSLEngine(_host, _port);
SSLEngine sslEngine = sslContext.createSSLEngine(host, _port);
sslEngine.setUseClientMode(true);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
sslEngine.beginHandshake();
if (!Link.doHandshake(_clientConnection, sslEngine, getSslHandshakeTimeout())) {
s_logger.error("SSL Handshake failed while connecting to host: " + _host + " port: " + _port);
_selector.close();
throw new IOException("SSL Handshake failed while connecting to host: " + _host + " port: " + _port);
if (!Link.doHandshake(clientConnection, sslEngine, getSslHandshakeTimeout())) {
throw new IOException(String.format("SSL Handshake failed while connecting to host: %s", hostLog));
}
s_logger.info("SSL: Handshake done");
s_logger.info("Connected to " + _host + ":" + _port);
final Link link = new Link(peerAddr, this);
final Link link = new Link(serverAddress, this);
link.setSSLEngine(sslEngine);
final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ);
final SelectionKey key = clientConnection.register(_selector, SelectionKey.OP_READ);
link.setKey(key);
key.attach(link);
// Notice we've already connected due to the handshake, so let's get the
// remaining task done
task = _factory.create(Task.Type.CONNECT, link, null);
} catch (final GeneralSecurityException e) {
_selector.close();
closeChannelAndSelector();
throw new IOException("Failed to initialise security", e);
} catch (final IOException e) {
s_logger.error("IOException", e);
_selector.close();
closeChannelAndSelector();
s_logger.error(String.format("IOException while connecting to %s", hostLog), e);
throw e;
}
_executor.submit(task);
if (task != null) {
_executor.submit(task);
}
}
@Override
protected void registerLink(final InetSocketAddress saddr, final Link link) {
protected void registerLink(final InetSocketAddress address, final Link link) {
// don't do anything.
}
@Override
protected void unregisterLink(final InetSocketAddress saddr) {
protected void unregisterLink(final InetSocketAddress address) {
// don't do anything.
}
@Override
public void cleanUp() throws IOException {
super.cleanUp();
if (_clientConnection != null) {
_clientConnection.close();
if (clientConnection != null && clientConnection.isOpen()) {
clientConnection.close();
clientConnection = null;
}
s_logger.info("NioClient connection closed");
}

View File

@ -63,7 +63,8 @@ import com.cloud.utils.exception.NioConnectionException;
* provides that.
*/
public abstract class NioConnection implements Callable<Boolean> {
private static final Logger s_logger = Logger.getLogger(NioConnection.class);;
private static final Logger s_logger = Logger.getLogger(NioConnection.class);
public static final String SERVER_BUSY_MESSAGE = "Server is busy.";
protected Selector _selector;
protected ExecutorService _threadExecutor;
@ -91,12 +92,13 @@ public abstract class NioConnection implements Callable<Boolean> {
_selector = null;
_port = port;
_factory = factory;
this.sslHandshakeMaxWorkers = sslHandshakeMaxWorkers;
int sslMinWorkers = Math.max(sslHandshakeMinWorkers, 1);
this.sslHandshakeMaxWorkers = Math.max(sslHandshakeMaxWorkers, sslMinWorkers);
workerQueue = new LinkedBlockingQueue<>(5 * workers);
_executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS,
workerQueue, new NamedThreadFactory(name + "-Handler"), new ThreadPoolExecutor.AbortPolicy());
sslHandshakeQueue = new SynchronousQueue<>();
_sslHandshakeExecutor = new ThreadPoolExecutor(sslHandshakeMinWorkers, sslHandshakeMaxWorkers, 30,
_sslHandshakeExecutor = new ThreadPoolExecutor(sslMinWorkers, this.sslHandshakeMaxWorkers, 30,
TimeUnit.MINUTES, sslHandshakeQueue, new NamedThreadFactory(name + "-SSLHandshakeHandler"),
new ThreadPoolExecutor.AbortPolicy());
}
@ -111,8 +113,8 @@ public abstract class NioConnection implements Callable<Boolean> {
try {
init();
} catch (final ConnectException e) {
s_logger.warn("Unable to connect to remote: is there a server running on port" + _port);
return;
s_logger.warn("Unable to connect to remote: is there a server running on port" + _port, e);
throw new NioConnectionException(e.getMessage(), e);
} catch (final IOException e) {
s_logger.error("Unable to initialize the threads.", e);
throw new NioConnectionException(e.getMessage(), e);
@ -129,6 +131,7 @@ public abstract class NioConnection implements Callable<Boolean> {
public void stop() {
_executor.shutdown();
_sslHandshakeExecutor.shutdown();
_isRunning = false;
if (_threadExecutor != null) {
_futureTask.cancel(false);
@ -206,16 +209,23 @@ public abstract class NioConnection implements Callable<Boolean> {
abstract void unregisterLink(InetSocketAddress saddr);
protected boolean rejectConnectionIfBusy(final SocketChannel socketChannel) throws IOException {
if (activeAcceptConnections.get() < sslHandshakeMaxWorkers) {
return false;
}
// Reject new connection if the server is busy
s_logger.warn(String.format("%s Rejecting new connection. %d active connections currently",
SERVER_BUSY_MESSAGE, sslHandshakeMaxWorkers));
socketChannel.close();
_selector.wakeup();
return true;
}
protected void accept(final SelectionKey key) throws IOException {
final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
final SocketChannel socketChannel = serverSocketChannel.accept();
if (activeAcceptConnections.get() >= sslHandshakeMaxWorkers) {
// Reject new connection if the server is busy
s_logger.warn("Server is busy. Rejecting new connection.");
if (socketChannel != null) {
socketChannel.close();
}
_selector.wakeup();
if (rejectConnectionIfBusy(socketChannel)) {
return;
}
socketChannel.configureBlocking(false);
@ -520,7 +530,8 @@ public abstract class NioConnection implements Callable<Boolean> {
/* Release the resource used by the instance */
public void cleanUp() throws IOException {
if (_selector != null) {
if (_selector != null && _selector.isOpen()) {
_selector.wakeup();
_selector.close();
}
}

View File

@ -33,10 +33,10 @@ import org.apache.log4j.Logger;
public class NioServer extends NioConnection {
private final static Logger s_logger = Logger.getLogger(NioServer.class);
protected InetSocketAddress _localAddr;
private ServerSocketChannel _serverSocket;
protected InetSocketAddress localAddress;
private ServerSocketChannel serverSocket;
protected WeakHashMap<InetSocketAddress, Link> _links;
protected WeakHashMap<InetSocketAddress, Link> links;
public NioServer(final String name, final int port, final int workers, final int sslHandshakeMinWorkers,
final int sslHandshakeMaxWorkers, final HandlerFactory factory, final CAService caService,
@ -44,46 +44,46 @@ public class NioServer extends NioConnection {
super(name, port, workers, sslHandshakeMinWorkers, sslHandshakeMaxWorkers, factory);
setCAService(caService);
setSslHandshakeTimeout(sslHandShakeTimeout);
_localAddr = null;
_links = new WeakHashMap<InetSocketAddress, Link>(1024);
localAddress = null;
links = new WeakHashMap<>(1024);
}
public int getPort() {
return _serverSocket.socket().getLocalPort();
return serverSocket.socket().getLocalPort();
}
@Override
protected void init() throws IOException {
_selector = SelectorProvider.provider().openSelector();
_serverSocket = ServerSocketChannel.open();
_serverSocket.configureBlocking(false);
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
_localAddr = new InetSocketAddress(_port);
_serverSocket.socket().bind(_localAddr);
localAddress = new InetSocketAddress(_port);
serverSocket.socket().bind(localAddress);
_serverSocket.register(_selector, SelectionKey.OP_ACCEPT, null);
serverSocket.register(_selector, SelectionKey.OP_ACCEPT, null);
s_logger.info("NioServer started and listening on " + _serverSocket.socket().getLocalSocketAddress());
s_logger.info("NioServer started and listening on " + serverSocket.socket().getLocalSocketAddress());
}
@Override
public void cleanUp() throws IOException {
super.cleanUp();
if (_serverSocket != null) {
_serverSocket.close();
if (serverSocket != null && serverSocket.isOpen()) {
serverSocket.close();
}
s_logger.info("NioConnection stopped on " + _localAddr.toString());
s_logger.info("NioConnection stopped on " + localAddress.toString());
}
@Override
protected void registerLink(final InetSocketAddress addr, final Link link) {
_links.put(addr, link);
links.put(addr, link);
}
@Override
protected void unregisterLink(final InetSocketAddress saddr) {
_links.remove(saddr);
links.remove(saddr);
}
/**
@ -96,7 +96,7 @@ public class NioServer extends NioConnection {
* @return null if not sent. attach object in link if sent.
*/
public Object send(final InetSocketAddress saddr, final byte[] data) throws ClosedChannelException {
final Link link = _links.get(saddr);
final Link link = links.get(saddr);
if (link == null) {
return null;
}

View File

@ -186,10 +186,10 @@ public class NioTest {
protected void init() throws IOException {
_selector = Selector.open();
try {
_clientConnection = SocketChannel.open();
LOGGER.info("Connecting to " + _host + ":" + _port);
final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
_clientConnection.connect(peerAddr);
clientConnection = SocketChannel.open();
LOGGER.info("Connecting to " + host + ":" + _port);
final InetSocketAddress peerAddr = new InetSocketAddress(host, _port);
clientConnection.connect(peerAddr);
// This is done on purpose, the malicious client would connect
// to the server and then do nothing, hence using a large sleep value
Thread.sleep(Long.MAX_VALUE);