diff --git a/agent/src/main/java/com/cloud/agent/AgentShell.java b/agent/src/main/java/com/cloud/agent/AgentShell.java index ef042496a37..49f8aecbcf5 100644 --- a/agent/src/main/java/com/cloud/agent/AgentShell.java +++ b/agent/src/main/java/com/cloud/agent/AgentShell.java @@ -403,9 +403,12 @@ public class AgentShell implements IAgentShell, Daemon { _properties.put(cmdLineProp.getKey(), cmdLineProp.getValue()); } - s_logger.info("Defaulting to the constant time backoff algorithm"); + s_logger.info("Defaulting to the range time backoff algorithm"); _backoff = new ConstantTimeBackoff(); - _backoff.configure("ConstantTimeBackoff", new HashMap()); + Map map = new HashMap<>(); + map.put("minSeconds", _properties.getProperty("backoff.min.seconds")); + map.put("maxSeconds", _properties.getProperty("backoff.max.seconds")); + _backoff.configure("RangeTimeBackoff", map); } private void launchAgent() throws ConfigurationException { diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index 8d0e0b09780..57e6143ddb8 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@ -197,8 +197,15 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected final ConfigKey Workers = new ConfigKey("Advanced", Integer.class, "workers", "5", "Number of worker threads handling remote agent connections.", false); protected final ConfigKey Port = new ConfigKey("Advanced", Integer.class, "port", "8250", "Port to listen on for remote agent connections.", false); - protected final ConfigKey RemoteAgentSslHandshakeTimeout = new ConfigKey<>("Advanced", Integer.class, "agent.ssl.handshake.timeout", "30", + protected final ConfigKey RemoteAgentSslHandshakeTimeout = new ConfigKey<>("Advanced", + Integer.class, "agent.ssl.handshake.timeout", "30", "Seconds after which SSL handshake times out during remote agent connections.", false); + protected final ConfigKey RemoteAgentSslHandshakeMinWorkers = new ConfigKey<>("Advanced", + Integer.class, "agent.ssl.handshake.min.workers", "5", + "Number of minimum worker threads handling SSL handshake with remote agents.", false); + protected final ConfigKey RemoteAgentSslHandshakeMaxWorkers = new ConfigKey<>("Advanced", + Integer.class, "agent.ssl.handshake.min.workers", "100", + "Number of maximum worker threads handling SSL handshake with remote agents.", false); protected final ConfigKey AlertWait = new ConfigKey("Advanced", Integer.class, "alert.wait", "1800", "Seconds to wait before alerting on a disconnected agent", true); protected final ConfigKey DirectAgentLoadSize = new ConfigKey("Advanced", Integer.class, "direct.agent.load.size", "16", @@ -233,7 +240,9 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl // allow core threads to time out even when there are no items in the queue _connectExecutor.allowCoreThreadTimeOut(true); - _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this, caService, RemoteAgentSslHandshakeTimeout.value()); + _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, + RemoteAgentSslHandshakeMinWorkers.value(), RemoteAgentSslHandshakeMaxWorkers.value(), this, + caService, RemoteAgentSslHandshakeTimeout.value()); s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers"); // executes all agent commands other than cron and ping diff --git a/utils/src/main/java/com/cloud/utils/backoff/impl/RangeTimeBackoff.java b/utils/src/main/java/com/cloud/utils/backoff/impl/RangeTimeBackoff.java new file mode 100644 index 00000000000..9b513285309 --- /dev/null +++ b/utils/src/main/java/com/cloud/utils/backoff/impl/RangeTimeBackoff.java @@ -0,0 +1,72 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package com.cloud.utils.backoff.impl; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; + +import com.cloud.utils.NumbersUtil; +import com.cloud.utils.backoff.BackoffAlgorithm; +import com.cloud.utils.component.AdapterBase; +import org.apache.log4j.Logger; + +/** + * An implementation of BackoffAlgorithm that waits for some random seconds + * within a given range. + * After the time the client can try to perform the operation again. + * + **/ +public class RangeTimeBackoff extends AdapterBase implements BackoffAlgorithm { + private static final int DEFAULT_MIN_TIME = 5; + private int minTime = DEFAULT_MIN_TIME; + private int maxTime = 3 * DEFAULT_MIN_TIME; + private final Map asleep = new ConcurrentHashMap<>(); + private static final Logger LOG = Logger.getLogger(RangeTimeBackoff.class.getName()); + + @Override + public void waitBeforeRetry() { + Thread current = Thread.currentThread(); + try { + asleep.put(current.getName(), current); + long time = ThreadLocalRandom.current().nextInt(minTime, maxTime) * 1000L; + Thread.sleep(time); + } catch (InterruptedException e) { + // JMX or other threads may interrupt this thread, but let's log it + // anyway, no exception to log as this is not an error + LOG.info("Thread " + current.getName() + " interrupted while waiting for retry"); + } finally { + asleep.remove(current.getName()); + } + return; + } + + @Override + public void reset() { + } + + @Override + public boolean configure(String name, Map params) { + minTime = NumbersUtil.parseInt((String)params.get("minSeconds"), DEFAULT_MIN_TIME); + maxTime = NumbersUtil.parseInt((String)params.get("maxSeconds"), minTime * 3); + return true; + } +} diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java index 603a6cb03e3..51448c88875 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java @@ -39,7 +39,7 @@ public class NioClient extends NioConnection { protected SocketChannel _clientConnection; public NioClient(final String name, final String host, final int port, final int workers, final HandlerFactory factory) { - super(name, port, workers, factory); + super(name, port, workers, 1, 2, factory); _host = host; } diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java index 5e17fe05bf0..ce2346d2a34 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java @@ -79,24 +79,27 @@ public abstract class NioConnection implements Callable { protected CAService caService; protected Integer sslHandshakeTimeout = null; private final AtomicInteger activeAcceptConnections = new AtomicInteger(0); - private BlockingQueue sslHandshakeQueue; + private final BlockingQueue workerQueue; + private final BlockingQueue sslHandshakeQueue; public String getConnectionName() { return String.format("%s:: ", _name); } - public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) { + public NioConnection(final String name, final int port, final int workers, final int sslHandshakeMinWorkers, + final int sslHandshakeMaxWorkers, final HandlerFactory factory) { _name = name; _isRunning = false; _selector = null; _port = port; _factory = factory; + workerQueue = new LinkedBlockingQueue<>(5 * workers); _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, - new LinkedBlockingQueue<>(), new NamedThreadFactory(name + "-Handler")); - sslHandshakeQueue = new LinkedBlockingQueue<>(5 * workers); - _sslHandshakeExecutor = new ThreadPoolExecutor(workers, 5 * workers, 90, - TimeUnit.SECONDS, sslHandshakeQueue, - new NamedThreadFactory(name + "-Handler"), new ThreadPoolExecutor.AbortPolicy()); + workerQueue, new NamedThreadFactory(name + "-Handler"), new ThreadPoolExecutor.AbortPolicy()); + sslHandshakeQueue = new LinkedBlockingQueue<>(sslHandshakeMaxWorkers); + _sslHandshakeExecutor = new ThreadPoolExecutor(sslHandshakeMinWorkers, sslHandshakeMaxWorkers, 30, + TimeUnit.MINUTES, sslHandshakeQueue, new NamedThreadFactory(name + "-Handler"), + new ThreadPoolExecutor.AbortPolicy()); } public void setCAService(final CAService caService) { @@ -205,8 +208,8 @@ public abstract class NioConnection implements Callable { abstract void unregisterLink(InetSocketAddress saddr); protected void accept(final SelectionKey key) throws IOException { - s_logger.info(String.format("%s------------------active accept connections: %d, queue: %d", - getConnectionName(), activeAcceptConnections.get(), sslHandshakeQueue.size())); + s_logger.info(String.format("%s------------------active accept c0nnections: %d, queue: %d, worker-queue: %d", + getConnectionName(), activeAcceptConnections.get(), sslHandshakeQueue.size(), workerQueue.size())); final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); final SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); @@ -221,20 +224,24 @@ public abstract class NioConnection implements Callable { try { final NioConnection nioConnection = this; _sslHandshakeExecutor.submit(() -> { - activeAcceptConnections.incrementAndGet(); + final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); + int connections = activeAcceptConnections.incrementAndGet(); + Long startTime = System.currentTimeMillis(); + s_logger.info(String.format("%s-start-----------------active accept c0nnections: %d, " + + "accept-queue: %d, worker-queue: %d", + saddr, connections, sslHandshakeQueue.size(), workerQueue.size())); _selector.wakeup(); try { final SSLEngine sslEngine = Link.initServerSSLEngine(caService, socketChannel.getRemoteAddress().toString()); sslEngine.setUseClientMode(false); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); sslEngine.beginHandshake(); - if (!Link.doHandshake(socketChannel, sslEngine)) { - throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress()); + if (!Link.doHandshake(socketChannel, sslEngine, getSslHandshakeTimeout())) { + throw new IOException("SSL handshake timed out with " + saddr); } if (s_logger.isTraceEnabled()) { s_logger.trace("SSL: Handshake done"); } - final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); final Link link = new Link(saddr, nioConnection); link.setSSLEngine(sslEngine); link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); @@ -243,18 +250,22 @@ public abstract class NioConnection implements Callable { _executor.submit(task); } catch (final GeneralSecurityException | IOException e) { if (s_logger.isTraceEnabled()) { - s_logger.trace("Connection closed due to failure: " + e.getMessage()); + s_logger.trace(socket.getRemoteSocketAddress()+ "Connection closed due to failure: " + e.getMessage()); } closeAutoCloseable(socket, "accepting socket"); closeAutoCloseable(socketChannel, "accepting socketChannel"); } finally { - activeAcceptConnections.decrementAndGet(); + connections = activeAcceptConnections.decrementAndGet(); + s_logger.info(String.format("%s-end----active accept c0nnections: %d, accept-queue: %d," + + "worker-queue: %d, time taken: %d", + saddr, connections, sslHandshakeQueue.size(), workerQueue.size(), + (System.currentTimeMillis() - startTime))); _selector.wakeup(); } }); } catch (final RejectedExecutionException e) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Task rejected: " + e.getMessage()); + if (s_logger.isInfoEnabled()) { + s_logger.trace(socket.getRemoteSocketAddress()+ " Accept Task rejected: " + e.getMessage()); } closeAutoCloseable(socket, "Rejecting connection - accepting socket"); closeAutoCloseable(socketChannel, "Rejecting connection - accepting socketChannel"); diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java index 3fc3ec156bd..59f90a4a8ef 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java @@ -38,8 +38,10 @@ public class NioServer extends NioConnection { protected WeakHashMap _links; - public NioServer(final String name, final int port, final int workers, final HandlerFactory factory, final CAService caService, final Integer sslHandShakeTimeout) { - super(name, port, workers, factory); + public NioServer(final String name, final int port, final int workers, final int sslHandshakeMinWorkers, + final int sslHandshakeMaxWorkers, final HandlerFactory factory, final CAService caService, + final Integer sslHandShakeTimeout) { + super(name, port, workers, sslHandshakeMinWorkers, sslHandshakeMaxWorkers, factory); setCAService(caService); setSslHandshakeTimeout(sslHandShakeTimeout); _localAddr = null; diff --git a/utils/src/main/java/org/apache/cloudstack/utils/security/SSLUtils.java b/utils/src/main/java/org/apache/cloudstack/utils/security/SSLUtils.java index 8016f5a1916..36df09453bb 100644 --- a/utils/src/main/java/org/apache/cloudstack/utils/security/SSLUtils.java +++ b/utils/src/main/java/org/apache/cloudstack/utils/security/SSLUtils.java @@ -46,7 +46,7 @@ public class SSLUtils { * It returns recommended protocols that are considered secure. */ public static String[] getRecommendedProtocols() { - return new String[] { "TLSv1", "TLSv1.1", "TLSv1.2" }; + return new String[] { "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3" }; } /** @@ -66,7 +66,7 @@ public class SSLUtils { } public static SSLContext getSSLContext() throws NoSuchAlgorithmException { - return SSLContext.getInstance("TLSv1.2"); + return SSLContext.getInstance("TLSv1.3"); } public static SSLContext getSSLContext(String provider) throws NoSuchAlgorithmException, NoSuchProviderException { diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java index 692278d20ce..bc5b3aed227 100644 --- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java +++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java @@ -98,7 +98,7 @@ public class NioTest { testBytes = new byte[1000000]; randomGenerator.nextBytes(testBytes); - server = new NioServer("NioTestServer", 0, 1, new NioTestServer(), null, null); + server = new NioServer("NioTestServer", 0, 1, 1, 1, new NioTestServer(), null, null); try { server.start(); } catch (final NioConnectionException e) {