From 0afec010b8a8b455c7a3b5093adbd866a359e319 Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Thu, 5 Feb 2015 16:20:08 +0530 Subject: [PATCH] jobs: fix corner cases, add NPE checks Signed-off-by: Rohit Yadav --- .../apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java | 2 ++ .../cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java | 2 +- .../cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java index a050407cd97..6889cd22dc9 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java @@ -64,6 +64,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements pendingAsyncJobsSearch.done(); expiringUnfinishedAsyncJobSearch = createSearchBuilder(); + expiringUnfinishedAsyncJobSearch.and("jobDispatcher", expiringUnfinishedAsyncJobSearch.entity().getDispatcher(), SearchCriteria.Op.NEQ); expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(), SearchCriteria.Op.LTEQ); expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NULL); expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ); @@ -159,6 +160,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements @Override public List getExpiredUnfinishedJobs(Date cutTime, int limit) { SearchCriteria sc = expiringUnfinishedAsyncJobSearch.create(); + sc.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO); sc.setParameters("created", cutTime); sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index bdc7cb0bfc4..89e08004ae8 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -673,7 +673,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, while (timeoutInMiliseconds < 0 || System.currentTimeMillis() - startTick < timeoutInMiliseconds) { msgDetector.waitAny(checkIntervalInMilliSeconds); job = _jobDao.findById(job.getId()); - if (job.getStatus().done()) { + if (job != null && job.getStatus().done()) { return true; } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java index 1cfec4dba41..65351634bcb 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java @@ -142,7 +142,7 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage for(SyncQueueItemVO item : l) { SyncQueueVO queueVO = _syncQueueDao.findById(item.getQueueId()); SyncQueueItemVO itemVO = _syncQueueItemDao.findById(item.getId()); - if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) { + if(queueReadyToProcess(queueVO) && itemVO != null && itemVO.getLastProcessNumber() == null) { Long processNumber = queueVO.getLastProcessNumber(); if (processNumber == null) processNumber = new Long(1); @@ -220,6 +220,7 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage itemVO.setLastProcessTime(null); _syncQueueItemDao.update(queueItemId, itemVO); + queueVO.setQueueSize(queueVO.getQueueSize() - 1); queueVO.setLastUpdated(DateUtil.currentGMTTime()); _syncQueueDao.update(queueVO.getId(), queueVO); }