// Patch summary (descriptive preamble only; the diff payload below is unchanged).
//
// What the hunks below show:
//  * SyncQueueItem: adds the constant AsyncJobContentType = "AsyncJob".
//  * AsyncJobManagerImpl: replaces the "AsyncJob" string literals with
//    SyncQueueItem.AsyncJobContentType; the content-type check in the
//    discarded-queue-item path changes from equals(...) to equalsIgnoreCase(...).
//  * AsyncJobManagerImpl GC runnable: reallyRun() changes from private to public and
//    now calls expungeAsyncJob(job) for each expired job instead of calling
//    _jobDao.expunge(job.getId()) directly.
//  * AsyncJobManagerImpl.expungeAsyncJob (new, @DB, protected): starts a transaction,
//    expunges the job row, calls _queueMgr.purgeAsyncJobQueueItemId(job.getId()),
//    then commits.
//  * SyncQueueManager / SyncQueueManagerImpl: adds purgeAsyncJobQueueItemId(long), which
//    looks up the queue item id via
//    SyncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, AsyncJobContentType)
//    and calls purgeItem(itemId) when an id is found.
//  * SyncQueueManagerImpl (hunk at -185/+185): after expunging the item, the queue's
//    lastUpdated / queueSize decrement / update now run only when
//    itemVO.getLastProcessMsid() != null.
//  * SyncQueueItemDao / SyncQueueItemDaoImpl: adds getQueueItemIdByContentIdAndType; the
//    impl builds a search on contentId and contentType (Op.EQ) selecting the id field in
//    its constructor, and returns the first matching id or null when there is none.
//  * SyncQueueManagerImpl.java also gains a trailing newline at end of file.
//
// NOTE(review): the original line breaks of this patch appear to have been collapsed, so
//   the @@ hunk line counts cannot be checked against the content here.
// NOTE(review): generic type arguments look to be missing throughout (e.g. "List l",
//   "GenericSearchBuilder queueIdSearch", "extends GenericDaoBase implements") --
//   presumably stripped during extraction; confirm against the real commit.
// NOTE(review): expungeAsyncJob shows no explicit rollback if expunge or the queue purge
//   throws between txn.start() and txn.commit() -- confirm how @DB handles that case.
// NOTE(review): the new getLastProcessMsid() != null guard means the queue size is no
//   longer decremented for items without a process msid -- confirm that queueSize is
//   only ever incremented when an item is picked up for processing.
diff --git a/api/src/com/cloud/async/SyncQueueItem.java b/api/src/com/cloud/async/SyncQueueItem.java index f299481eb11..9f9c379a742 100644 --- a/api/src/com/cloud/async/SyncQueueItem.java +++ b/api/src/com/cloud/async/SyncQueueItem.java @@ -16,7 +16,9 @@ // under the License. package com.cloud.async; + public interface SyncQueueItem { + public final String AsyncJobContentType = "AsyncJob"; String getContentType(); diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 6cf95feb977..5fb5105857c 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -270,7 +270,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe Random random = new Random(); for(int i = 0; i < 5; i++) { - queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId(), queueSizeLimit); + queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit); if(queue != null) { break; } @@ -598,60 +598,73 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe return new Runnable() { @Override public void run() { - GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC"); - try { - if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) { - try { - reallyRun(); - } finally { - scanLock.unlock(); - } - } - } finally { - scanLock.releaseRef(); - } - } - - private void reallyRun() { - try { - s_logger.trace("Begin cleanup expired async-jobs"); - - Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000); - - // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute - // hopefully this will be fast enough to balance potential growth of job table - List l = _jobDao.getExpiredJobs(cutTime, 100); - if(l != null && l.size() > 0) { - for(AsyncJobVO job : l) { - _jobDao.expunge(job.getId()); 
- } - } - - // forcely cancel blocking queue items if they've been staying there for too long - List blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false); - if(blockItems != null && blockItems.size() > 0) { - for(SyncQueueItemVO item : blockItems) { - if(item.getContentType().equalsIgnoreCase("AsyncJob")) { - completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Job is cancelled as it has been blocking others for too long")); + GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC"); + try { + if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) { + try { + reallyRun(); + } finally { + scanLock.unlock(); + } + } + } finally { + scanLock.releaseRef(); + } + } + + public void reallyRun() { + try { + s_logger.trace("Begin cleanup expired async-jobs"); + + Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000); + + // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute + // hopefully this will be fast enough to balance potential growth of job table + List l = _jobDao.getExpiredJobs(cutTime, 100); + if(l != null && l.size() > 0) { + for(AsyncJobVO job : l) { + expungeAsyncJob(job); + } + } + + // forcefully cancel blocking queue items if they've been staying there for too long + List blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false); + if(blockItems != null && blockItems.size() > 0) { + for(SyncQueueItemVO item : blockItems) { + if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) { + completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0, + getResetResultResponse("Job is cancelled as it has been blocking others for too long")); } - - // purge the item and resume queue processing - _queueMgr.purgeItem(item.getId()); - } - } - - s_logger.trace("End cleanup expired async-jobs"); - } catch(Throwable e) { - s_logger.error("Unexpected exception when 
trying to execute queue item, ", e); - } finally { - StackMaid.current().exitCleanup(); - } - } - }; - } - - private long getMsid() { - if(_clusterMgr != null) { + + // purge the item and resume queue processing + _queueMgr.purgeItem(item.getId()); + } + } + + s_logger.trace("End cleanup expired async-jobs"); + } catch(Throwable e) { + s_logger.error("Unexpected exception when trying to execute queue item, ", e); + } finally { + StackMaid.current().exitCleanup(); + } + } + + + }; + } + + @DB + protected void expungeAsyncJob(AsyncJobVO job) { + Transaction txn = Transaction.currentTxn(); + txn.start(); + _jobDao.expunge(job.getId()); + //purge corresponding sync queue item + _queueMgr.purgeAsyncJobQueueItemId(job.getId()); + txn.commit(); + } + + private long getMsid() { + if(_clusterMgr != null) { return _clusterMgr.getManagementNodeId(); } @@ -666,7 +679,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe } String contentType = item.getContentType(); - if(contentType != null && contentType.equals("AsyncJob")) { + if(contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) { Long jobId = item.getContentId(); if(jobId != null) { s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. 
job id: " + jobId); diff --git a/server/src/com/cloud/async/SyncQueueManager.java b/server/src/com/cloud/async/SyncQueueManager.java index b605f1b8670..a7032daaa47 100644 --- a/server/src/com/cloud/async/SyncQueueManager.java +++ b/server/src/com/cloud/async/SyncQueueManager.java @@ -30,4 +30,6 @@ public interface SyncQueueManager extends Manager { public List getActiveQueueItems(Long msid, boolean exclusive); public List getBlockedQueueItems(long thresholdMs, boolean exclusive); + + void purgeAsyncJobQueueItemId(long asyncJobId); } diff --git a/server/src/com/cloud/async/SyncQueueManagerImpl.java b/server/src/com/cloud/async/SyncQueueManagerImpl.java index c3f49557b00..4d1506523f6 100644 --- a/server/src/com/cloud/async/SyncQueueManagerImpl.java +++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java @@ -185,13 +185,16 @@ public class SyncQueueManagerImpl implements SyncQueueManager { if(itemVO != null) { SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); - _syncQueueItemDao.expunge(itemVO.getId()); - - queueVO.setLastUpdated(DateUtil.currentGMTTime()); - //decrement the count - assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!"; - queueVO.setQueueSize(queueVO.getQueueSize() - 1); - _syncQueueDao.update(queueVO.getId(), queueVO); + _syncQueueItemDao.expunge(itemVO.getId()); + + //if item is active, reset queue information + if (itemVO.getLastProcessMsid() != null) { + queueVO.setLastUpdated(DateUtil.currentGMTTime()); + //decrement the count + assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!"; + queueVO.setQueueSize(queueVO.getQueueSize() - 1); + _syncQueueDao.update(queueVO.getId(), queueVO); + } } txt.commit(); } catch(Exception e) { @@ -273,5 +276,13 @@ public class SyncQueueManagerImpl implements SyncQueueManager { private boolean queueReadyToProcess(SyncQueueVO queueVO) { return queueVO.getQueueSize() < queueVO.getQueueSizeLimit(); + } + + @Override + public void 
purgeAsyncJobQueueItemId(long asyncJobId) { + Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType); + if (itemId != null) { + purgeItem(itemId); + } } -} \ No newline at end of file +} diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDao.java b/server/src/com/cloud/async/dao/SyncQueueItemDao.java index cd9df2f4682..6b9da8b63ea 100644 --- a/server/src/com/cloud/async/dao/SyncQueueItemDao.java +++ b/server/src/com/cloud/async/dao/SyncQueueItemDao.java @@ -26,4 +26,5 @@ public interface SyncQueueItemDao extends GenericDao { public List getNextQueueItems(int maxItems); public List getActiveQueueItems(Long msid, boolean exclusive); public List getBlockedQueueItems(long thresholdMs, boolean exclusive); -} + public Long getQueueItemIdByContentIdAndType(long contentId, String contentType); +} diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java index ce212981d50..5e757563bff 100644 --- a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java +++ b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java @@ -33,13 +33,25 @@ import com.cloud.async.SyncQueueItemVO; import com.cloud.utils.DateUtil; import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.GenericSearchBuilder; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.Transaction; @Local(value = { SyncQueueItemDao.class }) public class SyncQueueItemDaoImpl extends GenericDaoBase implements SyncQueueItemDao { private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class); + final GenericSearchBuilder queueIdSearch; + + protected SyncQueueItemDaoImpl() { + super(); + queueIdSearch = createSearchBuilder(Long.class); + queueIdSearch.and("contentId", queueIdSearch.entity().getContentId(), Op.EQ); + 
queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ); + queueIdSearch.selectField(queueIdSearch.entity().getId()); + queueIdSearch.done(); + } @Override @@ -132,4 +144,15 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase return lockRows(sc, null, true); return listBy(sc, null); } + + + @Override + public Long getQueueItemIdByContentIdAndType(long contentId, String contentType) { + SearchCriteria sc = queueIdSearch.create(); + sc.setParameters("contentId", contentId); + sc.setParameters("contentType", contentType); + List id = customSearch(sc, null); + + return id.size() == 0 ? null : id.get(0); + } }