Remove cancelled jobs from job monitoring; correct the miscalculated time unit in job cancellation.

Kelven Yang 2014-03-03 16:09:40 -08:00
parent bbf5a912c6
commit 5dd4fb22ef
2 changed files with 35 additions and 10 deletions
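The time-unit error fixed here is a minutes-to-milliseconds conversion: the cleanup and cancellation code multiplied a minutes value by 1000 (treating minutes as seconds) instead of 60000. A minimal sketch of the effect, using the job.expire.minutes default of 1440 from the hunk below (class and constant names are illustrative, not from the patch):

    import java.util.concurrent.TimeUnit;

    public class TimeUnitFixSketch {
        // job.expire.minutes default from the diff: 1440 minutes = 24 hours
        static final long JOB_EXPIRE_MINUTES = 1440L;

        public static void main(String[] args) {
            // Before the fix: minutes * 1000 => 1,440,000 ms, i.e. only 24 minutes
            long beforeFixMs = JOB_EXPIRE_MINUTES * 1000;
            // After the fix: minutes * 60000 => 86,400,000 ms, the intended 24 hours
            long afterFixMs = JOB_EXPIRE_MINUTES * 60000;
            // Equivalent, more explicit conversion
            long viaTimeUnit = TimeUnit.MINUTES.toMillis(JOB_EXPIRE_MINUTES);

            System.out.println(beforeFixMs + " / " + afterFixMs + " / " + viaTimeUnit);
        }
    }

With the old arithmetic, expired-job cleanup effectively worked on a 24-minute window instead of the configured 24 hours.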

AsyncJobManagerImpl.java

@@ -85,12 +85,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private static final ConfigKey<Long> JobExpireMinutes = new ConfigKey<Long>(Long.class, "job.expire.minutes", "Advanced", "1440",
"Time (in minutes) for async-jobs to be kept in system", true, ConfigKey.Scope.Global, 60l);
private static final ConfigKey<Long> JobCancelThresholdMinutes = new ConfigKey<Long>(Long.class, "job.cancel.threshold.minutes", "Advanced", "60",
"Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 60l);
"Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 240l);
private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
private static final int HEARTBEAT_INTERVAL = 2000;
@@ -706,14 +705,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
try {
s_logger.trace("Begin cleanup expired async-jobs");
- Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 1000);
+ Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000);
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential growth of job table
//1) Expire unfinished jobs that weren't processed yet
List<AsyncJobVO> l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
for (AsyncJobVO job : l) {
s_logger.trace("Expunging unfinished job " + job);
s_logger.info("Expunging unfinished job " + job);
_jobMonitor.unregisterByJobId(job.getId());
expungeAsyncJob(job);
}
@@ -721,15 +722,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
for (AsyncJobVO job : completedJobs) {
s_logger.trace("Expunging completed job " + job);
expungeAsyncJob(job);
}
// forcefully cancel blocking queue items if they've been staying there for too long
- List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 1000, false);
+ List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false);
if (blockItems != null && blockItems.size() > 0) {
for (SyncQueueItemVO item : blockItems) {
if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long");
completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
+ _jobMonitor.unregisterByJobId(item.getContentId());
}
// purge the item and resume queue processing
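The same unit fix applies to the blocked-queue threshold above: job.cancel.threshold.minutes is converted to milliseconds before being passed to getBlockedQueueItems. A small sketch of why the old arithmetic force-cancelled blocked jobs far too early (the 60 and 240 values are the config defaults visible in the hunks above):

    public class CancelThresholdSketch {
        public static void main(String[] args) {
            long configuredMinutes = 60L;   // job.cancel.threshold.minutes default

            // Old code: minutes * 1000 => a 60-minute setting became a 60-second threshold
            long oldThresholdMs = configuredMinutes * 1000;
            // Fixed code: minutes * 60000 => 3,600,000 ms, a real hour
            long newThresholdMs = configuredMinutes * 60000;

            System.out.println(oldThresholdMs + " ms vs " + newThresholdMs + " ms");
        }
    }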

AsyncJobMonitor.java

@@ -17,6 +17,7 @@
package org.apache.cloudstack.framework.jobs.impl;
import java.util.HashMap;
+ import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,8 +39,7 @@ import com.cloud.utils.component.ManagerBase;
public class AsyncJobMonitor extends ManagerBase {
public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
- @Inject
- private MessageBus _messageBus;
+ @Inject private MessageBus _messageBus;
private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
private final Timer _timer = new Timer();
@@ -86,15 +86,16 @@ public class AsyncJobMonitor extends ManagerBase {
synchronized (this) {
for (Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
if (entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for " + entry.getValue().millisSinceLastJobHeartbeat() / 1000 +
" seconds");
s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
+ entry.getValue().millisSinceLastJobHeartbeat() / 1000 + " seconds");
}
}
}
}
@Override
- public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+ public boolean configure(String name, Map<String, Object> params)
+ throws ConfigurationException {
_messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
_timer.scheduleAtFixedRate(new ManagedContextTimerTask() {
@@ -141,6 +142,25 @@ public class AsyncJobMonitor extends ManagerBase {
}
}
+ public void unregisterByJobId(long jobId) {
+ synchronized (this) {
+ Iterator<Map.Entry<Long, ActiveTaskRecord>> it = _activeTasks.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Long, ActiveTaskRecord> entry = it.next();
+ if (entry.getValue().getJobId() == jobId) {
+ s_logger.info("Remove Job-" + entry.getValue().getJobId() + " from job monitoring due to job cancelling");
+ if (entry.getValue().isPoolThread())
+ _activePoolThreads.decrementAndGet();
+ else
+ _activeInplaceThreads.decrementAndGet();
+ it.remove();
+ }
+ }
+ }
+ }
public int getActivePoolThreads() {
return _activePoolThreads.get();
}
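For readers skimming the new unregisterByJobId method above: it walks the active-task map with an explicit java.util.Iterator (hence the added import) so that matching entries can be removed while iterating without triggering a ConcurrentModificationException. A self-contained sketch of the same pattern, with illustrative map contents rather than the real ActiveTaskRecord type:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class IteratorRemoveSketch {
        public static void main(String[] args) {
            // Hypothetical stand-in for _activeTasks: runner id -> job id
            Map<Long, Long> activeTasks = new HashMap<>();
            activeTasks.put(1L, 100L);
            activeTasks.put(2L, 200L);
            activeTasks.put(3L, 100L);

            long cancelledJobId = 100L;

            // Removing through the iterator is safe; calling activeTasks.remove(key)
            // inside the loop would typically throw ConcurrentModificationException.
            Iterator<Map.Entry<Long, Long>> it = activeTasks.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> entry = it.next();
                if (entry.getValue() == cancelledJobId) {
                    it.remove();
                }
            }

            System.out.println(activeTasks); // {2=200}
        }
    }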