From d140ca755543747272e9f78a83687c0a84dd0a51 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Thu, 2 Jun 2011 09:54:03 -0700 Subject: [PATCH] Fixes the issue where the Job-Executor can hit an exception in cleanup and eventually the entire executor pool is gone. --- .../com/cloud/async/AsyncJobManagerImpl.java | 321 ++++++++++-------- 1 file changed, 174 insertions(+), 147 deletions(-) diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index e040e4dba79..ba0018ea704 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -136,7 +136,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString()); } return job.getId(); - } catch(Exception e) { + } catch(Exception e) { txt.rollback(); String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception."; s_logger.warn(errMsg, e); @@ -146,18 +146,20 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe @Override @DB public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus + - ", resultCode: " + resultCode + ", result: " + resultObject); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus + + ", resultCode: " + resultCode + ", result: " + resultObject); + } Transaction txt = Transaction.currentTxn(); try { txt.start(); AsyncJobVO job = _jobDao.findById(jobId); if(job == null) { - if(s_logger.isDebugEnabled()) - s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus + - ", resultCode: " + resultCode + ", result: " + resultObject); + if(s_logger.isDebugEnabled()) { + s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus + + ", resultCode: " + resultCode + ", result: " + resultObject); + } txt.rollback(); return; @@ -186,25 +188,28 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe @Override @DB public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus + - ", result: " + resultObject); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus + + ", result: " + resultObject); + } Transaction txt = Transaction.currentTxn(); try { txt.start(); AsyncJobVO job = _jobDao.findById(jobId); if(job == null) { - if(s_logger.isDebugEnabled()) - s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus); + if(s_logger.isDebugEnabled()) { + s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus); + } txt.rollback(); return; } job.setProcessStatus(processStatus); - if(resultObject != null) - job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject)); + if(resultObject != null) { + job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject)); + } job.setLastUpdated(DateUtil.currentGMTTime()); _jobDao.update(jobId, job); txt.commit(); @@ -216,9 +221,10 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe @Override @DB public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType + - ", instanceId: " + instanceId); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType + + ", instanceId: " + instanceId); + } Transaction txt = Transaction.currentTxn(); try { @@ -248,19 +254,21 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe return; } - if(s_logger.isDebugEnabled()) + if(s_logger.isDebugEnabled()) { s_logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId); + } SyncQueueVO queue = null; // to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks // we retry five times until we throw an exception - Random random = new Random(); + Random random = new Random(); for(int i = 0; i < 5; i++) { queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId()); - if(queue != null) - break; + if(queue != null) { + break; + } try { Thread.sleep(1000 + random.nextInt(5000)); @@ -302,8 +310,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe @Override @DB public AsyncJobResult queryAsyncJobResult(long jobId) { - if(s_logger.isTraceEnabled()) - s_logger.trace("Query async-job status, job-" + jobId); + if(s_logger.isTraceEnabled()) { + s_logger.trace("Query async-job status, job-" + jobId); + } Transaction txt = Transaction.currentTxn(); AsyncJobResult jobResult = new AsyncJobResult(jobId); @@ -321,15 +330,17 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe if(job.getStatus() == AsyncJobResult.STATUS_SUCCEEDED || job.getStatus() == AsyncJobResult.STATUS_FAILED) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Async job-" + jobId + " completed"); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Async job-" + jobId + " completed"); + } } else { job.setLastPolled(DateUtil.currentGMTTime()); _jobDao.update(jobId, job); } } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?"); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?"); + } jobResult.setJobStatus(AsyncJobResult.STATUS_FAILED); jobResult.setResult("job-" + jobId + " does not exist"); @@ -343,8 +354,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe txt.rollback(); } - if(s_logger.isTraceEnabled()) - s_logger.trace("Job status: " + jobResult.toString()); + if(s_logger.isTraceEnabled()) { + s_logger.trace("Job status: " + jobResult.toString()); + } return jobResult; } @@ -366,114 +378,122 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe return new Runnable() { @Override public void run() { - long jobId = 0; - try { - JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job)); - } catch(Exception e) { - s_logger.warn("Unable to register active job " + job.getId() + " to JMX minotoring due to exception " + ExceptionUtil.toString(e)); - } - - BaseAsyncCmd cmdObj = null; - Transaction txn = Transaction.open(Transaction.CLOUD_DB); - try { - jobId = job.getId(); - NDC.push("job-" + jobId); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Executing " + job.getCmd() + " for job-" + jobId); - } - - Class cmdClass = Class.forName(job.getCmd()); - cmdObj = (BaseAsyncCmd)cmdClass.newInstance(); - cmdObj.setJob(job); - - Type mapType = new TypeToken>() {}.getType(); - Gson gson = ApiGsonHelper.getBuilder().create(); - Map params = gson.fromJson(job.getCmdInfo(), mapType); - - // whenever we deserialize, the UserContext needs to be updated - String userIdStr = params.get("ctxUserId"); - String acctIdStr = params.get("ctxAccountId"); - Long userId = null; - Account accountObject = null; - - if (userIdStr != null) { - userId = Long.parseLong(userIdStr); - } - - if (acctIdStr != null) { - accountObject = _accountDao.findById(Long.parseLong(acctIdStr)); - } - - UserContext.registerContext(userId, accountObject, null, false); - try { - // dispatch could ultimately queue the job - _dispatcher.dispatch(cmdObj, params); - - // serialize this to the async job table - completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject()); - } finally { - UserContext.unregisterContext(); - } - - // commands might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue mechanism... - if (job.getSyncSource() != null) { - _queueMgr.purgeItem(job.getSyncSource().getId()); - checkQueue(job.getSyncSource().getQueueId()); - } - - if (s_logger.isDebugEnabled()) - s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId); + long jobId = 0; - } catch(Throwable e) { - if (e instanceof AsyncCommandQueued) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("job " + job.getCmd() + " for job-" + jobId + " was queued, processing the queue."); - } - checkQueue(((AsyncCommandQueued)e).getQueue().getId()); - } else { - String errorMsg = null; - int errorCode = BaseCmd.INTERNAL_ERROR; - if (!(e instanceof ServerApiException)) { - s_logger.error("Unexpected exception while executing " + job.getCmd(), e); - errorMsg = e.getMessage(); - } else { - ServerApiException sApiEx = (ServerApiException)e; - errorMsg = sApiEx.getDescription(); - errorCode = sApiEx.getErrorCode(); - } - - ExceptionResponse response = new ExceptionResponse(); - response.setErrorCode(errorCode); - response.setErrorText(errorMsg); - response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName()); - - // FIXME: setting resultCode to BaseCmd.INTERNAL_ERROR is not right, usually executors have their exception handling - // and we need to preserve that as much as possible here - completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, BaseCmd.INTERNAL_ERROR, response); - - // need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue - try { - if (job.getSyncSource() != null) { - _queueMgr.purgeItem(job.getSyncSource().getId()); - checkQueue(job.getSyncSource().getQueueId()); - } - } catch(Throwable ex) { - s_logger.fatal("Exception on exception, log it for record", ex); - } - } - } finally { - try { - JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId()); + JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job)); } catch(Exception e) { - s_logger.warn("Unable to unregister active job " + job.getId() + " from JMX minotoring"); + s_logger.warn("Unable to register active job " + job.getId() + " to JMX minotoring due to exception " + ExceptionUtil.toString(e)); + } + + BaseAsyncCmd cmdObj = null; + Transaction txn = Transaction.open(Transaction.CLOUD_DB); + try { + jobId = job.getId(); + NDC.push("job-" + jobId); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Executing " + job.getCmd() + " for job-" + jobId); + } + + Class cmdClass = Class.forName(job.getCmd()); + cmdObj = (BaseAsyncCmd)cmdClass.newInstance(); + cmdObj.setJob(job); + + Type mapType = new TypeToken>() {}.getType(); + Gson gson = ApiGsonHelper.getBuilder().create(); + Map params = gson.fromJson(job.getCmdInfo(), mapType); + + // whenever we deserialize, the UserContext needs to be updated + String userIdStr = params.get("ctxUserId"); + String acctIdStr = params.get("ctxAccountId"); + Long userId = null; + Account accountObject = null; + + if (userIdStr != null) { + userId = Long.parseLong(userIdStr); + } + + if (acctIdStr != null) { + accountObject = _accountDao.findById(Long.parseLong(acctIdStr)); + } + + UserContext.registerContext(userId, accountObject, null, false); + try { + // dispatch could ultimately queue the job + _dispatcher.dispatch(cmdObj, params); + + // serialize this to the async job table + completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject()); + } finally { + UserContext.unregisterContext(); + } + + // commands might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue mechanism... + if (job.getSyncSource() != null) { + _queueMgr.purgeItem(job.getSyncSource().getId()); + checkQueue(job.getSyncSource().getQueueId()); + } + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId); + } + + } catch(Throwable e) { + if (e instanceof AsyncCommandQueued) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("job " + job.getCmd() + " for job-" + jobId + " was queued, processing the queue."); + } + checkQueue(((AsyncCommandQueued)e).getQueue().getId()); + } else { + String errorMsg = null; + int errorCode = BaseCmd.INTERNAL_ERROR; + if (!(e instanceof ServerApiException)) { + s_logger.error("Unexpected exception while executing " + job.getCmd(), e); + errorMsg = e.getMessage(); + } else { + ServerApiException sApiEx = (ServerApiException)e; + errorMsg = sApiEx.getDescription(); + errorCode = sApiEx.getErrorCode(); + } + + ExceptionResponse response = new ExceptionResponse(); + response.setErrorCode(errorCode); + response.setErrorText(errorMsg); + response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName()); + + // FIXME: setting resultCode to BaseCmd.INTERNAL_ERROR is not right, usually executors have their exception handling + // and we need to preserve that as much as possible here + completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, BaseCmd.INTERNAL_ERROR, response); + + // need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue + try { + if (job.getSyncSource() != null) { + _queueMgr.purgeItem(job.getSyncSource().getId()); + checkQueue(job.getSyncSource().getQueueId()); + } + } catch(Throwable ex) { + s_logger.fatal("Exception on exception, log it for record", ex); + } + } + } finally { + + try { + JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId()); + } catch(Exception e) { + s_logger.warn("Unable to unregister active job " + job.getId() + " from JMX minotoring"); + } + + StackMaid.current().exitCleanup(); + txn.close(); + NDC.pop(); + } + } catch (Throwable th) { + try { + s_logger.error("Caught: " + th); + } catch (Throwable th2) { } - - StackMaid.current().exitCleanup(); - txn.close(); - NDC.pop(); } } }; @@ -493,8 +513,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe _jobDao.update(job.getId(), job); scheduleExecution(job); } else { - if(s_logger.isDebugEnabled()) + if(s_logger.isDebugEnabled()) { s_logger.debug("Unable to find related job for queue item: " + item.toString()); + } _queueMgr.purgeItem(item.getId()); } @@ -503,10 +524,11 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe @Override public void releaseSyncSource(AsyncJobExecutor executor) { if(executor.getSyncSource() != null) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Release sync source for job-" + executor.getJob().getId() + " sync source: " + if(s_logger.isDebugEnabled()) { + s_logger.debug("Release sync source for job-" + executor.getJob().getId() + " sync source: " + executor.getSyncSource().getContentType() + "-" - + executor.getSyncSource().getContentId()); + + executor.getSyncSource().getContentId()); + } _queueMgr.purgeItem(executor.getSyncSource().getId()); checkQueue(executor.getSyncSource().getQueueId()); @@ -518,8 +540,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe try { SyncQueueItemVO item = _queueMgr.dequeueFromOne(queueId, getMsid()); if(item != null) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Executing sync queue item: " + item.toString()); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Executing sync queue item: " + item.toString()); + } executeQueueItem(item, false); } else { @@ -540,8 +563,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe List l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE); if(l != null && l.size() > 0) { for(SyncQueueItemVO item: l) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Execute sync-queue item: " + item.toString()); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Execute sync-queue item: " + item.toString()); + } executeQueueItem(item, false); } } @@ -592,8 +616,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe List blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false); if(blockItems != null && blockItems.size() > 0) { for(SyncQueueItemVO item : blockItems) { - if(item.getContentType().equalsIgnoreCase("AsyncJob")) - completeAsyncJob(item.getContentId(), 2, 0, getResetResultMessage("Job is cancelled as it has been blocking others for too long")); + if(item.getContentType().equalsIgnoreCase("AsyncJob")) { + completeAsyncJob(item.getContentId(), 2, 0, getResetResultMessage("Job is cancelled as it has been blocking others for too long")); + } // purge the item and resume queue processing _queueMgr.purgeItem(item.getId()); @@ -611,8 +636,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe } private long getMsid() { - if(_clusterMgr != null) - return _clusterMgr.getManagementNodeId(); + if(_clusterMgr != null) { + return _clusterMgr.getManagementNodeId(); + } return MacAddress.getMacAddress().toLong(); } @@ -620,14 +646,15 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe private void cleanupPendingJobs(List l) { if(l != null && l.size() > 0) { for(SyncQueueItemVO item: l) { - if(s_logger.isInfoEnabled()) - s_logger.info("Discard left-over queue item: " + item.toString()); + if(s_logger.isInfoEnabled()) { + s_logger.info("Discard left-over queue item: " + item.toString()); + } String contentType = item.getContentType(); if(contentType != null && contentType.equals("AsyncJob")) { Long jobId = item.getContentId(); if(jobId != null) { - s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId); + s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId); completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultMessage("Execution was cancelled because of server shutdown")); } }