more job wakeup improvements

This commit is contained in:
Kelven Yang 2013-05-09 19:52:09 -07:00
parent d3766e33c3
commit a681a7efe5
14 changed files with 206 additions and 21 deletions

View File

@ -808,6 +808,9 @@
<bean id="VmWorkJobDispatcher" class="com.cloud.vm.VmWorkJobDispatcher">
<property name="name" value="VmWorkJobDispatcher" />
</bean>
<bean id="VmWorkJobWakeupDispatcher" class="com.cloud.vm.VmWorkJobWakeupDispatcher">
<property name="name" value="VmWorkJobWakeupDispatcher" />
</bean>
<bean id="vmWorkJobDaoImpl" class="com.cloud.vm.VmWorkJobDaoImpl" />

View File

@ -52,7 +52,7 @@ public class AsyncJobJoinMapVO {
@Column(name="join_msid")
private long joinMsid;
@Column(name="complete_msid")
private Long completeMsid;
@ -65,6 +65,9 @@ public class AsyncJobJoinMapVO {
@Column(name="wakeup_dispatcher")
private String wakeupDispatcher;
@Column(name="wakeup_interval")
private long wakeupInterval;
@Column(name=GenericDao.CREATED_COLUMN)
private Date created;
@ -72,8 +75,17 @@ public class AsyncJobJoinMapVO {
@Temporal(TemporalType.TIMESTAMP)
private Date lastUpdated;
@Column(name="next_wakeup")
@Temporal(TemporalType.TIMESTAMP)
private Date nextWakeupTime;
@Column(name="expiration")
@Temporal(TemporalType.TIMESTAMP)
private Date expiration;
/**
 * Creates an empty join record whose {@code created} and {@code lastUpdated}
 * timestamps are both initialised from the current GMT time.
 */
public AsyncJobJoinMapVO() {
    // Two separate calls on purpose: each field gets its own Date instance.
    this.created = DateUtil.currentGMTTime();
    this.lastUpdated = DateUtil.currentGMTTime();
}
public Long getId() {
@ -171,4 +183,28 @@ public class AsyncJobJoinMapVO {
public void setWakeupDispatcher(String wakeupDispatcher) {
this.wakeupDispatcher = wakeupDispatcher;
}
/**
 * Returns the value mapped to the {@code wakeup_interval} column.
 * The DAO stores this value in seconds (it divides the caller-supplied
 * millisecond interval by 1000 before calling the setter).
 */
public long getWakeupInterval() {
return wakeupInterval;
}
/**
 * Sets the value mapped to the {@code wakeup_interval} column.
 *
 * @param wakeupInterval interval between wakeups; the DAO passes seconds here
 */
public void setWakeupInterval(long wakeupInterval) {
this.wakeupInterval = wakeupInterval;
}
/**
 * Returns the timestamp mapped to the {@code next_wakeup} column.
 * The wakeup scan selects records whose {@code next_wakeup} is earlier than the scan time.
 * The field is not initialised by the constructor, so this may be {@code null}.
 */
public Date getNextWakeupTime() {
return nextWakeupTime;
}
/**
 * Sets the timestamp mapped to the {@code next_wakeup} column.
 *
 * @param nextWakeupTime time of the next wakeup; stored by reference (no defensive copy)
 */
public void setNextWakeupTime(Date nextWakeupTime) {
this.nextWakeupTime = nextWakeupTime;
}
/**
 * Returns the timestamp mapped to the {@code expiration} column.
 * The wakeup scan only selects records whose {@code expiration} is later than the scan time.
 * The field is not initialised by the constructor, so this may be {@code null}.
 */
public Date getExpiration() {
return expiration;
}
/**
 * Sets the timestamp mapped to the {@code expiration} column.
 *
 * @param expiration expiration time; stored by reference (no defensive copy)
 */
public void setExpiration(Date expiration) {
this.expiration = expiration;
}
}

View File

@ -51,7 +51,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
}
@Override
public void RunJob(AsyncJob job) {
public void runJob(AsyncJob job) {
BaseAsyncCmd cmdObj = null;
try {
Class<?> cmdClass = Class.forName(job.getCmd());

View File

@ -23,4 +23,10 @@ public interface AsyncJobConstants {
public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
// Although we could define a detailed mask for each individual wakeup event
// (e.g. a periodic timer, or a matched topic from the message bus), there seems
// to be no need to distinguish them at that level. Therefore, only one wakeup
// signal is defined.
public static final int SIGNAL_MASK_WAKEUP = 1;
}

View File

@ -19,5 +19,5 @@ package com.cloud.async;
import com.cloud.utils.component.Adapter;
public interface AsyncJobDispatcher extends Adapter {
void RunJob(AsyncJob job);
void runJob(AsyncJob job);
}

View File

@ -315,7 +315,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Override @DB
public void joinJob(long jobId, long joinJobId) {
_joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), null, null, null);
_joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), 0, 0, null, null, null);
}
@Override @DB
@ -329,7 +329,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
syncSourceId = context.getJob().getSyncSource().getQueueId();
}
_joinMapDao.joinJob(jobId, joinJobId, this.getMsid(), syncSourceId, wakeupHandler, wakeupDispatcher);
_joinMapDao.joinJob(jobId, joinJobId, this.getMsid(),
wakeupIntervalInMilliSeconds, timeoutInMilliSeconds,
syncSourceId, wakeupHandler, wakeupDispatcher);
}
@Override @DB
@ -476,6 +478,20 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return null;
}
/**
 * Finds the dispatcher that should run the given job when it is woken up.
 * <p>
 * Only the first join record of the job is consulted; its wakeup-dispatcher
 * name is matched against the names of the registered dispatchers.
 *
 * @param job the job being woken up
 * @return the matching dispatcher, or {@code null} when no dispatchers are
 *         registered, the job has no join records, or no name matches
 */
private AsyncJobDispatcher getWakeupDispatcher(AsyncJob job) {
    if (_jobDispatchers == null) {
        return null;
    }

    List<AsyncJobJoinMapVO> records = this._joinMapDao.listJoinRecords(job.getId());
    if (records.isEmpty()) {
        return null;
    }

    String wantedName = records.get(0).getWakeupDispatcher();
    for (AsyncJobDispatcher candidate : _jobDispatchers) {
        // candidate.getName() is the receiver so a null wantedName simply never matches
        if (candidate.getName().equals(wantedName)) {
            return candidate;
        }
    }
    return null;
}
private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) {
return new Runnable() {
@Override
@ -503,13 +519,22 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
if(s_logger.isDebugEnabled()) {
s_logger.debug("Executing " + job.getCmd() + " for job-" + job.getId());
}
AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
if(jobDispatcher != null) {
jobDispatcher.RunJob(job);
if((getAndResetPendingSignals(job) & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) {
AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
if(jobDispatcher != null) {
jobDispatcher.runJob(job);
} else {
s_logger.error("Unable to find a wakeup dispatcher from the joined job. job-" + job.getId());
}
} else {
s_logger.error("Unable to find job dispatcher, job will be cancelled");
completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
if(jobDispatcher != null) {
jobDispatcher.runJob(job);
} else {
s_logger.error("Unable to find job dispatcher, job will be cancelled");
completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
}
}
if (s_logger.isDebugEnabled()) {
@ -550,6 +575,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
};
}
/**
 * Returns the pending signal mask carried by the given job and, when it is
 * non-zero, clears the mask on the persisted job record.
 * <p>
 * The returned value is taken from the in-memory {@code job} object; the
 * reset is applied to a freshly loaded record. If the record can no longer
 * be found, the reset is skipped and the signals are still returned so the
 * caller can act on them.
 *
 * @param job the job about to be executed
 * @return the signal mask that was pending on {@code job} (0 if none)
 */
private int getAndResetPendingSignals(AsyncJob job) {
    int signals = job.getPendingSignals();
    if (signals != 0) {
        AsyncJobVO jobRecord = _jobDao.findById(job.getId());
        if (jobRecord != null) {
            jobRecord.setPendingSignals(0);
            _jobDao.update(job.getId(), jobRecord);
        } else {
            // findById may return null (see the null check in executeQueueItem);
            // avoid an NPE and leave a trace instead.
            s_logger.warn("Unable to reset pending signals, job record no longer exists. job-" + job.getId());
        }
    }
    return signals;
}
private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
AsyncJobVO job = _jobDao.findById(item.getContentId());
if (job != null) {
@ -653,6 +688,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
executeQueueItem(item, false);
}
}
_joinMapDao.wakeupScan();
} catch(Throwable e) {
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
} finally {
@ -662,7 +699,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.error("Unexpected exception", e);
}
}
}
}
};
}

View File

@ -24,6 +24,7 @@ import com.cloud.utils.db.GenericDao;
public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long> {
Long joinJob(long jobId, long joinJobId, long joinMsid,
long wakeupIntervalMs, long expirationMs,
Long syncSourceId, String wakeupHandler, String wakeupDispatcher);
void disjoinJob(long jobId, long joinedJobId);
@ -31,4 +32,6 @@ public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long>
List<AsyncJobJoinMapVO> listJoinRecords(long jobId);
void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid);
void wakeupScan();
}

View File

@ -16,7 +16,13 @@
// under the License.
package com.cloud.async.dao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import org.apache.log4j.Logger;
import com.cloud.async.AsyncJobConstants;
import com.cloud.async.AsyncJobJoinMapVO;
@ -24,14 +30,17 @@ import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.UpdateBuilder;
import com.cloud.utils.db.SearchCriteria.Op;
public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Long> implements AsyncJobJoinMapDao {
public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class);
private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;
private final SearchBuilder<AsyncJobJoinMapVO> RecordSearchByOwner;
private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;
private final SearchBuilder<AsyncJobJoinMapVO> WakeupSearch;
public AsyncJobJoinMapDaoImpl() {
RecordSearch = createSearchBuilder();
@ -46,9 +55,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
CompleteJoinSearch = createSearchBuilder();
CompleteJoinSearch.and("joinJobId", CompleteJoinSearch.entity().getJoinJobId(), Op.EQ);
CompleteJoinSearch.done();
WakeupSearch = createSearchBuilder();
WakeupSearch.and("nextWakeupTime", WakeupSearch.entity().getNextWakeupTime(), Op.LT);
WakeupSearch.and("expiration", WakeupSearch.entity().getExpiration(), Op.GT);
WakeupSearch.and("joinStatus", WakeupSearch.entity().getJoinStatus(), Op.EQ);
WakeupSearch.done();
}
public Long joinJob(long jobId, long joinJobId, long joinMsid,
long wakeupIntervalMs, long expirationMs,
Long syncSourceId, String wakeupHandler, String wakeupDispatcher) {
AsyncJobJoinMapVO record = new AsyncJobJoinMapVO();
@ -57,8 +73,13 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
record.setJoinMsid(joinMsid);
record.setJoinStatus(AsyncJobConstants.STATUS_IN_PROGRESS);
record.setSyncSourceId(syncSourceId);
record.setWakeupInterval(wakeupIntervalMs / 1000); // convert millisecond to second
record.setWakeupHandler(wakeupHandler);
record.setWakeupHandler(wakeupHandler);
record.setWakeupDispatcher(wakeupDispatcher);
if(wakeupHandler != null) {
record.setNextWakeupTime(new Date(DateUtil.currentGMTTime().getTime() + wakeupIntervalMs));
record.setExpiration(new Date(DateUtil.currentGMTTime().getTime() + expirationMs));
}
this.persist(record);
return record.getId();
@ -106,4 +127,39 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
sc.setParameters("joinJobId", joinJobId);
update(ub, sc, null);
}
/**
 * Scans for in-progress join records that are due for a wakeup and have not
 * yet expired, and processes them in a single transaction:
 * <ol>
 *   <li>clears {@code queue_proc_msid}/{@code queue_proc_number} on the sync
 *       queue items whose content id is one of the due jobs;</li>
 *   <li>advances {@code next_wakeup} of the due join records by their
 *       {@code wakeup_interval} (stored in seconds).</li>
 * </ol>
 * Both statements use the same cut-off time so they operate on the same set
 * of records. On a SQL failure the transaction is rolled back and the error
 * is logged; no exception is propagated to the caller.
 */
public void wakeupScan() {
    Date cutDate = DateUtil.currentGMTTime();
    // Format once; both statements must compare against the identical cut-off.
    String cutDateString = DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate);

    Transaction txn = Transaction.currentTxn();
    PreparedStatement pstmt = null;
    try {
        txn.start();

        //
        // performance sensitive processing, do it in plain SQL
        //
        String sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
            "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND join_status = ?)";
        pstmt = txn.prepareStatement(sql);
        pstmt.setString(1, cutDateString);
        pstmt.setString(2, cutDateString);
        pstmt.setInt(3, AsyncJobConstants.STATUS_IN_PROGRESS);
        pstmt.executeUpdate();
        pstmt.close();
        pstmt = null;

        // DATE_ADD performs real date arithmetic. "datetime + SEC_TO_TIME(n)" is evaluated
        // numerically by MySQL and yields invalid values once seconds/minutes carry over.
        sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ? AND join_status = ?";
        pstmt = txn.prepareStatement(sql);
        pstmt.setString(1, cutDateString);
        pstmt.setString(2, cutDateString);
        pstmt.setInt(3, AsyncJobConstants.STATUS_IN_PROGRESS);
        pstmt.executeUpdate();
        pstmt.close();
        pstmt = null;

        txn.commit();
    } catch (SQLException e) {
        // Do not leave a half-applied scan or a dangling open transaction behind.
        txn.rollback();
        s_logger.error("Unexpected exception", e);
    } finally {
        // Only non-null when a statement failed before its regular close().
        if (pstmt != null) {
            try {
                pstmt.close();
            } catch (SQLException e) {
                s_logger.warn("Unable to close statement after failed wakeup scan", e);
            }
        }
    }
}
}

View File

@ -20,6 +20,7 @@ public interface VmWorkConstants {
// VmWork queue name
public static final String VM_WORK_QUEUE = "VmWorkJobQueue";
public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher";
public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher";
// work job commands
public static final String VM_WORK_START = "vmWorkStart";

View File

@ -44,9 +44,9 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
@Inject private VMInstanceDao _instanceDao;
private Map<String, Method> _handlerMap = new HashMap<String, Method>();
@Override
public void RunJob(AsyncJob job) {
public void runJob(AsyncJob job) {
try {
String cmd = job.getCmd();
assert(cmd != null);

View File

@ -47,15 +47,16 @@ public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDi
@Inject private VirtualMachineManager _vmMgr;
private Map<String, Method> _handlerMap = new HashMap<String, Method>();
@Override
public void RunJob(AsyncJob job) {
public void runJob(AsyncJob job) {
try {
List<AsyncJobJoinMapVO> joinRecords =_joinMapDao.listJoinRecords(job.getId());
if(joinRecords.size() != 1) {
s_logger.warn("Job-" + job.getId()
+ " received wakeup call with un-supported joining job number: " + joinRecords.size());
// if we fail wakeup-execution for any reason, avoid release sync-source if there is any
job.setSyncSource(null);
return;
}
@ -89,6 +90,7 @@ public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDi
} catch(Throwable e) {
s_logger.warn("Unexpected exception in waking up job-" + job.getId());
// if we fail wakeup-execution for any reason, avoid release sync-source if there is any
job.setSyncSource(null);
}
}

View File

@ -38,7 +38,6 @@ import com.cloud.dao.EntityManager;
import com.cloud.user.AccountManager;
import com.cloud.user.dao.AccountDao;
import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.VirtualMachineManagerImpl;
@Configuration
public class AsyncJobTestConfiguration {

View File

@ -18,6 +18,7 @@ package com.cloud.async;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.List;
import javax.inject.Inject;
@ -40,6 +41,8 @@ import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobMonitor;
import com.cloud.async.dao.AsyncJobJoinMapDao;
import com.cloud.async.dao.AsyncJobJournalDao;
import com.cloud.async.dao.SyncQueueDao;
import com.cloud.async.dao.SyncQueueItemDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.user.AccountManager;
import com.cloud.user.AccountVO;
@ -59,6 +62,8 @@ public class TestAsyncJobManager extends TestCase {
@Inject AsyncJobJournalDao journalDao;
@Inject AsyncJobJoinMapDao joinMapDao;
@Inject AccountManager accountMgr;
@Inject SyncQueueDao syncQueueDao;
@Inject SyncQueueItemDao syncQueueItemDao;
@Before
public void setUp() {
@ -123,8 +128,8 @@ public class TestAsyncJobManager extends TestCase {
@Test
public void testJoinMapDao() {
joinMapDao.joinJob(2, 1, 100, null, null, null);
joinMapDao.joinJob(3, 1, 100, null, null, null);
joinMapDao.joinJob(2, 1, 100, 3000, 120000, null, "wakeupHandler", "wakeupDispatcher");
joinMapDao.joinJob(3, 1, 100, 5000, 120000, null, "wakeupHandler", "wakeupDispatcher");
AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1);
Assert.assertTrue(record != null);
@ -151,6 +156,38 @@ public class TestAsyncJobManager extends TestCase {
joinMapDao.disjoinJob(3, 1);
}
/**
 * Smoke test for the wakeup scan: creates two join records with wakeup
 * settings, a sync queue with one item whose content id is job 2, runs
 * {@code wakeupScan()}, and then removes everything it created.
 */
@Test
public void testJoinWakeup() {
    // two jobs (2 and 3) joining job 1, with 3s / 5s wakeup intervals and a 120s expiration
    joinMapDao.joinJob(2, 1, 100, 3000, 120000, null, "wakeupHandler", "wakeupDispatcher");
    joinMapDao.joinJob(3, 1, 100, 5000, 120000, null, "wakeupHandler", "wakeupDispatcher");

    // sync queue that will own the item for job 2
    SyncQueueVO syncQueue = new SyncQueueVO();
    syncQueue.setCreated(new Date());
    syncQueue.setLastProcessNumber(1L);
    syncQueue.setLastUpdated(new Date());
    syncQueue.setQueueSizeLimit(1);
    syncQueue.setSyncObjType("AsynJob");
    syncQueue.setSyncObjId(1L);
    syncQueueDao.persist(syncQueue);

    // queue item marked as already being processed (msid / process number set)
    SyncQueueItemVO item = new SyncQueueItemVO();
    item.setQueueId(syncQueue.getId());
    item.setContentId(2L);
    item.setContentType("AsyncJob");
    item.setLastProcessMsid(1L);
    item.setLastProcessNumber(1L);
    syncQueueItemDao.persist(item);
    Assert.assertTrue(item.getId() != 0);

    joinMapDao.wakeupScan();

    // clean up: join records first, then the item, then the queue that owns it
    joinMapDao.disjoinJob(2, 1);
    joinMapDao.disjoinJob(3, 1);
    syncQueueItemDao.expunge(item.getId());
    syncQueueDao.expunge(syncQueue.getId());
}
@Test
public void testPseudoJob() {
AsyncJob job = asyncMgr.getPseudoJob();

View File

@ -455,14 +455,19 @@ CREATE TABLE `cloud`.`async_job_join_map` (
`sync_source_id` bigint COMMENT 'upper-level job sync source info before join',
`wakeup_handler` varchar(64),
`wakeup_dispatcher` varchar(64),
`wakeup_interval` bigint NOT NULL DEFAULT 3000 COMMENT 'wakeup interval in seconds',
`created` datetime NOT NULL,
`last_updated` datetime,
`next_wakeup` datetime,
`expiration` datetime,
PRIMARY KEY (`id`),
CONSTRAINT `fk_async_job_join_map__job_id` FOREIGN KEY (`job_id`) REFERENCES `async_job`(`id`) ON DELETE CASCADE,
CONSTRAINT `fk_async_job_join_map__join_job_id` FOREIGN KEY (`join_job_id`) REFERENCES `async_job`(`id`),
CONSTRAINT `fk_async_job_join_map__join` UNIQUE (`job_id`, `join_job_id`),
INDEX `i_async_job_join_map__join_job_id`(`join_job_id`),
INDEX `i_async_job_join_map__created`(`created`),
INDEX `i_async_job_join_map__last_updated`(`last_updated`)
INDEX `i_async_job_join_map__last_updated`(`last_updated`),
INDEX `i_async_job_join_map__next_wakeup`(`next_wakeup`),
INDEX `i_async_job_join_map__expiration`(`expiration`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;