From a681a7efe54f84562090495ecfb09c35d110dc2c Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Thu, 9 May 2013 19:52:09 -0700 Subject: [PATCH] more job wakeup improvements --- client/tomcatconf/applicationContext.xml.in | 3 + .../com/cloud/async/AsyncJobJoinMapVO.java | 38 +++++++++++- .../com/cloud/api/ApiAsyncJobDispatcher.java | 2 +- .../com/cloud/async/AsyncJobConstants.java | 6 ++ .../com/cloud/async/AsyncJobDispatcher.java | 2 +- .../com/cloud/async/AsyncJobManagerImpl.java | 55 +++++++++++++++--- .../cloud/async/dao/AsyncJobJoinMapDao.java | 3 + .../async/dao/AsyncJobJoinMapDaoImpl.java | 58 ++++++++++++++++++- server/src/com/cloud/vm/VmWorkConstants.java | 1 + .../src/com/cloud/vm/VmWorkJobDispatcher.java | 4 +- .../cloud/vm/VmWorkJobWakeupDispatcher.java | 6 +- .../async/AsyncJobTestConfiguration.java | 1 - .../com/cloud/async/TestAsyncJobManager.java | 41 ++++++++++++- setup/db/db/schema-410to420.sql | 7 ++- 14 files changed, 206 insertions(+), 21 deletions(-) diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in index fa8ca6f717a..2c6f0f2e24e 100644 --- a/client/tomcatconf/applicationContext.xml.in +++ b/client/tomcatconf/applicationContext.xml.in @@ -808,6 +808,9 @@ + + + diff --git a/core/src/com/cloud/async/AsyncJobJoinMapVO.java b/core/src/com/cloud/async/AsyncJobJoinMapVO.java index 34f852b2136..68bf5d12412 100644 --- a/core/src/com/cloud/async/AsyncJobJoinMapVO.java +++ b/core/src/com/cloud/async/AsyncJobJoinMapVO.java @@ -52,7 +52,7 @@ public class AsyncJobJoinMapVO { @Column(name="join_msid") private long joinMsid; - + @Column(name="complete_msid") private Long completeMsid; @@ -65,6 +65,9 @@ public class AsyncJobJoinMapVO { @Column(name="wakeup_dispatcher") private String wakeupDispatcher; + @Column(name="wakeup_interval") + private long wakeupInterval; + @Column(name=GenericDao.CREATED_COLUMN) private Date created; @@ -72,8 +75,17 @@ public class AsyncJobJoinMapVO { @Temporal(TemporalType.TIMESTAMP) private Date lastUpdated; + @Column(name="next_wakeup") + @Temporal(TemporalType.TIMESTAMP) + private Date nextWakeupTime; + + @Column(name="expiration") + @Temporal(TemporalType.TIMESTAMP) + private Date expiration; + public AsyncJobJoinMapVO() { created = DateUtil.currentGMTTime(); + lastUpdated = DateUtil.currentGMTTime(); } public Long getId() { @@ -171,4 +183,28 @@ public class AsyncJobJoinMapVO { public void setWakeupDispatcher(String wakeupDispatcher) { this.wakeupDispatcher = wakeupDispatcher; } + + public long getWakeupInterval() { + return wakeupInterval; + } + + public void setWakeupInterval(long wakeupInterval) { + this.wakeupInterval = wakeupInterval; + } + + public Date getNextWakeupTime() { + return nextWakeupTime; + } + + public void setNextWakeupTime(Date nextWakeupTime) { + this.nextWakeupTime = nextWakeupTime; + } + + public Date getExpiration() { + return expiration; + } + + public void setExpiration(Date expiration) { + this.expiration = expiration; + } } diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java index c96457f3952..0984d25a662 100644 --- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java +++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java @@ -51,7 +51,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat } @Override - public void RunJob(AsyncJob job) { + public void runJob(AsyncJob job) { BaseAsyncCmd cmdObj = null; try { Class cmdClass = Class.forName(job.getCmd()); diff --git a/server/src/com/cloud/async/AsyncJobConstants.java b/server/src/com/cloud/async/AsyncJobConstants.java index 0a0a2a77ab5..bbebad60471 100644 --- a/server/src/com/cloud/async/AsyncJobConstants.java +++ b/server/src/com/cloud/async/AsyncJobConstants.java @@ -23,4 +23,10 @@ public interface AsyncJobConstants { public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher"; public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread"; + + // Although we may have detailed masks for each individual wakeup event, i.e. + // periodical timer, matched topic from message bus, it seems that we don't + // need to distinguish them to such level. Therefore, only one wakeup signal + // is defined + public static final int SIGNAL_MASK_WAKEUP = 1; } diff --git a/server/src/com/cloud/async/AsyncJobDispatcher.java b/server/src/com/cloud/async/AsyncJobDispatcher.java index 44b860328d5..ed03d339415 100644 --- a/server/src/com/cloud/async/AsyncJobDispatcher.java +++ b/server/src/com/cloud/async/AsyncJobDispatcher.java @@ -19,5 +19,5 @@ package com.cloud.async; import com.cloud.utils.component.Adapter; public interface AsyncJobDispatcher extends Adapter { - void RunJob(AsyncJob job); + void runJob(AsyncJob job); } diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 3b16ffa181a..e495838c2cf 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -315,7 +315,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Override @DB public void joinJob(long jobId, long joinJobId) { - _joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), null, null, null); + _joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), 0, 0, null, null, null); } @Override @DB @@ -329,7 +329,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, syncSourceId = context.getJob().getSyncSource().getQueueId(); } - _joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), syncSourceId, wakeupHandler, wakeupDispatcher); + _joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), + wakeupIntervalInMilliSeconds, timeoutInMilliSeconds, + syncSourceId, wakeupHandler, wakeupDispatcher); } @Override @DB @@ -476,6 +478,20 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, return null; } + private AsyncJobDispatcher getWakeupDispatcher(AsyncJob job) { + if(_jobDispatchers != null) { + List joinRecords = this._joinMapDao.listJoinRecords(job.getId()); + if(joinRecords.size() > 0) { + AsyncJobJoinMapVO joinRecord = joinRecords.get(0); + for(AsyncJobDispatcher dispatcher : _jobDispatchers) { + if(dispatcher.getName().equals(joinRecord.getWakeupDispatcher())) + return dispatcher; + } + } + } + return null; + } + private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) { return new Runnable() { @Override @@ -503,13 +519,22 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, if(s_logger.isDebugEnabled()) { s_logger.debug("Executing " + job.getCmd() + " for job-" + job.getId()); } - - AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher()); - if(jobDispatcher != null) { - jobDispatcher.RunJob(job); + + if((getAndResetPendingSignals(job) & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) { + AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job); + if(jobDispatcher != null) { + jobDispatcher.runJob(job); + } else { + s_logger.error("Unable to find a wakeup dispatcher from the joined job. job-" + job.getId()); + } } else { - s_logger.error("Unable to find job dispatcher, job will be cancelled"); - completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null); + AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher()); + if(jobDispatcher != null) { + jobDispatcher.runJob(job); + } else { + s_logger.error("Unable to find job dispatcher, job will be cancelled"); + completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null); + } } if (s_logger.isDebugEnabled()) { @@ -550,6 +575,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, }; } + private int getAndResetPendingSignals(AsyncJob job) { + int signals = job.getPendingSignals(); + if(signals != 0) { + AsyncJobVO jobRecord = _jobDao.findById(job.getId()); + jobRecord.setPendingSignals(0); + _jobDao.update(job.getId(), jobRecord); + } + return signals; + } + private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) { AsyncJobVO job = _jobDao.findById(item.getContentId()); if (job != null) { @@ -653,6 +688,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, executeQueueItem(item, false); } } + + _joinMapDao.wakeupScan(); } catch(Throwable e) { s_logger.error("Unexpected exception when trying to execute queue item, ", e); } finally { @@ -662,7 +699,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, s_logger.error("Unexpected exception", e); } } - } + } }; } diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java index cb81069d425..414b9ea22be 100644 --- a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java +++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java @@ -24,6 +24,7 @@ import com.cloud.utils.db.GenericDao; public interface AsyncJobJoinMapDao extends GenericDao { Long joinJob(long jobId, long joinJobId, long joinMsid, + long wakeupIntervalMs, long expirationMs, Long syncSourceId, String wakeupHandler, String wakeupDispatcher); void disjoinJob(long jobId, long joinedJobId); @@ -31,4 +32,6 @@ public interface AsyncJobJoinMapDao extends GenericDao List listJoinRecords(long jobId); void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid); + + void wakeupScan(); } diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java index a7d3779bf03..2e0500efe33 100644 --- a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java +++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java @@ -16,7 +16,13 @@ // under the License. package com.cloud.async.dao; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Date; import java.util.List; +import java.util.TimeZone; + +import org.apache.log4j.Logger; import com.cloud.async.AsyncJobConstants; import com.cloud.async.AsyncJobJoinMapVO; @@ -24,14 +30,17 @@ import com.cloud.utils.DateUtil; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.Transaction; import com.cloud.utils.db.UpdateBuilder; import com.cloud.utils.db.SearchCriteria.Op; public class AsyncJobJoinMapDaoImpl extends GenericDaoBase implements AsyncJobJoinMapDao { + public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class); private final SearchBuilder RecordSearch; private final SearchBuilder RecordSearchByOwner; private final SearchBuilder CompleteJoinSearch; + private final SearchBuilder WakeupSearch; public AsyncJobJoinMapDaoImpl() { RecordSearch = createSearchBuilder(); @@ -46,9 +55,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase ? AND join_status = ?)"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setInt(3, AsyncJobConstants.STATUS_IN_PROGRESS); + pstmt.executeUpdate(); + pstmt.close(); + + sql = "UPDATE async_job_join_map SET next_wakeup=next_wakeup + SEC_TO_TIME(wakeup_interval) WHERE next_wakeup < ? AND expiration > ? AND join_status = ?"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setInt(3, AsyncJobConstants.STATUS_IN_PROGRESS); + pstmt.executeUpdate(); + pstmt.close(); + + txn.commit(); + } catch (SQLException e) { + s_logger.error("Unexpected exception", e); + } + } } diff --git a/server/src/com/cloud/vm/VmWorkConstants.java b/server/src/com/cloud/vm/VmWorkConstants.java index 44176ee74d8..eb6ddc19467 100644 --- a/server/src/com/cloud/vm/VmWorkConstants.java +++ b/server/src/com/cloud/vm/VmWorkConstants.java @@ -20,6 +20,7 @@ public interface VmWorkConstants { // VmWork queue name public static final String VM_WORK_QUEUE = "VmWorkJobQueue"; public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher"; + public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher"; // work job commands public static final String VM_WORK_START = "vmWorkStart"; diff --git a/server/src/com/cloud/vm/VmWorkJobDispatcher.java b/server/src/com/cloud/vm/VmWorkJobDispatcher.java index 631d9f15339..740b81c20ff 100644 --- a/server/src/com/cloud/vm/VmWorkJobDispatcher.java +++ b/server/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -44,9 +44,9 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch @Inject private VMInstanceDao _instanceDao; private Map _handlerMap = new HashMap(); - + @Override - public void RunJob(AsyncJob job) { + public void runJob(AsyncJob job) { try { String cmd = job.getCmd(); assert(cmd != null); diff --git a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java index 2e57c7948dd..7447df6fb30 100644 --- a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java +++ b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java @@ -47,15 +47,16 @@ public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDi @Inject private VirtualMachineManager _vmMgr; private Map _handlerMap = new HashMap(); - + @Override - public void RunJob(AsyncJob job) { + public void runJob(AsyncJob job) { try { List joinRecords =_joinMapDao.listJoinRecords(job.getId()); if(joinRecords.size() != 1) { s_logger.warn("Job-" + job.getId() + " received wakeup call with un-supported joining job number: " + joinRecords.size()); + // if we fail wakeup-execution for any reason, avoid release sync-source if there is any job.setSyncSource(null); return; } @@ -89,6 +90,7 @@ public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDi } catch(Throwable e) { s_logger.warn("Unexpected exception in waking up job-" + job.getId()); + // if we fail wakeup-execution for any reason, avoid release sync-source if there is any job.setSyncSource(null); } } diff --git a/server/test/com/cloud/async/AsyncJobTestConfiguration.java b/server/test/com/cloud/async/AsyncJobTestConfiguration.java index 12ee2f8c015..03dff84a03d 100644 --- a/server/test/com/cloud/async/AsyncJobTestConfiguration.java +++ b/server/test/com/cloud/async/AsyncJobTestConfiguration.java @@ -38,7 +38,6 @@ import com.cloud.dao.EntityManager; import com.cloud.user.AccountManager; import com.cloud.user.dao.AccountDao; import com.cloud.vm.VirtualMachineManager; -import com.cloud.vm.VirtualMachineManagerImpl; @Configuration public class AsyncJobTestConfiguration { diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java index 9dcc26e6190..93b7972c0f5 100644 --- a/server/test/com/cloud/async/TestAsyncJobManager.java +++ b/server/test/com/cloud/async/TestAsyncJobManager.java @@ -18,6 +18,7 @@ package com.cloud.async; import java.sql.SQLException; import java.sql.Statement; +import java.util.Date; import java.util.List; import javax.inject.Inject; @@ -40,6 +41,8 @@ import com.cloud.async.AsyncJobManager; import com.cloud.async.AsyncJobMonitor; import com.cloud.async.dao.AsyncJobJoinMapDao; import com.cloud.async.dao.AsyncJobJournalDao; +import com.cloud.async.dao.SyncQueueDao; +import com.cloud.async.dao.SyncQueueItemDao; import com.cloud.cluster.ClusterManager; import com.cloud.user.AccountManager; import com.cloud.user.AccountVO; @@ -59,6 +62,8 @@ public class TestAsyncJobManager extends TestCase { @Inject AsyncJobJournalDao journalDao; @Inject AsyncJobJoinMapDao joinMapDao; @Inject AccountManager accountMgr; + @Inject SyncQueueDao syncQueueDao; + @Inject SyncQueueItemDao syncQueueItemDao; @Before public void setUp() { @@ -123,8 +128,8 @@ public class TestAsyncJobManager extends TestCase { @Test public void testJoinMapDao() { - joinMapDao.joinJob(2, 1, 100, null, null, null); - joinMapDao.joinJob(3, 1, 100, null, null, null); + joinMapDao.joinJob(2, 1, 100, 3000, 120000, null, "wakeupHandler", "wakeupDispatcher"); + joinMapDao.joinJob(3, 1, 100, 5000, 120000, null, "wakeupHandler", "wakeupDispatcher"); AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1); Assert.assertTrue(record != null); @@ -151,6 +156,38 @@ public class TestAsyncJobManager extends TestCase { joinMapDao.disjoinJob(3, 1); } + @Test + public void testJoinWakeup() { + joinMapDao.joinJob(2, 1, 100, 3000, 120000, null, "wakeupHandler", "wakeupDispatcher"); + joinMapDao.joinJob(3, 1, 100, 5000, 120000, null, "wakeupHandler", "wakeupDispatcher"); + + SyncQueueVO queue = new SyncQueueVO(); + queue.setCreated(new Date()); + queue.setLastProcessNumber(1L); + queue.setLastUpdated(new Date()); + queue.setQueueSizeLimit(1); + queue.setSyncObjType("AsynJob"); + queue.setSyncObjId(1L); + syncQueueDao.persist(queue); + + SyncQueueItemVO queueItem = new SyncQueueItemVO(); + queueItem.setQueueId(queue.getId()); + queueItem.setContentId(2L); + queueItem.setContentType("AsyncJob"); + queueItem.setLastProcessMsid(1L); + queueItem.setLastProcessNumber(1L); + syncQueueItemDao.persist(queueItem); + Assert.assertTrue(queueItem.getId() != 0); + + joinMapDao.wakeupScan(); + + joinMapDao.disjoinJob(2, 1); + joinMapDao.disjoinJob(3, 1); + + syncQueueItemDao.expunge(queueItem.getId()); + syncQueueDao.expunge(queue.getId()); + } + @Test public void testPseudoJob() { AsyncJob job = asyncMgr.getPseudoJob(); diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql index 45c9cb21b1b..16f93b2039d 100644 --- a/setup/db/db/schema-410to420.sql +++ b/setup/db/db/schema-410to420.sql @@ -455,14 +455,19 @@ CREATE TABLE `cloud`.`async_job_join_map` ( `sync_source_id` bigint COMMENT 'upper-level job sync source info before join', `wakeup_handler` varchar(64), `wakeup_dispatcher` varchar(64), + `wakeup_interval` bigint NOT NULL DEFAULT 3000 COMMENT 'wakeup interval in seconds', `created` datetime NOT NULL, `last_updated` datetime, + `next_wakeup` datetime, + `expiration` datetime, PRIMARY KEY (`id`), CONSTRAINT `fk_async_job_join_map__job_id` FOREIGN KEY (`job_id`) REFERENCES `async_job`(`id`) ON DELETE CASCADE, CONSTRAINT `fk_async_job_join_map__join_job_id` FOREIGN KEY (`join_job_id`) REFERENCES `async_job`(`id`), CONSTRAINT `fk_async_job_join_map__join` UNIQUE (`job_id`, `join_job_id`), INDEX `i_async_job_join_map__join_job_id`(`join_job_id`), INDEX `i_async_job_join_map__created`(`created`), - INDEX `i_async_job_join_map__last_updated`(`last_updated`) + INDEX `i_async_job_join_map__last_updated`(`last_updated`), + INDEX `i_async_job_join_map__next_wakeup`(`next_wakeup`), + INDEX `i_async_job_join_map__expiration`(`expiration`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;