diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 47d793fd0b0..0101a8a0abf 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -621,11 +621,18 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute // hopefully this will be fast enough to balance potential growth of job table - List l = _jobDao.getExpiredJobs(cutTime, 100); - if(l != null && l.size() > 0) { - for(AsyncJobVO job : l) { - expungeAsyncJob(job); - } + //1) Expire unfinished jobs that weren't processed yet + List l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100); + for(AsyncJobVO job : l) { + s_logger.trace("Expunging unfinished job " + job); + expungeAsyncJob(job); + } + + //2) Expunge finished jobs + List completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100); + for(AsyncJobVO job : completedJobs) { + s_logger.trace("Expunging completed job " + job); + expungeAsyncJob(job); } // forcefully cancel blocking queue items if they've been staying there for too long diff --git a/server/src/com/cloud/async/dao/AsyncJobDao.java b/server/src/com/cloud/async/dao/AsyncJobDao.java index 9d207593574..9ab9b224c10 100644 --- a/server/src/com/cloud/async/dao/AsyncJobDao.java +++ b/server/src/com/cloud/async/dao/AsyncJobDao.java @@ -26,6 +26,7 @@ import com.cloud.utils.db.GenericDao; public interface AsyncJobDao extends GenericDao { AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId); List findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId); - List getExpiredJobs(Date cutTime, int limit); + List getExpiredUnfinishedJobs(Date cutTime, int limit); void resetJobProcess(long msid, int jobResultCode, String jobResultMessage); -} + List getExpiredCompletedJobs(Date cutTime, int limit); +} \ No newline at end of file diff --git a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java index 4793a6edc12..b2c0d9cc4e0 100644 --- a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java +++ b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java @@ -42,17 +42,19 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName()); private final SearchBuilder pendingAsyncJobSearch; - private final SearchBuilder pendingAsyncJobsSearch; - private final SearchBuilder expiringAsyncJobSearch; - - public AsyncJobDaoImpl() { - pendingAsyncJobSearch = createSearchBuilder(); - pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(), - SearchCriteria.Op.EQ); - pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(), - SearchCriteria.Op.EQ); - pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), - SearchCriteria.Op.EQ); + private final SearchBuilder pendingAsyncJobsSearch; + private final SearchBuilder expiringUnfinishedAsyncJobSearch; + private final SearchBuilder expiringCompletedAsyncJobSearch; + + + public AsyncJobDaoImpl() { + pendingAsyncJobSearch = createSearchBuilder(); + pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(), + SearchCriteria.Op.EQ); + pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(), + SearchCriteria.Op.EQ); + pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), + SearchCriteria.Op.EQ); pendingAsyncJobSearch.done(); pendingAsyncJobsSearch = createSearchBuilder(); @@ -64,27 +66,36 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements SearchCriteria.Op.EQ); pendingAsyncJobsSearch.done(); - expiringAsyncJobSearch = createSearchBuilder(); - expiringAsyncJobSearch.and("created", expiringAsyncJobSearch.entity().getCreated(), + expiringUnfinishedAsyncJobSearch = createSearchBuilder(); + expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(), SearchCriteria.Op.LTEQ); - expiringAsyncJobSearch.done(); - } - - public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) { - SearchCriteria sc = pendingAsyncJobSearch.create(); - sc.setParameters("instanceType", instanceType); - sc.setParameters("instanceId", instanceId); - sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS); - - List l = listIncludingRemovedBy(sc); - if(l != null && l.size() > 0) { - if(l.size() > 1) { - s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job"); - } - - return l.get(0); - } - return null; + expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NULL); + expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ); + expiringUnfinishedAsyncJobSearch.done(); + + expiringCompletedAsyncJobSearch = createSearchBuilder(); + expiringCompletedAsyncJobSearch.and("created", expiringCompletedAsyncJobSearch.entity().getCreated(), + SearchCriteria.Op.LTEQ); + expiringCompletedAsyncJobSearch.and("completeMsId", expiringCompletedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NNULL); + expiringCompletedAsyncJobSearch.and("jobStatus", expiringCompletedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.NEQ); + expiringCompletedAsyncJobSearch.done(); + } + + public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) { + SearchCriteria sc = pendingAsyncJobSearch.create(); + sc.setParameters("instanceType", instanceType); + sc.setParameters("instanceId", instanceId); + sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS); + + List l = listIncludingRemovedBy(sc); + if(l != null && l.size() > 0) { + if(l.size() > 1) { + s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job"); + } + + return l.get(0); + } + return null; } public List findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId) { @@ -99,9 +110,20 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements return listBy(sc); } - public List getExpiredJobs(Date cutTime, int limit) { - SearchCriteria sc = expiringAsyncJobSearch.create(); + @Override + public List getExpiredUnfinishedJobs(Date cutTime, int limit) { + SearchCriteria sc = expiringUnfinishedAsyncJobSearch.create(); sc.setParameters("created", cutTime); + sc.setParameters("jobStatus", 0); + Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit); + return listIncludingRemovedBy(sc, filter); + } + + @Override + public List getExpiredCompletedJobs(Date cutTime, int limit) { + SearchCriteria sc = expiringCompletedAsyncJobSearch.create(); + sc.setParameters("created", cutTime); + sc.setParameters("jobStatus", 0); Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit); return listIncludingRemovedBy(sc, filter); }