Bug 7598: add management server restart and peer-cleanup within cluster handling to async job processing

This commit is contained in:
Kelven Yang 2011-01-03 16:46:12 -08:00
parent c12526d4e0
commit 7c5895d7d5
7 changed files with 58 additions and 2 deletions

View File

@ -449,6 +449,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
job.setFromPreviousSession(fromPreviousSession);
job.setSyncSource(item);
job.setCompleteMsid(getMsid());
_jobDao.update(job.getId(), job);
scheduleExecution(job);
} else {
if(s_logger.isDebugEnabled())
@ -573,6 +576,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
if(contentType != null && contentType.equals("AsyncJob")) {
Long jobId = item.getContentId();
if(jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, "Execution was cancelled because of server shutdown");
}
}
@ -638,6 +642,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
txn.start();
List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
cleanupPendingJobs(items);
_queueMgr.resetQueueProcess(msHost.getId());
_jobDao.resetJobProcess(msHost.getId());
txn.commit();
} catch(Throwable e) {
s_logger.warn("Unexpected exception ", e);
@ -653,6 +659,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
try {
List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
cleanupPendingJobs(l);
_queueMgr.resetQueueProcess(getMsid());
_jobDao.resetJobProcess(getMsid());
} catch(Throwable e) {
s_logger.error("Unexpected exception " + e.getMessage(), e);
}

View File

@ -29,4 +29,5 @@ public interface SyncQueueManager extends Manager {
public void purgeItem(long queueItemId);
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
public void resetQueueProcess(long msid);
}

View File

@ -204,6 +204,11 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
return _syncQueueItemDao.getActiveQueueItems(msid, exclusive);
}
@Override
public void resetQueueProcess(long msid) {
_syncQueueDao.resetQueueProcessing(msid);
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_name = name;

View File

@ -28,5 +28,6 @@ import com.cloud.utils.db.GenericDao;
public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, long accountId);
List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
void resetJobProcess(long msid);
}

View File

@ -18,6 +18,8 @@
package com.cloud.async.dao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
@ -28,10 +30,12 @@ import org.apache.log4j.Logger;
import com.cloud.async.AsyncJob;
import com.cloud.async.AsyncJobResult;
import com.cloud.async.AsyncJobVO;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
@Local(value = { AsyncJobDao.class })
public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements AsyncJobDao {
@ -98,4 +102,22 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
return listIncludingRemovedBy(sc, filter);
}
@DB
public void resetJobProcess(long msid) {
String sql = "UPDATE async_job SET job_status=2, job_result='job cancelled because of management server restart' where job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?)";
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setLong(1, msid);
pstmt.setLong(2, msid);
pstmt.execute();
} catch (SQLException e) {
s_logger.warn("Unable to reset job status for management server " + msid, e);
} catch (Throwable e) {
s_logger.warn("Unable to reset job status for management server " + msid, e);
}
}
}

View File

@ -23,5 +23,6 @@ import com.cloud.utils.db.GenericDao;
public interface SyncQueueDao extends GenericDao<SyncQueueVO, Long>{
public void ensureQueue(String syncObjType, long syncObjId);
public SyncQueueVO find(String syncObjType, long syncObjId);
public SyncQueueVO find(String syncObjType, long syncObjId);
public void resetQueueProcessing(long msid);
}

View File

@ -29,6 +29,7 @@ import org.apache.log4j.Logger;
import com.cloud.async.SyncQueueVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
@ -68,6 +69,23 @@ public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implemen
sc.setParameters("syncObjId", syncObjId);
return findOneBy(sc);
}
@Override @DB
public void resetQueueProcessing(long msid) {
String sql = "UPDATE sync_queue set queue_proc_msid=NULL, queue_proc_time=NULL where queue_proc_msid=?";
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setLong(1, msid);
pstmt.execute();
} catch (SQLException e) {
s_logger.warn("Unable to reset sync queue for management server " + msid, e);
} catch (Throwable e) {
s_logger.warn("Unable to reset sync queue for management server " + msid, e);
}
}
protected SyncQueueDaoImpl() {
super();