Replace hard-coded job wakeup signal constants

This commit is contained in:
Kelven Yang 2013-06-26 13:45:09 -07:00
parent 1f0186aafe
commit fc0713fd55
3 changed files with 19 additions and 12 deletions

View File

@ -31,6 +31,15 @@ public interface AsyncJob extends JobInfo {
public static final String JOB_HEARTBEAT = "job.heartbeat";
public static final String JOB_STATE = "job.state";
}
public static interface Contants {
// Although we may have detailed masks for each individual wakeup event, i.e.
// periodical timer, matched topic from message bus, it seems that we don't
// need to distinguish them to such level. Therefore, only one wakeup signal
// is defined
public static final int SIGNAL_MASK_WAKEUP = 1;
}
@Override
String getType();

View File

@ -26,6 +26,7 @@ import java.util.TimeZone;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import org.apache.cloudstack.jobs.JobInfo;
@ -158,11 +159,12 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
//
// performance sensitive processing, do it in plain SQL
//
String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " +
"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
pstmt.setInt(1, AsyncJob.Contants.SIGNAL_MASK_WAKEUP);
pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
pstmt.executeUpdate();
pstmt.close();
@ -213,10 +215,11 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
//
// performance sensitive processing, do it in plain SQL
//
String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " +
"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setLong(1, joinedJobId);
pstmt.setInt(1, AsyncJob.Contants.SIGNAL_MASK_WAKEUP);
pstmt.setLong(2, joinedJobId);
pstmt.executeUpdate();
pstmt.close();

View File

@ -80,11 +80,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
// Although we may have detailed masks for each individual wakeup event, i.e.
// periodical timer, matched topic from message bus, it seems that we don't
// need to distinguish them to such level. Therefore, only one wakeup signal
// is defined
public static final int SIGNAL_MASK_WAKEUP = 1;
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
private static final int HEARTBEAT_INTERVAL = 2000;
@ -235,7 +230,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
for(Long id : wakeupList) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO jobToWakeup = _jobDao.findById(id);
if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(jobToWakeup, false);
}
@ -462,7 +457,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.debug("Executing " + job);
}
if ((getAndResetPendingSignals(job) & SIGNAL_MASK_WAKEUP) != 0) {
if ((getAndResetPendingSignals(job) & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) {
AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
if(jobDispatcher != null) {
jobDispatcher.runJob(job);
@ -651,7 +646,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
for(Long jobId : standaloneWakeupJobs) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO job = _jobDao.findById(jobId);
if (job != null && (job.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
if (job != null && (job.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(job, false);
}
} catch(Throwable e) {