jobs: fix corner cases, add NPE checks

Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
This commit is contained in:
Rohit Yadav 2015-02-05 16:20:08 +05:30
parent 744c20bd7c
commit 46cd98b163
3 changed files with 5 additions and 2 deletions

View File

@ -64,6 +64,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
pendingAsyncJobsSearch.done(); pendingAsyncJobsSearch.done();
expiringUnfinishedAsyncJobSearch = createSearchBuilder(); expiringUnfinishedAsyncJobSearch = createSearchBuilder();
expiringUnfinishedAsyncJobSearch.and("jobDispatcher", expiringUnfinishedAsyncJobSearch.entity().getDispatcher(), SearchCriteria.Op.NEQ);
expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(), SearchCriteria.Op.LTEQ); expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(), SearchCriteria.Op.LTEQ);
expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NULL); expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NULL);
expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ); expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ);
@ -159,6 +160,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
@Override @Override
public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit) { public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit) {
SearchCriteria<AsyncJobVO> sc = expiringUnfinishedAsyncJobSearch.create(); SearchCriteria<AsyncJobVO> sc = expiringUnfinishedAsyncJobSearch.create();
sc.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO);
sc.setParameters("created", cutTime); sc.setParameters("created", cutTime);
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit); Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);

View File

@ -673,7 +673,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
while (timeoutInMiliseconds < 0 || System.currentTimeMillis() - startTick < timeoutInMiliseconds) { while (timeoutInMiliseconds < 0 || System.currentTimeMillis() - startTick < timeoutInMiliseconds) {
msgDetector.waitAny(checkIntervalInMilliSeconds); msgDetector.waitAny(checkIntervalInMilliSeconds);
job = _jobDao.findById(job.getId()); job = _jobDao.findById(job.getId());
if (job.getStatus().done()) { if (job != null && job.getStatus().done()) {
return true; return true;
} }

View File

@ -142,7 +142,7 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
for(SyncQueueItemVO item : l) { for(SyncQueueItemVO item : l) {
SyncQueueVO queueVO = _syncQueueDao.findById(item.getQueueId()); SyncQueueVO queueVO = _syncQueueDao.findById(item.getQueueId());
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(item.getId()); SyncQueueItemVO itemVO = _syncQueueItemDao.findById(item.getId());
if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) { if(queueReadyToProcess(queueVO) && itemVO != null && itemVO.getLastProcessNumber() == null) {
Long processNumber = queueVO.getLastProcessNumber(); Long processNumber = queueVO.getLastProcessNumber();
if (processNumber == null) if (processNumber == null)
processNumber = new Long(1); processNumber = new Long(1);
@ -220,6 +220,7 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
itemVO.setLastProcessTime(null); itemVO.setLastProcessTime(null);
_syncQueueItemDao.update(queueItemId, itemVO); _syncQueueItemDao.update(queueItemId, itemVO);
queueVO.setQueueSize(queueVO.getQueueSize() - 1);
queueVO.setLastUpdated(DateUtil.currentGMTTime()); queueVO.setLastUpdated(DateUtil.currentGMTTime());
_syncQueueDao.update(queueVO.getId(), queueVO); _syncQueueDao.update(queueVO.getId(), queueVO);
} }