diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java index ea7af83bd31..cd106435df5 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java @@ -107,7 +107,7 @@ public class AsyncJobMonitor extends ManagerBase { return true; } - public void registerActiveTask(long jobId) { + public void registerActiveTask(long runNumber, long jobId) { synchronized(this) { s_logger.info("Add job-" + jobId + " into job monitoring"); @@ -116,7 +116,7 @@ public class AsyncJobMonitor extends ManagerBase { long threadId = Thread.currentThread().getId(); boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX); ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread); - _activeTasks.put(jobId, record); + _activeTasks.put(runNumber, record); if(fromPoolThread) _activePoolThreads++; else @@ -124,29 +124,23 @@ public class AsyncJobMonitor extends ManagerBase { } } - public void unregisterActiveTask(long jobId) { + public void unregisterActiveTask(long runNumber) { synchronized(this) { - s_logger.info("Remove job-" + jobId + " from job monitoring"); - - ActiveTaskRecord record = _activeTasks.get(jobId); + ActiveTaskRecord record = _activeTasks.get(runNumber); assert(record != null); if(record != null) { + s_logger.info("Remove job-" + record.getJobId() + " from job monitoring"); + if(record.isPoolThread()) _activePoolThreads--; else _activeInplaceThreads--; - _activeTasks.remove(jobId); + _activeTasks.remove(runNumber); } } } - public boolean isJobActive(long jobId) { - synchronized(this) { - return _activeTasks.get(jobId) != null; - } - } - public int getActivePoolThreads() { return _activePoolThreads; } diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 81350c46360..d65347c02bc 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -109,6 +109,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private long _jobExpireSeconds = 86400; // 1 day private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs) + + private volatile long _executionRunNumber = 1; private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat")); @@ -507,11 +509,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, return null; } + private long getJobRunNumber() { + synchronized(this) { + return this._executionRunNumber++; + } + } + private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) { return new Runnable() { @Override public void run() { Transaction txn = null; + long runNumber = getJobRunNumber(); + try { // // setup execution environment @@ -521,10 +531,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, try { JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job)); } catch(Exception e) { - s_logger.warn("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); + // Due to co-existence of normal-dispatched-job/wakeup-dispatched-job, MBean register() call + // is expected to fail under situations + if(s_logger.isTraceEnabled()) + s_logger.trace("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); } - _jobMonitor.registerActiveTask(job.getId()); + _jobMonitor.registerActiveTask(runNumber, job.getId()); AsyncJobExecutionContext.setCurrentExecutionContext( (AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job)) ); @@ -535,17 +548,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } if((getAndResetPendingSignals(job) & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) { - if(!_jobMonitor.isJobActive(job.getId())) { - AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job); - if(jobDispatcher != null) { - jobDispatcher.runJob(job); - } else { - s_logger.error("Unable to find a wakeup dispatcher from the joined job. job-" + job.getId()); - } + AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job); + if(jobDispatcher != null) { + jobDispatcher.runJob(job); + } else { + s_logger.error("Unable to find a wakeup dispatcher from the joined job. job-" + job.getId()); } } else { - assert(_jobMonitor.isJobActive(job.getId())); - AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher()); if(jobDispatcher != null) { jobDispatcher.runJob(job); @@ -574,21 +583,24 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, checkQueue(job.getSyncSource().getQueueId()); } - // - // clean execution environment - // - AsyncJobExecutionContext.setCurrentExecutionContext(null); - _jobMonitor.unregisterActiveTask(job.getId()); - try { JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId()); } catch(Exception e) { - s_logger.warn("Unable to unregister job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); + // Due to co-existence of normal-dispatched-job/wakeup-dispatched-job, MBean unregister() call + // is expected to fail under situations + if(s_logger.isTraceEnabled()) + s_logger.trace("Unable to unregister job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); } if(txn != null) txn.close(); + // + // clean execution environment + // + AsyncJobExecutionContext.setCurrentExecutionContext(null); + _jobMonitor.unregisterActiveTask(runNumber); + } catch(Throwable e) { s_logger.error("Double exception", e); } diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java index d0857778095..74c22f8c464 100644 --- a/server/test/com/cloud/async/TestAsyncJobManager.java +++ b/server/test/com/cloud/async/TestAsyncJobManager.java @@ -217,7 +217,7 @@ public class TestAsyncJobManager extends TestCase { }); thread.start(); - jobMonitor.registerActiveTask(1); + jobMonitor.registerActiveTask(1, 1); asyncMgr.waitAndCheck(new String[] {"VM"}, 5000L, 10000L, new Predicate() { public boolean checkCondition() {