Fixes the issue where the Job-Executor can hit an exception in cleanup and eventually the entire executor pool is gone.

This commit is contained in:
Alex Huang 2011-06-02 09:54:03 -07:00
parent 018b1c7ce9
commit d140ca7555
1 changed files with 174 additions and 147 deletions

View File

@ -136,7 +136,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
}
return job.getId();
} catch(Exception e) {
} catch(Exception e) {
txt.rollback();
String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
s_logger.warn(errMsg, e);
@ -146,18 +146,20 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
@Override @DB
public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject) {
if(s_logger.isDebugEnabled())
s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus +
", resultCode: " + resultCode + ", result: " + resultObject);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus +
", resultCode: " + resultCode + ", result: " + resultObject);
}
Transaction txt = Transaction.currentTxn();
try {
txt.start();
AsyncJobVO job = _jobDao.findById(jobId);
if(job == null) {
if(s_logger.isDebugEnabled())
s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus +
", resultCode: " + resultCode + ", result: " + resultObject);
if(s_logger.isDebugEnabled()) {
s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus +
", resultCode: " + resultCode + ", result: " + resultObject);
}
txt.rollback();
return;
@ -186,25 +188,28 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
@Override @DB
public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject) {
if(s_logger.isDebugEnabled())
s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
", result: " + resultObject);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
", result: " + resultObject);
}
Transaction txt = Transaction.currentTxn();
try {
txt.start();
AsyncJobVO job = _jobDao.findById(jobId);
if(job == null) {
if(s_logger.isDebugEnabled())
s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
if(s_logger.isDebugEnabled()) {
s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
}
txt.rollback();
return;
}
job.setProcessStatus(processStatus);
if(resultObject != null)
job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject));
if(resultObject != null) {
job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject));
}
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
txt.commit();
@ -216,9 +221,10 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
@Override @DB
public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
if(s_logger.isDebugEnabled())
s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
", instanceId: " + instanceId);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
", instanceId: " + instanceId);
}
Transaction txt = Transaction.currentTxn();
try {
@ -248,19 +254,21 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
return;
}
if(s_logger.isDebugEnabled())
if(s_logger.isDebugEnabled()) {
s_logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId);
}
SyncQueueVO queue = null;
// to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks
// we retry five times until we throw an exception
Random random = new Random();
Random random = new Random();
for(int i = 0; i < 5; i++) {
queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId());
if(queue != null)
break;
if(queue != null) {
break;
}
try {
Thread.sleep(1000 + random.nextInt(5000));
@ -302,8 +310,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
@Override @DB
public AsyncJobResult queryAsyncJobResult(long jobId) {
if(s_logger.isTraceEnabled())
s_logger.trace("Query async-job status, job-" + jobId);
if(s_logger.isTraceEnabled()) {
s_logger.trace("Query async-job status, job-" + jobId);
}
Transaction txt = Transaction.currentTxn();
AsyncJobResult jobResult = new AsyncJobResult(jobId);
@ -321,15 +330,17 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
if(job.getStatus() == AsyncJobResult.STATUS_SUCCEEDED ||
job.getStatus() == AsyncJobResult.STATUS_FAILED) {
if(s_logger.isDebugEnabled())
s_logger.debug("Async job-" + jobId + " completed");
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async job-" + jobId + " completed");
}
} else {
job.setLastPolled(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
}
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?");
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?");
}
jobResult.setJobStatus(AsyncJobResult.STATUS_FAILED);
jobResult.setResult("job-" + jobId + " does not exist");
@ -343,8 +354,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
txt.rollback();
}
if(s_logger.isTraceEnabled())
s_logger.trace("Job status: " + jobResult.toString());
if(s_logger.isTraceEnabled()) {
s_logger.trace("Job status: " + jobResult.toString());
}
return jobResult;
}
@ -366,114 +378,122 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
return new Runnable() {
@Override
public void run() {
long jobId = 0;
try {
JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job));
} catch(Exception e) {
s_logger.warn("Unable to register active job " + job.getId() + " to JMX minotoring due to exception " + ExceptionUtil.toString(e));
}
BaseAsyncCmd cmdObj = null;
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
jobId = job.getId();
NDC.push("job-" + jobId);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Executing " + job.getCmd() + " for job-" + jobId);
}
Class<?> cmdClass = Class.forName(job.getCmd());
cmdObj = (BaseAsyncCmd)cmdClass.newInstance();
cmdObj.setJob(job);
Type mapType = new TypeToken<Map<String, String>>() {}.getType();
Gson gson = ApiGsonHelper.getBuilder().create();
Map<String, String> params = gson.fromJson(job.getCmdInfo(), mapType);
// whenever we deserialize, the UserContext needs to be updated
String userIdStr = params.get("ctxUserId");
String acctIdStr = params.get("ctxAccountId");
Long userId = null;
Account accountObject = null;
if (userIdStr != null) {
userId = Long.parseLong(userIdStr);
}
if (acctIdStr != null) {
accountObject = _accountDao.findById(Long.parseLong(acctIdStr));
}
UserContext.registerContext(userId, accountObject, null, false);
try {
// dispatch could ultimately queue the job
_dispatcher.dispatch(cmdObj, params);
// serialize this to the async job table
completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject());
} finally {
UserContext.unregisterContext();
}
// commands might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue mechanism...
if (job.getSyncSource() != null) {
_queueMgr.purgeItem(job.getSyncSource().getId());
checkQueue(job.getSyncSource().getQueueId());
}
if (s_logger.isDebugEnabled())
s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId);
long jobId = 0;
} catch(Throwable e) {
if (e instanceof AsyncCommandQueued) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("job " + job.getCmd() + " for job-" + jobId + " was queued, processing the queue.");
}
checkQueue(((AsyncCommandQueued)e).getQueue().getId());
} else {
String errorMsg = null;
int errorCode = BaseCmd.INTERNAL_ERROR;
if (!(e instanceof ServerApiException)) {
s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
errorMsg = e.getMessage();
} else {
ServerApiException sApiEx = (ServerApiException)e;
errorMsg = sApiEx.getDescription();
errorCode = sApiEx.getErrorCode();
}
ExceptionResponse response = new ExceptionResponse();
response.setErrorCode(errorCode);
response.setErrorText(errorMsg);
response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName());
// FIXME: setting resultCode to BaseCmd.INTERNAL_ERROR is not right, usually executors have their exception handling
// and we need to preserve that as much as possible here
completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, BaseCmd.INTERNAL_ERROR, response);
// need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue
try {
if (job.getSyncSource() != null) {
_queueMgr.purgeItem(job.getSyncSource().getId());
checkQueue(job.getSyncSource().getQueueId());
}
} catch(Throwable ex) {
s_logger.fatal("Exception on exception, log it for record", ex);
}
}
} finally {
try {
JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId());
JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job));
} catch(Exception e) {
s_logger.warn("Unable to unregister active job " + job.getId() + " from JMX minotoring");
s_logger.warn("Unable to register active job " + job.getId() + " to JMX minotoring due to exception " + ExceptionUtil.toString(e));
}
BaseAsyncCmd cmdObj = null;
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
jobId = job.getId();
NDC.push("job-" + jobId);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Executing " + job.getCmd() + " for job-" + jobId);
}
Class<?> cmdClass = Class.forName(job.getCmd());
cmdObj = (BaseAsyncCmd)cmdClass.newInstance();
cmdObj.setJob(job);
Type mapType = new TypeToken<Map<String, String>>() {}.getType();
Gson gson = ApiGsonHelper.getBuilder().create();
Map<String, String> params = gson.fromJson(job.getCmdInfo(), mapType);
// whenever we deserialize, the UserContext needs to be updated
String userIdStr = params.get("ctxUserId");
String acctIdStr = params.get("ctxAccountId");
Long userId = null;
Account accountObject = null;
if (userIdStr != null) {
userId = Long.parseLong(userIdStr);
}
if (acctIdStr != null) {
accountObject = _accountDao.findById(Long.parseLong(acctIdStr));
}
UserContext.registerContext(userId, accountObject, null, false);
try {
// dispatch could ultimately queue the job
_dispatcher.dispatch(cmdObj, params);
// serialize this to the async job table
completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject());
} finally {
UserContext.unregisterContext();
}
// commands might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue mechanism...
if (job.getSyncSource() != null) {
_queueMgr.purgeItem(job.getSyncSource().getId());
checkQueue(job.getSyncSource().getQueueId());
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId);
}
} catch(Throwable e) {
if (e instanceof AsyncCommandQueued) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("job " + job.getCmd() + " for job-" + jobId + " was queued, processing the queue.");
}
checkQueue(((AsyncCommandQueued)e).getQueue().getId());
} else {
String errorMsg = null;
int errorCode = BaseCmd.INTERNAL_ERROR;
if (!(e instanceof ServerApiException)) {
s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
errorMsg = e.getMessage();
} else {
ServerApiException sApiEx = (ServerApiException)e;
errorMsg = sApiEx.getDescription();
errorCode = sApiEx.getErrorCode();
}
ExceptionResponse response = new ExceptionResponse();
response.setErrorCode(errorCode);
response.setErrorText(errorMsg);
response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName());
// FIXME: setting resultCode to BaseCmd.INTERNAL_ERROR is not right, usually executors have their exception handling
// and we need to preserve that as much as possible here
completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, BaseCmd.INTERNAL_ERROR, response);
// need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue
try {
if (job.getSyncSource() != null) {
_queueMgr.purgeItem(job.getSyncSource().getId());
checkQueue(job.getSyncSource().getQueueId());
}
} catch(Throwable ex) {
s_logger.fatal("Exception on exception, log it for record", ex);
}
}
} finally {
try {
JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId());
} catch(Exception e) {
s_logger.warn("Unable to unregister active job " + job.getId() + " from JMX minotoring");
}
StackMaid.current().exitCleanup();
txn.close();
NDC.pop();
}
} catch (Throwable th) {
try {
s_logger.error("Caught: " + th);
} catch (Throwable th2) {
}
StackMaid.current().exitCleanup();
txn.close();
NDC.pop();
}
}
};
@ -493,8 +513,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
_jobDao.update(job.getId(), job);
scheduleExecution(job);
} else {
if(s_logger.isDebugEnabled())
if(s_logger.isDebugEnabled()) {
s_logger.debug("Unable to find related job for queue item: " + item.toString());
}
_queueMgr.purgeItem(item.getId());
}
@ -503,10 +524,11 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
@Override
public void releaseSyncSource(AsyncJobExecutor executor) {
if(executor.getSyncSource() != null) {
if(s_logger.isDebugEnabled())
s_logger.debug("Release sync source for job-" + executor.getJob().getId() + " sync source: "
if(s_logger.isDebugEnabled()) {
s_logger.debug("Release sync source for job-" + executor.getJob().getId() + " sync source: "
+ executor.getSyncSource().getContentType() + "-"
+ executor.getSyncSource().getContentId());
+ executor.getSyncSource().getContentId());
}
_queueMgr.purgeItem(executor.getSyncSource().getId());
checkQueue(executor.getSyncSource().getQueueId());
@ -518,8 +540,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
try {
SyncQueueItemVO item = _queueMgr.dequeueFromOne(queueId, getMsid());
if(item != null) {
if(s_logger.isDebugEnabled())
s_logger.debug("Executing sync queue item: " + item.toString());
if(s_logger.isDebugEnabled()) {
s_logger.debug("Executing sync queue item: " + item.toString());
}
executeQueueItem(item, false);
} else {
@ -540,8 +563,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item: l) {
if(s_logger.isDebugEnabled())
s_logger.debug("Execute sync-queue item: " + item.toString());
if(s_logger.isDebugEnabled()) {
s_logger.debug("Execute sync-queue item: " + item.toString());
}
executeQueueItem(item, false);
}
}
@ -592,8 +616,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
if(blockItems != null && blockItems.size() > 0) {
for(SyncQueueItemVO item : blockItems) {
if(item.getContentType().equalsIgnoreCase("AsyncJob"))
completeAsyncJob(item.getContentId(), 2, 0, getResetResultMessage("Job is cancelled as it has been blocking others for too long"));
if(item.getContentType().equalsIgnoreCase("AsyncJob")) {
completeAsyncJob(item.getContentId(), 2, 0, getResetResultMessage("Job is cancelled as it has been blocking others for too long"));
}
// purge the item and resume queue processing
_queueMgr.purgeItem(item.getId());
@ -611,8 +636,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
}
private long getMsid() {
if(_clusterMgr != null)
return _clusterMgr.getManagementNodeId();
if(_clusterMgr != null) {
return _clusterMgr.getManagementNodeId();
}
return MacAddress.getMacAddress().toLong();
}
@ -620,14 +646,15 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item: l) {
if(s_logger.isInfoEnabled())
s_logger.info("Discard left-over queue item: " + item.toString());
if(s_logger.isInfoEnabled()) {
s_logger.info("Discard left-over queue item: " + item.toString());
}
String contentType = item.getContentType();
if(contentType != null && contentType.equals("AsyncJob")) {
Long jobId = item.getContentId();
if(jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultMessage("Execution was cancelled because of server shutdown"));
}
}