diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in
index fa8ca6f717a..2c6f0f2e24e 100644
--- a/client/tomcatconf/applicationContext.xml.in
+++ b/client/tomcatconf/applicationContext.xml.in
@@ -808,6 +808,9 @@
+
+
+
diff --git a/core/src/com/cloud/async/AsyncJobJoinMapVO.java b/core/src/com/cloud/async/AsyncJobJoinMapVO.java
index 34f852b2136..68bf5d12412 100644
--- a/core/src/com/cloud/async/AsyncJobJoinMapVO.java
+++ b/core/src/com/cloud/async/AsyncJobJoinMapVO.java
@@ -52,7 +52,7 @@ public class AsyncJobJoinMapVO {
@Column(name="join_msid")
private long joinMsid;
-
+
@Column(name="complete_msid")
private Long completeMsid;
@@ -65,6 +65,9 @@ public class AsyncJobJoinMapVO {
@Column(name="wakeup_dispatcher")
private String wakeupDispatcher;
+ @Column(name="wakeup_interval")
+ private long wakeupInterval;
+
@Column(name=GenericDao.CREATED_COLUMN)
private Date created;
@@ -72,8 +75,17 @@ public class AsyncJobJoinMapVO {
@Temporal(TemporalType.TIMESTAMP)
private Date lastUpdated;
+ @Column(name="next_wakeup")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date nextWakeupTime;
+
+ @Column(name="expiration")
+ @Temporal(TemporalType.TIMESTAMP)
+ private Date expiration;
+
public AsyncJobJoinMapVO() {
created = DateUtil.currentGMTTime();
+ lastUpdated = DateUtil.currentGMTTime();
}
public Long getId() {
@@ -171,4 +183,28 @@ public class AsyncJobJoinMapVO {
public void setWakeupDispatcher(String wakeupDispatcher) {
this.wakeupDispatcher = wakeupDispatcher;
}
+
+ public long getWakeupInterval() {
+ return wakeupInterval;
+ }
+
+ public void setWakeupInterval(long wakeupInterval) {
+ this.wakeupInterval = wakeupInterval;
+ }
+
+ public Date getNextWakeupTime() {
+ return nextWakeupTime;
+ }
+
+ public void setNextWakeupTime(Date nextWakeupTime) {
+ this.nextWakeupTime = nextWakeupTime;
+ }
+
+ public Date getExpiration() {
+ return expiration;
+ }
+
+ public void setExpiration(Date expiration) {
+ this.expiration = expiration;
+ }
}
diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
index c96457f3952..0984d25a662 100644
--- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
+++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
@@ -51,7 +51,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
}
@Override
- public void RunJob(AsyncJob job) {
+ public void runJob(AsyncJob job) {
BaseAsyncCmd cmdObj = null;
try {
Class> cmdClass = Class.forName(job.getCmd());
diff --git a/server/src/com/cloud/async/AsyncJobConstants.java b/server/src/com/cloud/async/AsyncJobConstants.java
index 0a0a2a77ab5..bbebad60471 100644
--- a/server/src/com/cloud/async/AsyncJobConstants.java
+++ b/server/src/com/cloud/async/AsyncJobConstants.java
@@ -23,4 +23,10 @@ public interface AsyncJobConstants {
public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
+
+ // Although we may have detailed masks for each individual wakeup event, i.e.
+ // periodical timer, matched topic from message bus, it seems that we don't
+ // need to distinguish them to such level. Therefore, only one wakeup signal
+ // is defined
+ public static final int SIGNAL_MASK_WAKEUP = 1;
}
diff --git a/server/src/com/cloud/async/AsyncJobDispatcher.java b/server/src/com/cloud/async/AsyncJobDispatcher.java
index 44b860328d5..ed03d339415 100644
--- a/server/src/com/cloud/async/AsyncJobDispatcher.java
+++ b/server/src/com/cloud/async/AsyncJobDispatcher.java
@@ -19,5 +19,5 @@ package com.cloud.async;
import com.cloud.utils.component.Adapter;
public interface AsyncJobDispatcher extends Adapter {
- void RunJob(AsyncJob job);
+ void runJob(AsyncJob job);
}
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 3b16ffa181a..e495838c2cf 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -315,7 +315,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Override @DB
public void joinJob(long jobId, long joinJobId) {
- _joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), null, null, null);
+ _joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), 0, 0, null, null, null);
}
@Override @DB
@@ -329,7 +329,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
syncSourceId = context.getJob().getSyncSource().getQueueId();
}
- _joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), syncSourceId, wakeupHandler, wakeupDispatcher);
+ _joinMapDao.joinJob(jobId, joinJobId, this.getMsid(),
+ wakeupIntervalInMilliSeconds, timeoutInMilliSeconds,
+ syncSourceId, wakeupHandler, wakeupDispatcher);
}
@Override @DB
@@ -476,6 +478,20 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return null;
}
+ private AsyncJobDispatcher getWakeupDispatcher(AsyncJob job) {
+ if(_jobDispatchers != null) {
+ List joinRecords = this._joinMapDao.listJoinRecords(job.getId());
+ if(joinRecords.size() > 0) {
+ AsyncJobJoinMapVO joinRecord = joinRecords.get(0);
+ for(AsyncJobDispatcher dispatcher : _jobDispatchers) {
+ if(dispatcher.getName().equals(joinRecord.getWakeupDispatcher()))
+ return dispatcher;
+ }
+ }
+ }
+ return null;
+ }
+
private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) {
return new Runnable() {
@Override
@@ -503,13 +519,22 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
if(s_logger.isDebugEnabled()) {
s_logger.debug("Executing " + job.getCmd() + " for job-" + job.getId());
}
-
- AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
- if(jobDispatcher != null) {
- jobDispatcher.RunJob(job);
+
+ if((getAndResetPendingSignals(job) & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) {
+ AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
+ if(jobDispatcher != null) {
+ jobDispatcher.runJob(job);
+ } else {
+ s_logger.error("Unable to find a wakeup dispatcher from the joined job. job-" + job.getId());
+ }
} else {
- s_logger.error("Unable to find job dispatcher, job will be cancelled");
- completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+ AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
+ if(jobDispatcher != null) {
+ jobDispatcher.runJob(job);
+ } else {
+ s_logger.error("Unable to find job dispatcher, job will be cancelled");
+ completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+ }
}
if (s_logger.isDebugEnabled()) {
@@ -550,6 +575,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
};
}
+ private int getAndResetPendingSignals(AsyncJob job) {
+ int signals = job.getPendingSignals();
+ if(signals != 0) {
+ AsyncJobVO jobRecord = _jobDao.findById(job.getId());
+ jobRecord.setPendingSignals(0);
+ _jobDao.update(job.getId(), jobRecord);
+ }
+ return signals;
+ }
+
private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
AsyncJobVO job = _jobDao.findById(item.getContentId());
if (job != null) {
@@ -653,6 +688,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
executeQueueItem(item, false);
}
}
+
+ _joinMapDao.wakeupScan();
} catch(Throwable e) {
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
} finally {
@@ -662,7 +699,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.error("Unexpected exception", e);
}
}
- }
+ }
};
}
diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
index cb81069d425..414b9ea22be 100644
--- a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
+++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
@@ -24,6 +24,7 @@ import com.cloud.utils.db.GenericDao;
public interface AsyncJobJoinMapDao extends GenericDao {
Long joinJob(long jobId, long joinJobId, long joinMsid,
+ long wakeupIntervalMs, long expirationMs,
Long syncSourceId, String wakeupHandler, String wakeupDispatcher);
void disjoinJob(long jobId, long joinedJobId);
@@ -31,4 +32,6 @@ public interface AsyncJobJoinMapDao extends GenericDao
List listJoinRecords(long jobId);
void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid);
+
+ void wakeupScan();
}
diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
index a7d3779bf03..2e0500efe33 100644
--- a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
+++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
@@ -16,7 +16,13 @@
// under the License.
package com.cloud.async.dao;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Date;
import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
import com.cloud.async.AsyncJobConstants;
import com.cloud.async.AsyncJobJoinMapVO;
@@ -24,14 +30,17 @@ import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.UpdateBuilder;
import com.cloud.utils.db.SearchCriteria.Op;
public class AsyncJobJoinMapDaoImpl extends GenericDaoBase implements AsyncJobJoinMapDao {
+ public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class);
private final SearchBuilder RecordSearch;
private final SearchBuilder RecordSearchByOwner;
private final SearchBuilder CompleteJoinSearch;
+ private final SearchBuilder WakeupSearch;
public AsyncJobJoinMapDaoImpl() {
RecordSearch = createSearchBuilder();
@@ -46,9 +55,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase ? AND join_status = ?)";
+ pstmt = txn.prepareStatement(sql);
+ pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+ pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+ pstmt.setInt(3, AsyncJobConstants.STATUS_IN_PROGRESS);
+ pstmt.executeUpdate();
+ pstmt.close();
+
+ sql = "UPDATE async_job_join_map SET next_wakeup=next_wakeup + SEC_TO_TIME(wakeup_interval) WHERE next_wakeup < ? AND expiration > ? AND join_status = ?";
+ pstmt = txn.prepareStatement(sql);
+ pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+ pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+ pstmt.setInt(3, AsyncJobConstants.STATUS_IN_PROGRESS);
+ pstmt.executeUpdate();
+ pstmt.close();
+
+ txn.commit();
+ } catch (SQLException e) {
+ s_logger.error("Unexpected exception", e);
+ }
+ }
}
diff --git a/server/src/com/cloud/vm/VmWorkConstants.java b/server/src/com/cloud/vm/VmWorkConstants.java
index 44176ee74d8..eb6ddc19467 100644
--- a/server/src/com/cloud/vm/VmWorkConstants.java
+++ b/server/src/com/cloud/vm/VmWorkConstants.java
@@ -20,6 +20,7 @@ public interface VmWorkConstants {
// VmWork queue name
public static final String VM_WORK_QUEUE = "VmWorkJobQueue";
public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher";
+ public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher";
// work job commands
public static final String VM_WORK_START = "vmWorkStart";
diff --git a/server/src/com/cloud/vm/VmWorkJobDispatcher.java b/server/src/com/cloud/vm/VmWorkJobDispatcher.java
index 631d9f15339..740b81c20ff 100644
--- a/server/src/com/cloud/vm/VmWorkJobDispatcher.java
+++ b/server/src/com/cloud/vm/VmWorkJobDispatcher.java
@@ -44,9 +44,9 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
@Inject private VMInstanceDao _instanceDao;
private Map _handlerMap = new HashMap();
-
+
@Override
- public void RunJob(AsyncJob job) {
+ public void runJob(AsyncJob job) {
try {
String cmd = job.getCmd();
assert(cmd != null);
diff --git a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
index 2e57c7948dd..7447df6fb30 100644
--- a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
+++ b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
@@ -47,15 +47,16 @@ public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDi
@Inject private VirtualMachineManager _vmMgr;
private Map _handlerMap = new HashMap();
-
+
@Override
- public void RunJob(AsyncJob job) {
+ public void runJob(AsyncJob job) {
try {
List joinRecords =_joinMapDao.listJoinRecords(job.getId());
if(joinRecords.size() != 1) {
s_logger.warn("Job-" + job.getId()
+ " received wakeup call with un-supported joining job number: " + joinRecords.size());
+ // if we fail wakeup-execution for any reason, avoid release sync-source if there is any
job.setSyncSource(null);
return;
}
@@ -89,6 +90,7 @@ public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDi
} catch(Throwable e) {
s_logger.warn("Unexpected exception in waking up job-" + job.getId());
+ // if we fail wakeup-execution for any reason, avoid release sync-source if there is any
job.setSyncSource(null);
}
}
diff --git a/server/test/com/cloud/async/AsyncJobTestConfiguration.java b/server/test/com/cloud/async/AsyncJobTestConfiguration.java
index 12ee2f8c015..03dff84a03d 100644
--- a/server/test/com/cloud/async/AsyncJobTestConfiguration.java
+++ b/server/test/com/cloud/async/AsyncJobTestConfiguration.java
@@ -38,7 +38,6 @@ import com.cloud.dao.EntityManager;
import com.cloud.user.AccountManager;
import com.cloud.user.dao.AccountDao;
import com.cloud.vm.VirtualMachineManager;
-import com.cloud.vm.VirtualMachineManagerImpl;
@Configuration
public class AsyncJobTestConfiguration {
diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java
index 9dcc26e6190..93b7972c0f5 100644
--- a/server/test/com/cloud/async/TestAsyncJobManager.java
+++ b/server/test/com/cloud/async/TestAsyncJobManager.java
@@ -18,6 +18,7 @@ package com.cloud.async;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Date;
import java.util.List;
import javax.inject.Inject;
@@ -40,6 +41,8 @@ import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobMonitor;
import com.cloud.async.dao.AsyncJobJoinMapDao;
import com.cloud.async.dao.AsyncJobJournalDao;
+import com.cloud.async.dao.SyncQueueDao;
+import com.cloud.async.dao.SyncQueueItemDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.user.AccountManager;
import com.cloud.user.AccountVO;
@@ -59,6 +62,8 @@ public class TestAsyncJobManager extends TestCase {
@Inject AsyncJobJournalDao journalDao;
@Inject AsyncJobJoinMapDao joinMapDao;
@Inject AccountManager accountMgr;
+ @Inject SyncQueueDao syncQueueDao;
+ @Inject SyncQueueItemDao syncQueueItemDao;
@Before
public void setUp() {
@@ -123,8 +128,8 @@ public class TestAsyncJobManager extends TestCase {
@Test
public void testJoinMapDao() {
- joinMapDao.joinJob(2, 1, 100, null, null, null);
- joinMapDao.joinJob(3, 1, 100, null, null, null);
+ joinMapDao.joinJob(2, 1, 100, 3000, 120000, null, "wakeupHandler", "wakeupDispatcher");
+ joinMapDao.joinJob(3, 1, 100, 5000, 120000, null, "wakeupHandler", "wakeupDispatcher");
AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1);
Assert.assertTrue(record != null);
@@ -151,6 +156,38 @@ public class TestAsyncJobManager extends TestCase {
joinMapDao.disjoinJob(3, 1);
}
+ @Test
+ public void testJoinWakeup() {
+ joinMapDao.joinJob(2, 1, 100, 3000, 120000, null, "wakeupHandler", "wakeupDispatcher");
+ joinMapDao.joinJob(3, 1, 100, 5000, 120000, null, "wakeupHandler", "wakeupDispatcher");
+
+ SyncQueueVO queue = new SyncQueueVO();
+ queue.setCreated(new Date());
+ queue.setLastProcessNumber(1L);
+ queue.setLastUpdated(new Date());
+ queue.setQueueSizeLimit(1);
+ queue.setSyncObjType("AsynJob");
+ queue.setSyncObjId(1L);
+ syncQueueDao.persist(queue);
+
+ SyncQueueItemVO queueItem = new SyncQueueItemVO();
+ queueItem.setQueueId(queue.getId());
+ queueItem.setContentId(2L);
+ queueItem.setContentType("AsyncJob");
+ queueItem.setLastProcessMsid(1L);
+ queueItem.setLastProcessNumber(1L);
+ syncQueueItemDao.persist(queueItem);
+ Assert.assertTrue(queueItem.getId() != 0);
+
+ joinMapDao.wakeupScan();
+
+ joinMapDao.disjoinJob(2, 1);
+ joinMapDao.disjoinJob(3, 1);
+
+ syncQueueItemDao.expunge(queueItem.getId());
+ syncQueueDao.expunge(queue.getId());
+ }
+
@Test
public void testPseudoJob() {
AsyncJob job = asyncMgr.getPseudoJob();
diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql
index 45c9cb21b1b..16f93b2039d 100644
--- a/setup/db/db/schema-410to420.sql
+++ b/setup/db/db/schema-410to420.sql
@@ -455,14 +455,19 @@ CREATE TABLE `cloud`.`async_job_join_map` (
`sync_source_id` bigint COMMENT 'upper-level job sync source info before join',
`wakeup_handler` varchar(64),
`wakeup_dispatcher` varchar(64),
+ `wakeup_interval` bigint NOT NULL DEFAULT 3000 COMMENT 'wakeup interval in seconds',
`created` datetime NOT NULL,
`last_updated` datetime,
+ `next_wakeup` datetime,
+ `expiration` datetime,
PRIMARY KEY (`id`),
CONSTRAINT `fk_async_job_join_map__job_id` FOREIGN KEY (`job_id`) REFERENCES `async_job`(`id`) ON DELETE CASCADE,
CONSTRAINT `fk_async_job_join_map__join_job_id` FOREIGN KEY (`join_job_id`) REFERENCES `async_job`(`id`),
CONSTRAINT `fk_async_job_join_map__join` UNIQUE (`job_id`, `join_job_id`),
INDEX `i_async_job_join_map__join_job_id`(`join_job_id`),
INDEX `i_async_job_join_map__created`(`created`),
- INDEX `i_async_job_join_map__last_updated`(`last_updated`)
+ INDEX `i_async_job_join_map__last_updated`(`last_updated`),
+ INDEX `i_async_job_join_map__next_wakeup`(`next_wakeup`),
+ INDEX `i_async_job_join_map__expiration`(`expiration`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;