changes for timertask to runnable for agent self tasks

Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-10-21 15:01:23 +05:30
parent 62ce43c2d5
commit 16a541cd71
1 changed files with 20 additions and 23 deletions

View File

@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -90,7 +91,6 @@ import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.NioClient;
import com.cloud.utils.nio.NioConnection;
import com.cloud.utils.nio.Task;
import com.cloud.utils.script.OutputInterpreter;
import com.cloud.utils.script.Script;
/**
@ -115,7 +115,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
Configuration(66), // Exiting due to configuration problems.
Error(67); // Exiting because of error.
int value;
final int value;
ExitStatus(final int value) {
this.value = value;
@ -138,7 +138,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
ScheduledExecutorService certExecutor;
ScheduledExecutorService hostLbCheckExecutor;
CopyOnWriteArrayList<WatchTask> watchList = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList<ScheduledFuture<?>> watchList = new CopyOnWriteArrayList<>();
AtomicLong sequence = new AtomicLong(0);
AtomicLong lastPingResponseTime = new AtomicLong(0L);
long pingInterval = 0;
@ -433,8 +433,8 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
s_logger.debug("Adding a watch list");
}
final WatchTask task = new WatchTask(link, request, this);
selfTaskExecutor.scheduleAtFixedRate(task, delay, period, TimeUnit.MILLISECONDS);
watchList.add(task);
final ScheduledFuture<?> future = selfTaskExecutor.scheduleAtFixedRate(task, delay, period, TimeUnit.MILLISECONDS);
watchList.add(future);
}
public void triggerUpdate() {
@ -451,8 +451,8 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
}
protected void cancelTasks() {
for (final WatchTask task : watchList) {
task.cancel();
for (final ScheduledFuture<?> task : watchList) {
task.cancel(true);
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Clearing watch list: " + watchList.size());
@ -475,11 +475,12 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
public void lockStartupTask(final Link link) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("Creating startup task - %s", getLinkLog(link)));
s_logger.debug(String.format("Creating startup task for link: %s", getLinkLog(link)));
}
StartupTask currentTask = startupTask.get();
if (currentTask != null) {
s_logger.warn("A Startup task is already locked or in progress.");
s_logger.warn(String.format("A Startup task is already locked or in progress, cannot create for link %s",
getLinkLog(link)));
return;
}
currentTask = new StartupTask(link);
@ -487,7 +488,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
selfTaskExecutor.schedule(currentTask, startupWait, TimeUnit.SECONDS);
return;
}
s_logger.warn("Failed to lock a StartupTask");
s_logger.warn(String.format("Failed to lock a StartupTask for link: %s", getLinkLog(link)));
}
protected boolean cancelStartupTask() {
@ -533,15 +534,13 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
if (s_logger.isTraceEnabled()) {
s_logger.trace(" Retrieving hostname " + serverResource.getClass().getSimpleName());
}
final Script command = new Script("hostname", 500, s_logger);
final OutputInterpreter.OneLineParser parser = new OutputInterpreter.OneLineParser();
final String result = command.execute(parser);
if (result != null) {
return parser.getLine();
final String result = Script.runSimpleBashScript(Script.getExecutableAbsolutePath("hostname"), 500);
if (StringUtils.isNotBlank(result)) {
return result;
}
try {
InetAddress addr = InetAddress.getLocalHost();
return addr.toString();
InetAddress address = InetAddress.getLocalHost();
return address.toString();
} catch (final UnknownHostException e) {
s_logger.warn("unknown host? ", e);
throw new CloudRuntimeException("Cannot get local IP address");
@ -1079,7 +1078,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
}
}
public class WatchTask extends ManagedContextTimerTask {
public class WatchTask implements Runnable {
protected Request _request;
protected Agent _agent;
protected Link _link;
@ -1092,7 +1091,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
}
@Override
protected void runInContext() {
public void run() {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Scheduling " + (_request instanceof Response ? "Ping" : "Watch Task"));
}
@ -1108,7 +1107,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
}
}
public class StartupTask extends ManagedContextTimerTask {
public class StartupTask implements Runnable {
protected Link _link;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
@ -1117,19 +1116,17 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
_link = link;
}
@Override
public boolean cancel() {
// TimerTask.cancel may fail depends on the calling context
if (cancelled.compareAndSet(false, true)) {
startupWait = DEFAULT_STARTUP_WAIT;
s_logger.debug("Startup task cancelled");
return super.cancel();
}
return true;
}
@Override
protected void runInContext() {
public void run() {
if (cancelled.compareAndSet(false, true)) {
s_logger.info("The running startup command is now invalid. Attempting reconnect");
startupTask.set(null);