diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java index 41f14190f36..167d9f511ed 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java @@ -147,7 +147,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase SearchBuilder sbItem = createSearchBuilder(); sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL); sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL); - sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL); + sbItem.and("lastProcessTime", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL); sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT); sbItem.done(); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 7e65ede3e53..04fab24d7ae 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -769,46 +769,57 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, public void reallyRun() { try { - s_logger.trace("Begin cleanup expired async-jobs"); - - Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000); - - // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute - // hopefully this will be fast enough to balance potential growth of job table - // 1) Expire unfinished jobs that weren't processed yet - List unfinishedJobs = _jobDao.getExpiredUnfinishedJobs(cutTime, 100); - for (AsyncJobVO job : unfinishedJobs) { - s_logger.info("Expunging unfinished job " + job); - - _jobMonitor.unregisterByJobId(job.getId()); - expungeAsyncJob(job); - } - - // 2) Expunge finished jobs - List completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100); - for (AsyncJobVO job : completedJobs) { - s_logger.trace("Expunging completed job " + job); - - expungeAsyncJob(job); - } + s_logger.info("Begin cleanup expired async-jobs"); // forcefully cancel blocking queue items if they've been staying there for too long List blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false); if (blockItems != null && blockItems.size() > 0) { for (SyncQueueItemVO item : blockItems) { - if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) { - s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long"); - completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long"); + try { + if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) { + s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long"); + completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long"); - _jobMonitor.unregisterByJobId(item.getContentId()); + _jobMonitor.unregisterByJobId(item.getContentId()); + } + + // purge the item and resume queue processing + _queueMgr.purgeItem(item.getId()); + } catch (Throwable e) { + s_logger.error("Unexpected exception when trying to remove job from sync queue, ", e); } - - // purge the item and resume queue processing - _queueMgr.purgeItem(item.getId()); } } - s_logger.trace("End cleanup expired async-jobs"); + Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000); + // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute + // hopefully this will be fast enough to balance potential growth of job table + // 1) Expire unfinished jobs that weren't processed yet + List unfinishedJobs = _jobDao.getExpiredUnfinishedJobs(cutTime, 100); + for (AsyncJobVO job : unfinishedJobs) { + try { + s_logger.info("Expunging unfinished job-" + job.getId()); + + _jobMonitor.unregisterByJobId(job.getId()); + expungeAsyncJob(job); + } catch (Throwable e) { + s_logger.error("Unexpected exception when trying to expunge job-" + job.getId(), e); + } + } + + // 2) Expunge finished jobs + List completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100); + for (AsyncJobVO job : completedJobs) { + try { + s_logger.info("Expunging completed job-" + job.getId()); + + expungeAsyncJob(job); + } catch (Throwable e) { + s_logger.error("Unexpected exception when trying to expunge job-" + job.getId(), e); + } + } + + s_logger.info("End cleanup expired async-jobs"); } catch (Throwable e) { s_logger.error("Unexpected exception when trying to execute queue item, ", e); }