diff --git a/server/src/com/cloud/async/AsyncJobConstants.java b/server/src/com/cloud/async/AsyncJobConstants.java index bbebad60471..6081d0ccf88 100644 --- a/server/src/com/cloud/async/AsyncJobConstants.java +++ b/server/src/com/cloud/async/AsyncJobConstants.java @@ -24,6 +24,8 @@ public interface AsyncJobConstants { public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher"; public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread"; + public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor"; + // Although we may have detailed masks for each individual wakeup event, i.e. // periodical timer, matched topic from message bus, it seems that we don't // need to distinguish them to such level. Therefore, only one wakeup signal diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index f23f3d7b546..cde184ae8b1 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -92,6 +92,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private AsyncJobJoinMapDao _joinMapDao; @Inject private List _jobDispatchers; @Inject private MessageBus _messageBus; + @Inject private AsyncJobMonitor _jobMonitor; // property private String defaultDispatcher; @@ -524,6 +525,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, s_logger.warn("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); } + _jobMonitor.registerActiveTask(job.getId()); AsyncJobExecutionContext.setCurrentExecutionContext( (AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job)) ); @@ -560,6 +562,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } finally { // guard final clause as well try { + AsyncJobVO jobToUpdate = _jobDao.findById(job.getId()); + jobToUpdate.setExecutingMsid(null); + _jobDao.update(job.getId(), jobToUpdate); + if (job.getSyncSource() != null) { _queueMgr.purgeItem(job.getSyncSource().getId()); checkQueue(job.getSyncSource().getQueueId()); @@ -569,6 +575,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, // clean execution environment // AsyncJobExecutionContext.setCurrentExecutionContext(null); + _jobMonitor.unregisterActiveTask(job.getId()); try { JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId()); @@ -608,7 +615,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setSyncSource(item); job.setExecutingMsid(getMsid()); - job.setCompleteMsid(getMsid()); _jobDao.update(job.getId(), job); try { @@ -616,10 +622,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } catch(RejectedExecutionException e) { s_logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn"); _queueMgr.returnItem(item.getId()); - } finally { + job.setExecutingMsid(null); _jobDao.update(job.getId(), job); - } + } } else { if(s_logger.isDebugEnabled()) { @@ -838,7 +844,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, int poolSize = (cloudMaxActive * 2) / 3; s_logger.info("Start AsyncJobManager thread pool in size " + poolSize); - _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory("Job-Executor")); + _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX)); } catch (final Exception e) { throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl"); } diff --git a/server/src/com/cloud/async/AsyncJobMonitor.java b/server/src/com/cloud/async/AsyncJobMonitor.java index bd5b2cd1df3..98c340bb680 100644 --- a/server/src/com/cloud/async/AsyncJobMonitor.java +++ b/server/src/com/cloud/async/AsyncJobMonitor.java @@ -105,10 +105,12 @@ public class AsyncJobMonitor extends ManagerBase { return true; } - public void registerActiveTask(long jobId, long threadId, boolean fromPoolThread) { + public void registerActiveTask(long jobId) { synchronized(this) { assert(_activeTasks.get(jobId) == null); + long threadId = Thread.currentThread().getId(); + boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX); ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread); _activeTasks.put(jobId, record); if(fromPoolThread) diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java index 67a3c0012b0..93aa41a0af8 100644 --- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java +++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java @@ -21,6 +21,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.cloud.api.ApiDispatcher; +import com.cloud.async.AsyncJobMonitor; import com.cloud.async.SyncQueueManager; import com.cloud.async.SyncQueueManagerImpl; import com.cloud.async.dao.AsyncJobDao; @@ -123,4 +124,9 @@ public class VmWorkTestConfiguration { public VMInstanceDao vmInstanceDao() { return Mockito.mock(VMInstanceDao.class); } + + @Bean + public AsyncJobMonitor jobMonitor() { + return Mockito.mock(AsyncJobMonitor.class); + } }