diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index ee10813dc82..3d075d3b524 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -46,6 +46,7 @@ import com.cloud.async.dao.AsyncJobDao; import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManagerListener; import com.cloud.cluster.ManagementServerHostVO; +import com.cloud.configuration.Config; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.exception.InvalidParameterValueException; import com.cloud.exception.PermissionDeniedException; @@ -82,6 +83,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe private AccountDao _accountDao; private AsyncJobDao _jobDao; private long _jobExpireSeconds = 86400; // 1 day + private long _jobCancelThresholdSeconds = 3600; // 1 hour private ApiDispatcher _dispatcher; private final ScheduledExecutorService _heartbeatScheduler = @@ -549,6 +551,18 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe } } + // forcely cancel blocking queue items if they've been staying there for too long + List blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false); + if(blockItems != null && blockItems.size() > 0) { + for(SyncQueueItemVO item : blockItems) { + if(item.getContentType().equalsIgnoreCase("AsyncJob")) + completeAsyncJob(item.getContentId(), 2, 0, "Job is cancelled as it has been blocking others for too long"); + + // purge the item and resume queue processing + _queueMgr.purgeItem(item.getId()); + } + } + s_logger.trace("End cleanup expired async-jobs"); } catch(Throwable e) { s_logger.error("Unexpected exception when trying to execute queue item, ", e); @@ -596,10 +610,13 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe throw new ConfigurationException("Unable to get the configuration dao."); } - Map configs = configDao.getConfiguration("management-server", params); - - int expireMinutes = NumbersUtil.parseInt(configs.get("job.expire.minutes"), 24*60); + int expireMinutes = NumbersUtil.parseInt( + configDao.getValue(Config.JobExpireMinutes.key()), 24*60); _jobExpireSeconds = (long)expireMinutes*60; + + _jobCancelThresholdSeconds = NumbersUtil.parseInt( + configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60); + _jobCancelThresholdSeconds *= 60; _accountDao = locator.getDao(AccountDao.class); if (_accountDao == null) { diff --git a/server/src/com/cloud/async/SyncQueueManager.java b/server/src/com/cloud/async/SyncQueueManager.java index 50181fde951..7811c64515b 100644 --- a/server/src/com/cloud/async/SyncQueueManager.java +++ b/server/src/com/cloud/async/SyncQueueManager.java @@ -29,5 +29,6 @@ public interface SyncQueueManager extends Manager { public void purgeItem(long queueItemId); public List getActiveQueueItems(Long msid, boolean exclusive); + public List getBlockedQueueItems(long thresholdMs, boolean exclusive); public void resetQueueProcess(long msid); } diff --git a/server/src/com/cloud/async/SyncQueueManagerImpl.java b/server/src/com/cloud/async/SyncQueueManagerImpl.java index 7142357db25..d0bedcb6ed5 100644 --- a/server/src/com/cloud/async/SyncQueueManagerImpl.java +++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java @@ -33,8 +33,6 @@ import com.cloud.async.dao.SyncQueueItemDao; import com.cloud.utils.DateUtil; import com.cloud.utils.component.ComponentLocator; import com.cloud.utils.db.DB; -import com.cloud.utils.db.SearchBuilder; -import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; @@ -204,6 +202,11 @@ public class SyncQueueManagerImpl implements SyncQueueManager { return _syncQueueItemDao.getActiveQueueItems(msid, exclusive); } + @Override + public List getBlockedQueueItems(long thresholdMs, boolean exclusive) { + return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive); + } + @Override public void resetQueueProcess(long msid) { _syncQueueDao.resetQueueProcessing(msid); diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDao.java b/server/src/com/cloud/async/dao/SyncQueueItemDao.java index e06764a0754..d05549bed44 100644 --- a/server/src/com/cloud/async/dao/SyncQueueItemDao.java +++ b/server/src/com/cloud/async/dao/SyncQueueItemDao.java @@ -18,13 +18,14 @@ package com.cloud.async.dao; -import java.util.List; - +import java.util.List; + import com.cloud.async.SyncQueueItemVO; import com.cloud.utils.db.GenericDao; public interface SyncQueueItemDao extends GenericDao { public SyncQueueItemVO getNextQueueItem(long queueId); public List getNextQueueItems(int maxItems); - public List getActiveQueueItems(Long msid, boolean exclusive); + public List getActiveQueueItems(Long msid, boolean exclusive); + public List getBlockedQueueItems(long thresholdMs, boolean exclusive); } diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java index 5299d9c4311..b3f90ee9966 100644 --- a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java +++ b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java @@ -22,6 +22,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.TimeZone; @@ -30,16 +31,20 @@ import javax.ejb.Local; import org.apache.log4j.Logger; import com.cloud.async.SyncQueueItemVO; +import com.cloud.async.SyncQueueVO; import com.cloud.utils.DateUtil; import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.JoinBuilder; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.Transaction; @Local(value = { SyncQueueItemDao.class }) public class SyncQueueItemDaoImpl extends GenericDaoBase implements SyncQueueItemDao { - private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class); + private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class); + + private final SyncQueueDao _syncQueueDao = new SyncQueueDaoImpl(); @Override public SyncQueueItemVO getNextQueueItem(long queueId) { @@ -109,5 +114,30 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase if(exclusive) return lockRows(sc, filter, true); return listBy(sc, filter); - } + } + + @Override + public List getBlockedQueueItems(long thresholdMs, boolean exclusive) { + Date cutTime = DateUtil.currentGMTTime(); + cutTime = new Date(cutTime.getTime() - thresholdMs); + + SearchBuilder sbQueue = _syncQueueDao.createSearchBuilder(); + sbQueue.and("lastProcessTime", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.NNULL); + sbQueue.and("lastProcessTime2", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.LT); + + SearchBuilder sbItem = createSearchBuilder(); + sbItem.join("queueItemJoinQueue", sbQueue, sbQueue.entity().getId(), sbItem.entity().getQueueId(), JoinBuilder.JoinType.INNER); + sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL); + sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL); + + sbQueue.done(); + sbItem.done(); + + SearchCriteria sc = sbItem.create(); + sc.setJoinParameters("queueItemJoinQueue", "lastProcessTime2", cutTime); + + if(exclusive) + return lockRows(sc, null, true); + return listBy(sc, null); + } } diff --git a/server/src/com/cloud/configuration/Config.java b/server/src/com/cloud/configuration/Config.java index 09115970603..57cfce0d2a8 100644 --- a/server/src/com/cloud/configuration/Config.java +++ b/server/src/com/cloud/configuration/Config.java @@ -112,6 +112,7 @@ public enum Config { // Advanced JobExpireMinutes("Advanced", ManagementServer.class, String.class, "job.expire.minutes", "1440", "Time (in minutes) for async-jobs to be kept in system", null), + JobCancelThresholdMinutes("Advanced", ManagementServer.class, String.class, "job.cancel.threshold.minutes", "60", "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", null), AccountCleanupInterval("Advanced", ManagementServer.class, Integer.class, "account.cleanup.interval", "86400", "The interval in seconds between cleanup for removed accounts", null), AllowPublicUserTemplates("Advanced", ManagementServer.class, Integer.class, "allow.public.user.templates", "true", "If false, users will not be able to create public templates.", null), diff --git a/server/test/async-job-component.xml b/server/test/async-job-component.xml index 38602dc3e92..8aab9d14276 100644 --- a/server/test/async-job-component.xml +++ b/server/test/async-job-component.xml @@ -24,160 +24,160 @@ --> - - - - - + + + + + 50 -1 - - - + + + 50 -1 - + 50 -1 - + - - - + + + 50 -1 routing - + 5000 300 - - + + 50 -1 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - + + + - - - + + + - - + + 2 - + 2 - - + + - - + + - - + + - - - + + + - - + + - - + + - - - - + + + + - + - + - + true - + - + - + - - + + - + - + - + - + - + - + - + - + - - - + + + diff --git a/server/test/com/cloud/async/TestSyncQueueManager.java b/server/test/com/cloud/async/TestSyncQueueManager.java index 39a39994149..74e06bbe0c7 100644 --- a/server/test/com/cloud/async/TestSyncQueueManager.java +++ b/server/test/com/cloud/async/TestSyncQueueManager.java @@ -20,8 +20,6 @@ package com.cloud.async; import java.util.List; -import junit.framework.Assert; - import org.apache.log4j.Logger; import com.cloud.utils.component.ComponentLocator; @@ -34,7 +32,7 @@ public class TestSyncQueueManager extends ComponentTestCase { private volatile int count = 0; private volatile long expectingCurrent = 1; - + public void leftOverItems() { SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( SyncQueueManager.class); @@ -198,5 +196,21 @@ public class TestSyncQueueManager extends ComponentTestCase { for(int q = 1; q <= queues; q++) for(int i = 0; i < totalRuns; i++) mgr.queue("vm_instance", q, "Async-job", i+1); - } + } + + public void testSyncQueue() { + final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( + SyncQueueManager.class); + + mgr.queue("vm_instance", 1, "Async-job", 1); + mgr.queue("vm_instance", 1, "Async-job", 2); + mgr.queue("vm_instance", 1, "Async-job", 3); + mgr.dequeueFromAny(100L, 1); + + List l = mgr.getBlockedQueueItems(100000, false); + for(SyncQueueItemVO item : l) { + System.out.println("Blocked item. " + item.getContentType() + "-" + item.getContentId()); + mgr.purgeItem(item.getId()); + } + } } diff --git a/server/test/sync-queue-component.xml b/server/test/sync-queue-component.xml index c1e85b1ef2c..2c30ef8d2d5 100644 --- a/server/test/sync-queue-component.xml +++ b/server/test/sync-queue-component.xml @@ -1,8 +1,8 @@ - - - - + + + +