From 5dd4fb22eff5ec6df3c79bab0ebb99226c954e9b Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Mon, 3 Mar 2014 16:09:40 -0800 Subject: [PATCH] Remove cancelled jobs from job monitoring, correct mis-calculated time-unit in job cancellation. --- .../jobs/impl/AsyncJobManagerImpl.java | 15 ++++++---- .../framework/jobs/impl/AsyncJobMonitor.java | 30 +++++++++++++++---- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 2be27860f07..b9246aa395d 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -85,12 +85,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private static final ConfigKey JobExpireMinutes = new ConfigKey(Long.class, "job.expire.minutes", "Advanced", "1440", "Time (in minutes) for async-jobs to be kept in system", true, ConfigKey.Scope.Global, 60l); private static final ConfigKey JobCancelThresholdMinutes = new ConfigKey(Long.class, "job.cancel.threshold.minutes", "Advanced", "60", - "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 60l); + "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 240l); private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class); private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds - private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds private static final int MAX_ONETIME_SCHEDULE_SIZE = 50; private static final int HEARTBEAT_INTERVAL = 2000; @@ -706,14 +705,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, try { s_logger.trace("Begin cleanup expired async-jobs"); - Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 1000); + Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000); // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute // hopefully this will be fast enough to balance potential growth of job table //1) Expire unfinished jobs that weren't processed yet List l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100); for (AsyncJobVO job : l) { - s_logger.trace("Expunging unfinished job " + job); + s_logger.info("Expunging unfinished job " + job); + + _jobMonitor.unregisterByJobId(job.getId()); expungeAsyncJob(job); } @@ -721,15 +722,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, List completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100); for (AsyncJobVO job : completedJobs) { s_logger.trace("Expunging completed job " + job); + expungeAsyncJob(job); } // forcefully cancel blocking queue items if they've been staying there for too long - List blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 1000, false); + List blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false); if (blockItems != null && blockItems.size() > 0) { for (SyncQueueItemVO item : blockItems) { if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) { + s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long"); completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long"); + + _jobMonitor.unregisterByJobId(item.getContentId()); } // purge the item and resume queue processing diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java index 671818130f9..0b6f7a582b0 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java @@ -17,6 +17,7 @@ package org.apache.cloudstack.framework.jobs.impl; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Timer; import java.util.concurrent.atomic.AtomicInteger; @@ -38,8 +39,7 @@ import com.cloud.utils.component.ManagerBase; public class AsyncJobMonitor extends ManagerBase { public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class); - @Inject - private MessageBus _messageBus; + @Inject private MessageBus _messageBus; private final Map _activeTasks = new HashMap(); private final Timer _timer = new Timer(); @@ -86,15 +86,16 @@ public class AsyncJobMonitor extends ManagerBase { synchronized (this) { for (Map.Entry entry : _activeTasks.entrySet()) { if (entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) { - s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for " + entry.getValue().millisSinceLastJobHeartbeat() / 1000 + - " seconds"); + s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for " + + entry.getValue().millisSinceLastJobHeartbeat() / 1000 + " seconds"); } } } } @Override - public boolean configure(String name, Map params) throws ConfigurationException { + public boolean configure(String name, Map params) + throws ConfigurationException { _messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this)); _timer.scheduleAtFixedRate(new ManagedContextTimerTask() { @@ -141,6 +142,25 @@ public class AsyncJobMonitor extends ManagerBase { } } + public void unregisterByJobId(long jobId) { + synchronized (this) { + Iterator> it = _activeTasks.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + if (entry.getValue().getJobId() == jobId) { + s_logger.info("Remove Job-" + entry.getValue().getJobId() + " from job monitoring due to job cancelling"); + + if (entry.getValue().isPoolThread()) + _activePoolThreads.decrementAndGet(); + else + _activeInplaceThreads.decrementAndGet(); + + it.remove(); + } + } + } + } + public int getActivePoolThreads() { return _activePoolThreads.get(); }