mirror of https://github.com/apache/cloudstack.git
jobs: fix corner cases, add NPE checks
Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
This commit is contained in:
parent
d5538fbe3b
commit
0afec010b8
|
|
@ -64,6 +64,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
|
|||
pendingAsyncJobsSearch.done();
|
||||
|
||||
expiringUnfinishedAsyncJobSearch = createSearchBuilder();
|
||||
expiringUnfinishedAsyncJobSearch.and("jobDispatcher", expiringUnfinishedAsyncJobSearch.entity().getDispatcher(), SearchCriteria.Op.NEQ);
|
||||
expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(), SearchCriteria.Op.LTEQ);
|
||||
expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NULL);
|
||||
expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ);
|
||||
|
|
@ -159,6 +160,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
|
|||
@Override
|
||||
public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit) {
|
||||
SearchCriteria<AsyncJobVO> sc = expiringUnfinishedAsyncJobSearch.create();
|
||||
sc.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO);
|
||||
sc.setParameters("created", cutTime);
|
||||
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
|
||||
Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
|
||||
|
|
|
|||
|
|
@ -673,7 +673,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
|
|||
while (timeoutInMiliseconds < 0 || System.currentTimeMillis() - startTick < timeoutInMiliseconds) {
|
||||
msgDetector.waitAny(checkIntervalInMilliSeconds);
|
||||
job = _jobDao.findById(job.getId());
|
||||
if (job.getStatus().done()) {
|
||||
if (job != null && job.getStatus().done()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -142,7 +142,7 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
|
|||
for(SyncQueueItemVO item : l) {
|
||||
SyncQueueVO queueVO = _syncQueueDao.findById(item.getQueueId());
|
||||
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(item.getId());
|
||||
if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
|
||||
if(queueReadyToProcess(queueVO) && itemVO != null && itemVO.getLastProcessNumber() == null) {
|
||||
Long processNumber = queueVO.getLastProcessNumber();
|
||||
if (processNumber == null)
|
||||
processNumber = new Long(1);
|
||||
|
|
@ -220,6 +220,7 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
|
|||
itemVO.setLastProcessTime(null);
|
||||
_syncQueueItemDao.update(queueItemId, itemVO);
|
||||
|
||||
queueVO.setQueueSize(queueVO.getQueueSize() - 1);
|
||||
queueVO.setLastUpdated(DateUtil.currentGMTTime());
|
||||
_syncQueueDao.update(queueVO.getId(), queueVO);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue