diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 0bf870937c7..ee10813dc82 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -449,6 +449,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe job.setFromPreviousSession(fromPreviousSession); job.setSyncSource(item); + + job.setCompleteMsid(getMsid()); + _jobDao.update(job.getId(), job); scheduleExecution(job); } else { if(s_logger.isDebugEnabled()) @@ -573,6 +576,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe if(contentType != null && contentType.equals("AsyncJob")) { Long jobId = item.getContentId(); if(jobId != null) { + s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId); completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, "Execution was cancelled because of server shutdown"); } } @@ -638,6 +642,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe txn.start(); List items = _queueMgr.getActiveQueueItems(msHost.getId(), true); cleanupPendingJobs(items); + _queueMgr.resetQueueProcess(msHost.getId()); + _jobDao.resetJobProcess(msHost.getId()); txn.commit(); } catch(Throwable e) { s_logger.warn("Unexpected exception ", e); @@ -653,6 +659,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe try { List l = _queueMgr.getActiveQueueItems(getMsid(), false); cleanupPendingJobs(l); + _queueMgr.resetQueueProcess(getMsid()); + _jobDao.resetJobProcess(getMsid()); } catch(Throwable e) { s_logger.error("Unexpected exception " + e.getMessage(), e); } diff --git a/server/src/com/cloud/async/SyncQueueManager.java b/server/src/com/cloud/async/SyncQueueManager.java index 9ad1e50c9b9..50181fde951 100644 --- a/server/src/com/cloud/async/SyncQueueManager.java +++ b/server/src/com/cloud/async/SyncQueueManager.java @@ -29,4 +29,5 @@ public interface SyncQueueManager extends Manager { public void purgeItem(long queueItemId); public List getActiveQueueItems(Long msid, boolean exclusive); + public void resetQueueProcess(long msid); } diff --git a/server/src/com/cloud/async/SyncQueueManagerImpl.java b/server/src/com/cloud/async/SyncQueueManagerImpl.java index 64205481c94..7142357db25 100644 --- a/server/src/com/cloud/async/SyncQueueManagerImpl.java +++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java @@ -204,6 +204,11 @@ public class SyncQueueManagerImpl implements SyncQueueManager { return _syncQueueItemDao.getActiveQueueItems(msid, exclusive); } + @Override + public void resetQueueProcess(long msid) { + _syncQueueDao.resetQueueProcessing(msid); + } + @Override public boolean configure(String name, Map params) throws ConfigurationException { _name = name; diff --git a/server/src/com/cloud/async/dao/AsyncJobDao.java b/server/src/com/cloud/async/dao/AsyncJobDao.java index b3e27bc1fc1..69b171812f9 100644 --- a/server/src/com/cloud/async/dao/AsyncJobDao.java +++ b/server/src/com/cloud/async/dao/AsyncJobDao.java @@ -28,5 +28,6 @@ import com.cloud.utils.db.GenericDao; public interface AsyncJobDao extends GenericDao { AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId); List findInstancePendingAsyncJobs(AsyncJob.Type instanceType, long accountId); - List getExpiredJobs(Date cutTime, int limit); + List getExpiredJobs(Date cutTime, int limit); + void resetJobProcess(long msid); } diff --git a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java index 54b2b4732e5..efa366ff06f 100644 --- a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java +++ b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java @@ -18,6 +18,8 @@ package com.cloud.async.dao; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.Date; import java.util.List; @@ -28,10 +30,12 @@ import org.apache.log4j.Logger; import com.cloud.async.AsyncJob; import com.cloud.async.AsyncJobResult; import com.cloud.async.AsyncJobVO; +import com.cloud.utils.db.DB; import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.Transaction; @Local(value = { AsyncJobDao.class }) public class AsyncJobDaoImpl extends GenericDaoBase implements AsyncJobDao { @@ -98,4 +102,22 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit); return listIncludingRemovedBy(sc, filter); } + + @DB + public void resetJobProcess(long msid) { + String sql = "UPDATE async_job SET job_status=2, job_result='job cancelled because of management server restart' where job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?)"; + + Transaction txn = Transaction.currentTxn(); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + pstmt.setLong(1, msid); + pstmt.setLong(2, msid); + pstmt.execute(); + } catch (SQLException e) { + s_logger.warn("Unable to reset job status for management server " + msid, e); + } catch (Throwable e) { + s_logger.warn("Unable to reset job status for management server " + msid, e); + } + } } diff --git a/server/src/com/cloud/async/dao/SyncQueueDao.java b/server/src/com/cloud/async/dao/SyncQueueDao.java index a12f9043a25..9c0ee3205af 100644 --- a/server/src/com/cloud/async/dao/SyncQueueDao.java +++ b/server/src/com/cloud/async/dao/SyncQueueDao.java @@ -23,5 +23,6 @@ import com.cloud.utils.db.GenericDao; public interface SyncQueueDao extends GenericDao{ public void ensureQueue(String syncObjType, long syncObjId); - public SyncQueueVO find(String syncObjType, long syncObjId); + public SyncQueueVO find(String syncObjType, long syncObjId); + public void resetQueueProcessing(long msid); } diff --git a/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java index 26026906c5f..d58016301d9 100644 --- a/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java +++ b/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java @@ -29,6 +29,7 @@ import org.apache.log4j.Logger; import com.cloud.async.SyncQueueVO; import com.cloud.utils.DateUtil; +import com.cloud.utils.db.DB; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; @@ -68,6 +69,23 @@ public class SyncQueueDaoImpl extends GenericDaoBase implemen sc.setParameters("syncObjId", syncObjId); return findOneBy(sc); } + + @Override @DB + public void resetQueueProcessing(long msid) { + String sql = "UPDATE sync_queue set queue_proc_msid=NULL, queue_proc_time=NULL where queue_proc_msid=?"; + + Transaction txn = Transaction.currentTxn(); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + pstmt.setLong(1, msid); + pstmt.execute(); + } catch (SQLException e) { + s_logger.warn("Unable to reset sync queue for management server " + msid, e); + } catch (Throwable e) { + s_logger.warn("Unable to reset sync queue for management server " + msid, e); + } + } protected SyncQueueDaoImpl() { super();