Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-09-26 12:10:38 +05:30
parent 1652086ff9
commit fa50740514
8 changed files with 124 additions and 27 deletions

View File

@ -403,9 +403,12 @@ public class AgentShell implements IAgentShell, Daemon {
_properties.put(cmdLineProp.getKey(), cmdLineProp.getValue());
}
s_logger.info("Defaulting to the constant time backoff algorithm");
s_logger.info("Defaulting to the range time backoff algorithm");
_backoff = new ConstantTimeBackoff();
_backoff.configure("ConstantTimeBackoff", new HashMap<String, Object>());
Map<String, Object> map = new HashMap<>();
map.put("minSeconds", _properties.getProperty("backoff.min.seconds"));
map.put("maxSeconds", _properties.getProperty("backoff.max.seconds"));
_backoff.configure("RangeTimeBackoff", map);
}
private void launchAgent() throws ConfigurationException {

View File

@ -197,8 +197,15 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>("Advanced", Integer.class, "workers", "5",
"Number of worker threads handling remote agent connections.", false);
protected final ConfigKey<Integer> Port = new ConfigKey<Integer>("Advanced", Integer.class, "port", "8250", "Port to listen on for remote agent connections.", false);
protected final ConfigKey<Integer> RemoteAgentSslHandshakeTimeout = new ConfigKey<>("Advanced", Integer.class, "agent.ssl.handshake.timeout", "30",
protected final ConfigKey<Integer> RemoteAgentSslHandshakeTimeout = new ConfigKey<>("Advanced",
Integer.class, "agent.ssl.handshake.timeout", "30",
"Seconds after which SSL handshake times out during remote agent connections.", false);
protected final ConfigKey<Integer> RemoteAgentSslHandshakeMinWorkers = new ConfigKey<>("Advanced",
Integer.class, "agent.ssl.handshake.min.workers", "5",
"Number of minimum worker threads handling SSL handshake with remote agents.", false);
protected final ConfigKey<Integer> RemoteAgentSslHandshakeMaxWorkers = new ConfigKey<>("Advanced",
Integer.class, "agent.ssl.handshake.min.workers", "100",
"Number of maximum worker threads handling SSL handshake with remote agents.", false);
protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>("Advanced", Integer.class, "alert.wait", "1800",
"Seconds to wait before alerting on a disconnected agent", true);
protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.load.size", "16",
@ -233,7 +240,9 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
// allow core threads to time out even when there are no items in the queue
_connectExecutor.allowCoreThreadTimeOut(true);
_connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this, caService, RemoteAgentSslHandshakeTimeout.value());
_connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10,
RemoteAgentSslHandshakeMinWorkers.value(), RemoteAgentSslHandshakeMaxWorkers.value(), this,
caService, RemoteAgentSslHandshakeTimeout.value());
s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers");
// executes all agent commands other than cron and ping

View File

@ -0,0 +1,72 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.utils.backoff.impl;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.backoff.BackoffAlgorithm;
import com.cloud.utils.component.AdapterBase;
import org.apache.log4j.Logger;
/**
* An implementation of BackoffAlgorithm that waits for some random seconds
* within a given range.
* After the time the client can try to perform the operation again.
*
**/
public class RangeTimeBackoff extends AdapterBase implements BackoffAlgorithm {
private static final int DEFAULT_MIN_TIME = 5;
private int minTime = DEFAULT_MIN_TIME;
private int maxTime = 3 * DEFAULT_MIN_TIME;
private final Map<String, Thread> asleep = new ConcurrentHashMap<>();
private static final Logger LOG = Logger.getLogger(RangeTimeBackoff.class.getName());
@Override
public void waitBeforeRetry() {
Thread current = Thread.currentThread();
try {
asleep.put(current.getName(), current);
long time = ThreadLocalRandom.current().nextInt(minTime, maxTime) * 1000L;
Thread.sleep(time);
} catch (InterruptedException e) {
// JMX or other threads may interrupt this thread, but let's log it
// anyway, no exception to log as this is not an error
LOG.info("Thread " + current.getName() + " interrupted while waiting for retry");
} finally {
asleep.remove(current.getName());
}
return;
}
@Override
public void reset() {
}
@Override
public boolean configure(String name, Map<String, Object> params) {
minTime = NumbersUtil.parseInt((String)params.get("minSeconds"), DEFAULT_MIN_TIME);
maxTime = NumbersUtil.parseInt((String)params.get("maxSeconds"), minTime * 3);
return true;
}
}

View File

@ -39,7 +39,7 @@ public class NioClient extends NioConnection {
protected SocketChannel _clientConnection;
public NioClient(final String name, final String host, final int port, final int workers, final HandlerFactory factory) {
super(name, port, workers, factory);
super(name, port, workers, 1, 2, factory);
_host = host;
}

View File

@ -79,24 +79,27 @@ public abstract class NioConnection implements Callable<Boolean> {
protected CAService caService;
protected Integer sslHandshakeTimeout = null;
private final AtomicInteger activeAcceptConnections = new AtomicInteger(0);
private BlockingQueue<Runnable> sslHandshakeQueue;
private final BlockingQueue<Runnable> workerQueue;
private final BlockingQueue<Runnable> sslHandshakeQueue;
public String getConnectionName() {
return String.format("%s:: ", _name);
}
public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) {
public NioConnection(final String name, final int port, final int workers, final int sslHandshakeMinWorkers,
final int sslHandshakeMaxWorkers, final HandlerFactory factory) {
_name = name;
_isRunning = false;
_selector = null;
_port = port;
_factory = factory;
workerQueue = new LinkedBlockingQueue<>(5 * workers);
_executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(name + "-Handler"));
sslHandshakeQueue = new LinkedBlockingQueue<>(5 * workers);
_sslHandshakeExecutor = new ThreadPoolExecutor(workers, 5 * workers, 90,
TimeUnit.SECONDS, sslHandshakeQueue,
new NamedThreadFactory(name + "-Handler"), new ThreadPoolExecutor.AbortPolicy());
workerQueue, new NamedThreadFactory(name + "-Handler"), new ThreadPoolExecutor.AbortPolicy());
sslHandshakeQueue = new LinkedBlockingQueue<>(sslHandshakeMaxWorkers);
_sslHandshakeExecutor = new ThreadPoolExecutor(sslHandshakeMinWorkers, sslHandshakeMaxWorkers, 30,
TimeUnit.MINUTES, sslHandshakeQueue, new NamedThreadFactory(name + "-Handler"),
new ThreadPoolExecutor.AbortPolicy());
}
public void setCAService(final CAService caService) {
@ -205,8 +208,8 @@ public abstract class NioConnection implements Callable<Boolean> {
abstract void unregisterLink(InetSocketAddress saddr);
protected void accept(final SelectionKey key) throws IOException {
s_logger.info(String.format("%s------------------active accept connections: %d, queue: %d",
getConnectionName(), activeAcceptConnections.get(), sslHandshakeQueue.size()));
s_logger.info(String.format("%s------------------active accept c0nnections: %d, queue: %d, worker-queue: %d",
getConnectionName(), activeAcceptConnections.get(), sslHandshakeQueue.size(), workerQueue.size()));
final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
final SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
@ -221,20 +224,24 @@ public abstract class NioConnection implements Callable<Boolean> {
try {
final NioConnection nioConnection = this;
_sslHandshakeExecutor.submit(() -> {
activeAcceptConnections.incrementAndGet();
final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
int connections = activeAcceptConnections.incrementAndGet();
Long startTime = System.currentTimeMillis();
s_logger.info(String.format("%s-start-----------------active accept c0nnections: %d, " +
"accept-queue: %d, worker-queue: %d",
saddr, connections, sslHandshakeQueue.size(), workerQueue.size()));
_selector.wakeup();
try {
final SSLEngine sslEngine = Link.initServerSSLEngine(caService, socketChannel.getRemoteAddress().toString());
sslEngine.setUseClientMode(false);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
sslEngine.beginHandshake();
if (!Link.doHandshake(socketChannel, sslEngine)) {
throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress());
if (!Link.doHandshake(socketChannel, sslEngine, getSslHandshakeTimeout())) {
throw new IOException("SSL handshake timed out with " + saddr);
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("SSL: Handshake done");
}
final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
final Link link = new Link(saddr, nioConnection);
link.setSSLEngine(sslEngine);
link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link));
@ -243,18 +250,22 @@ public abstract class NioConnection implements Callable<Boolean> {
_executor.submit(task);
} catch (final GeneralSecurityException | IOException e) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Connection closed due to failure: " + e.getMessage());
s_logger.trace(socket.getRemoteSocketAddress()+ "Connection closed due to failure: " + e.getMessage());
}
closeAutoCloseable(socket, "accepting socket");
closeAutoCloseable(socketChannel, "accepting socketChannel");
} finally {
activeAcceptConnections.decrementAndGet();
connections = activeAcceptConnections.decrementAndGet();
s_logger.info(String.format("%s-end----active accept c0nnections: %d, accept-queue: %d," +
"worker-queue: %d, time taken: %d",
saddr, connections, sslHandshakeQueue.size(), workerQueue.size(),
(System.currentTimeMillis() - startTime)));
_selector.wakeup();
}
});
} catch (final RejectedExecutionException e) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Task rejected: " + e.getMessage());
if (s_logger.isInfoEnabled()) {
s_logger.trace(socket.getRemoteSocketAddress()+ " Accept Task rejected: " + e.getMessage());
}
closeAutoCloseable(socket, "Rejecting connection - accepting socket");
closeAutoCloseable(socketChannel, "Rejecting connection - accepting socketChannel");

View File

@ -38,8 +38,10 @@ public class NioServer extends NioConnection {
protected WeakHashMap<InetSocketAddress, Link> _links;
public NioServer(final String name, final int port, final int workers, final HandlerFactory factory, final CAService caService, final Integer sslHandShakeTimeout) {
super(name, port, workers, factory);
public NioServer(final String name, final int port, final int workers, final int sslHandshakeMinWorkers,
final int sslHandshakeMaxWorkers, final HandlerFactory factory, final CAService caService,
final Integer sslHandShakeTimeout) {
super(name, port, workers, sslHandshakeMinWorkers, sslHandshakeMaxWorkers, factory);
setCAService(caService);
setSslHandshakeTimeout(sslHandShakeTimeout);
_localAddr = null;

View File

@ -46,7 +46,7 @@ public class SSLUtils {
* It returns recommended protocols that are considered secure.
*/
public static String[] getRecommendedProtocols() {
return new String[] { "TLSv1", "TLSv1.1", "TLSv1.2" };
return new String[] { "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3" };
}
/**
@ -66,7 +66,7 @@ public class SSLUtils {
}
public static SSLContext getSSLContext() throws NoSuchAlgorithmException {
return SSLContext.getInstance("TLSv1.2");
return SSLContext.getInstance("TLSv1.3");
}
public static SSLContext getSSLContext(String provider) throws NoSuchAlgorithmException, NoSuchProviderException {

View File

@ -98,7 +98,7 @@ public class NioTest {
testBytes = new byte[1000000];
randomGenerator.nextBytes(testBytes);
server = new NioServer("NioTestServer", 0, 1, new NioTestServer(), null, null);
server = new NioServer("NioTestServer", 0, 1, 1, 1, new NioTestServer(), null, null);
try {
server.start();
} catch (final NioConnectionException e) {