Hook job monitoring

This commit is contained in:
Kelven Yang 2013-05-12 18:15:47 -07:00
parent 2210c10271
commit c7530dbd70
4 changed files with 21 additions and 5 deletions

View File

@ -24,6 +24,8 @@ public interface AsyncJobConstants {
public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
// Although we may have detailed masks for each individual wakeup event, i.e.
// periodical timer, matched topic from message bus, it seems that we don't
// need to distinguish them to such level. Therefore, only one wakeup signal

View File

@ -92,6 +92,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Inject private AsyncJobJoinMapDao _joinMapDao;
@Inject private List<AsyncJobDispatcher> _jobDispatchers;
@Inject private MessageBus _messageBus;
@Inject private AsyncJobMonitor _jobMonitor;
// property
private String defaultDispatcher;
@ -524,6 +525,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.warn("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
}
_jobMonitor.registerActiveTask(job.getId());
AsyncJobExecutionContext.setCurrentExecutionContext(
(AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job))
);
@ -560,6 +562,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
} finally {
// guard final clause as well
try {
AsyncJobVO jobToUpdate = _jobDao.findById(job.getId());
jobToUpdate.setExecutingMsid(null);
_jobDao.update(job.getId(), jobToUpdate);
if (job.getSyncSource() != null) {
_queueMgr.purgeItem(job.getSyncSource().getId());
checkQueue(job.getSyncSource().getQueueId());
@ -569,6 +575,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
// clean execution environment
//
AsyncJobExecutionContext.setCurrentExecutionContext(null);
_jobMonitor.unregisterActiveTask(job.getId());
try {
JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId());
@ -608,7 +615,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setSyncSource(item);
job.setExecutingMsid(getMsid());
job.setCompleteMsid(getMsid());
_jobDao.update(job.getId(), job);
try {
@ -616,10 +622,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
} catch(RejectedExecutionException e) {
s_logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn");
_queueMgr.returnItem(item.getId());
} finally {
job.setExecutingMsid(null);
_jobDao.update(job.getId(), job);
}
}
} else {
if(s_logger.isDebugEnabled()) {
@ -838,7 +844,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
int poolSize = (cloudMaxActive * 2) / 3;
s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
_executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory("Job-Executor"));
_executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX));
} catch (final Exception e) {
throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
}

View File

@ -105,10 +105,12 @@ public class AsyncJobMonitor extends ManagerBase {
return true;
}
public void registerActiveTask(long jobId, long threadId, boolean fromPoolThread) {
public void registerActiveTask(long jobId) {
synchronized(this) {
assert(_activeTasks.get(jobId) == null);
long threadId = Thread.currentThread().getId();
boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread);
_activeTasks.put(jobId, record);
if(fromPoolThread)

View File

@ -21,6 +21,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.cloud.api.ApiDispatcher;
import com.cloud.async.AsyncJobMonitor;
import com.cloud.async.SyncQueueManager;
import com.cloud.async.SyncQueueManagerImpl;
import com.cloud.async.dao.AsyncJobDao;
@ -123,4 +124,9 @@ public class VmWorkTestConfiguration {
public VMInstanceDao vmInstanceDao() {
return Mockito.mock(VMInstanceDao.class);
}
@Bean
public AsyncJobMonitor jobMonitor() {
return Mockito.mock(AsyncJobMonitor.class);
}
}