From c33aa96025847ee896846a245b4ae7b50733ebb8 Mon Sep 17 00:00:00 2001
From: Abhishek Kumar
Date: Thu, 17 Oct 2024 19:30:26 +0530
Subject: [PATCH] refactor, improve startuptask

Signed-off-by: Abhishek Kumar
---
Review notes (text in this section is ignored by git am):
- The synchronized `_startup` field is replaced by an AtomicReference so the
  startup task is claimed with compareAndSet and cleared with getAndSet.
- reconnectIfNeeded() is added; it skips the reconnect when both the given
  link and the agent's current link are non-null and are different objects.
- Log calls now use the same level as the is*Enabled() guard around them.
- Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy of this patch; verify context lines against the tree before applying.

 .../src/main/java/com/cloud/agent/Agent.java  | 146 ++++++++++++------
 1 file changed, 100 insertions(+), 46 deletions(-)

diff --git a/agent/src/main/java/com/cloud/agent/Agent.java b/agent/src/main/java/com/cloud/agent/Agent.java
index 709559d9d23..8434fcbcb47 100644
--- a/agent/src/main/java/com/cloud/agent/Agent.java
+++ b/agent/src/main/java/com/cloud/agent/Agent.java
@@ -37,6 +37,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.naming.ConfigurationException;
 
@@ -51,6 +52,7 @@ import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
 import org.apache.cloudstack.utils.security.KeyStoreUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.log4j.MDC;
@@ -141,7 +143,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
     long _pingInterval = 0;
     AtomicInteger _inProgress = new AtomicInteger();
 
-    StartupTask _startup = null;
+    private final AtomicReference<StartupTask> startupTask = new AtomicReference<>();
     long _startupWaitDefault = 180000;
     long _startupWait = _startupWaitDefault;
     boolean _reconnectAllowed = true;
@@ -155,6 +157,25 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
     private String _keystoreCertImportPath;
     private String hostname;
 
+    /**
+     * Describes a link for log messages: its socket address, prefixed with the
+     * link's identity hash code and a dash when trace logging is enabled.
+     *
+     * @param link the link to describe, may be null
+     * @return the description, or an empty string when the link is null
+     */
+    private static String getLinkLog(final Link link) {
+        if (link == null) {
+            return "";
+        }
+        StringBuilder str = new StringBuilder();
+        if (s_logger.isTraceEnabled()) {
+            str.append(System.identityHashCode(link)).append("-");
+        }
+        str.append(link.getSocketAddress());
+        return str.toString();
+    }
+
     // for simulator use only
     public Agent(final IAgentShell shell) {
         _shell = shell;
@@ -342,8 +363,8 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
             _resource = null;
         }
 
-        if (_startup != null) {
-            _startup = null;
+        if (startupTask.get() != null) {
+            startupTask.set(null);
         }
 
         if (_ugentTaskPool != null) {
@@ -452,9 +473,21 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
         _shell.setPersistentProperty(null, "pod", "");
     }
 
-    public synchronized void lockStartupTask(final Link link) {
-        _startup = new StartupTask(link);
-        _timer.schedule(_startup, _startupWait);
+    public void lockStartupTask(final Link link) {
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace(String.format("Creating startup task - %s", getLinkLog(link)));
+        }
+        StartupTask currentTask = startupTask.get();
+        if (currentTask != null) {
+            s_logger.warn("A Startup task is already locked or in progress.");
+            return;
+        }
+        currentTask = new StartupTask(link, this);
+        if (startupTask.compareAndSet(null, currentTask)) {
+            _timer.schedule(currentTask, _startupWait);
+            return;
+        }
+        s_logger.warn("Failed to lock a StartupTask!");
     }
 
     public void sendStartup(final Link link) {
@@ -477,6 +510,9 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
             try {
                 link.send(request.toBytes());
             } catch (final ClosedChannelException e) {
+                if (s_logger.isTraceEnabled()) {
+                    s_logger.trace(String.format("Unable to send request to %s", getLinkLog(link)));
+                }
                 s_logger.warn("Unable to send request: " + request.toString());
             }
 
@@ -522,15 +558,29 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
         return new ServerHandler(type, link, data);
     }
 
+    /**
+     * Reconnects unless both the given link and the agent's current link are
+     * non-null and refer to different objects, in which case the request is ignored.
+     *
+     * @param link the link that triggered the reconnect, may be null
+     */
+    protected void reconnectIfNeeded(final Link link) {
+        if (ObjectUtils.allNotNull(link, _link) && _link != link) {
+            s_logger.debug(String.format("Ignoring reconnect as agent link %s is different from given link %s",
+                    getLinkLog(_link), getLinkLog(link)));
+            return;
+        }
+        reconnect(link);
+    }
+
     protected void reconnect(final Link link) {
         if (!_reconnectAllowed) {
             return;
         }
-        synchronized (this) {
-            if (_startup != null) {
-                _startup.cancel();
-                _startup = null;
-            }
+
+        StartupTask task = startupTask.getAndSet(null);
+        if (task != null) {
+            task.cancel();
         }
 
         if (link != null) {
@@ -580,13 +630,11 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
 
     public void processStartupAnswer(final Answer answer, final Response response, final Link link) {
         boolean cancelled = false;
-        synchronized (this) {
-            if (_startup != null) {
-                _startup.cancel();
-                _startup = null;
-            } else {
-                cancelled = true;
-            }
+        StartupTask task = startupTask.getAndSet(null);
+        if (task != null) {
+            task.cancel();
+        } else {
+            cancelled = true;
         }
         final StartupAnswer startup = (StartupAnswer)answer;
         if (!startup.getResult()) {
@@ -596,7 +644,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
                     s_logger.trace(String.format("%s does not allow exit on failure, reconnecting",
                             _resource.getClass().getSimpleName()));
                 }
-                reconnect(link);
+                reconnectIfNeeded(link);
                 return;
             }
             System.exit(1);
@@ -665,7 +713,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
                     s_logger.trace(String.format("%s does not allow exit on failure, reconnecting",
                             _resource.getClass().getSimpleName()));
                 }
-                reconnect(link);
+                reconnectIfNeeded(link);
                 return;
             }
             System.exit(1);
@@ -1084,11 +1132,13 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
 
     public class StartupTask extends ManagedContextTimerTask {
         protected Link _link;
+        protected Agent agent;
         protected volatile boolean cancelled = false;
 
-        public StartupTask(final Link link) {
+        public StartupTask(final Link link, final Agent agent) {
             s_logger.debug("Startup task created");
             _link = link;
+            this.agent = agent;
         }
 
         @Override
@@ -1104,15 +1154,18 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
         }
 
         @Override
-        protected synchronized void runInContext() {
+        protected void runInContext() {
             if (!cancelled) {
                 if (s_logger.isInfoEnabled()) {
-                    s_logger.info("The startup command is now cancelled");
+                    s_logger.info("The startup command is now cancelled. Attempting reconnect");
                 }
                 cancelled = true;
-                _startup = null;
+                startupTask.set(null);
                 _startupWait = _startupWaitDefault * 2;
-                reconnect(_link);
+                if (s_logger.isTraceEnabled()) {
+                    s_logger.trace(String.format("Executing reconnect from task - %s", getLinkLog(_link)));
+                }
+                reconnectIfNeeded(_link);
             }
         }
     }
@@ -1164,8 +1217,10 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
                     s_logger.error("Error parsing task", e);
                 }
             } else if (task.getType() == Task.Type.DISCONNECT) {
-                reconnect(task.getLink());
-                return;
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug(String.format("Executing disconnect task - %s", getLinkLog(task.getLink())));
+                }
+                reconnectIfNeeded(task.getLink());
             } else if (task.getType() == Task.Type.OTHER) {
                 processOtherTask(task);
             }
@@ -1237,32 +1292,31 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
                 }
                 final String preferredHost = msList[0];
                 final String connectedHost = _shell.getConnectedHost();
-                if (s_logger.isTraceEnabled()) {
-                    s_logger.trace("Running preferred host checker task, connected host=" + connectedHost + ", preferred host=" + preferredHost);
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Running preferred host checker task, connected host=" + connectedHost + ", preferred host=" + preferredHost);
                 }
-                if (preferredHost != null && !preferredHost.equals(connectedHost) && _link != null) {
-                    boolean isHostUp = true;
-                    try (final Socket socket = new Socket()) {
-                        socket.connect(new InetSocketAddress(preferredHost, _shell.getPort()), 5000);
-                    } catch (final IOException e) {
-                        isHostUp = false;
-                        if (s_logger.isTraceEnabled()) {
-                            s_logger.trace("Host: " + preferredHost + " is not reachable");
-                        }
+                if (preferredHost == null || preferredHost.equals(connectedHost) || _link == null) {
+                    return;
+                }
+                boolean isHostUp = true;
+                try (final Socket socket = new Socket()) {
+                    socket.connect(new InetSocketAddress(preferredHost, _shell.getPort()), 5000);
+                } catch (final IOException e) {
+                    isHostUp = false;
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Host: " + preferredHost + " is not reachable");
                     }
-                    if (isHostUp && _link != null && _inProgress.get() == 0) {
-                        if (s_logger.isDebugEnabled()) {
-                            s_logger.debug("Preferred host " + preferredHost + " is found to be reachable, trying to reconnect");
-                        }
-                        _shell.resetHostCounter();
-                        reconnect(_link);
+                }
+                if (isHostUp && _link != null && _inProgress.get() == 0) {
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Preferred host " + preferredHost + " is found to be reachable, trying to reconnect");
                     }
+                    _shell.resetHostCounter();
+                    reconnect(_link);
                 }
             } catch (Throwable t) {
                 s_logger.error("Error caught while attempting to connect to preferred host", t);
             }
         }
-
     }
-
 }