diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 464467fe87c..0f9b2c95af6 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -4181,6 +4181,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkStart workInfo = new VmWorkStart(callingUser.getId(), callingAccount.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER); @@ -4233,6 +4234,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setStep(VmWorkJobVO.Step.Prepare); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, cleanup); @@ -4285,6 +4287,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setStep(VmWorkJobVO.Step.Prepare); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, params); @@ -4336,6 +4339,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest); @@ -4389,6 +4393,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), @@ -4442,6 +4447,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), @@ -4495,6 +4501,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), @@ -4546,6 +4553,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), @@ -4596,6 +4604,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), @@ -4646,6 +4655,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), @@ -4698,6 +4708,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java index dd94d373897..3cce1e6bcf8 100644 --- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java +++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -61,13 +61,13 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch assert(cmd != null); if(s_logger.isDebugEnabled()) - s_logger.debug("Run VM work job: " + cmd); + s_logger.debug("Run VM work job: " + cmd + ", job origin: " + job.getRelated()); Class workClz = null; try { workClz = Class.forName(job.getCmd()); } catch(ClassNotFoundException e) { - s_logger.error("VM work class " + cmd + " is not found", e); + s_logger.error("VM work class " + cmd + " is not found" + ", job origin: " + job.getRelated(), e); _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, e.getMessage()); return; } @@ -75,13 +75,14 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch work = VmWorkSerializer.deserialize(workClz, job.getCmdInfo()); assert(work != null); if(work == null) { - s_logger.error("Unable to deserialize VM work " + job.getCmd() + ", job info: " + job.getCmdInfo()); + s_logger.error("Unable to deserialize VM work " + job.getCmd() + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated()); _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to deserialize VM work"); return; } if (_handlers == null || _handlers.isEmpty()) { - s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()); + s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo() + + ", job origin: " + job.getRelated()); _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found"); return; } @@ -89,7 +90,8 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch VmWorkJobHandler handler = _handlers.get(work.getHandlerName()); if (handler == null) { - s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()); + s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd() + + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated()); _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler"); return; } @@ -100,10 +102,10 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second()); } catch(Throwable e) { - s_logger.error("Unable to complete " + job, e); + s_logger.error("Unable to complete " + job + ", job origin:" + job.getRelated(), e); String exceptionJson = JobSerializerHelper.toSerializedString(e); - s_logger.info("Serialize exception object into json: " + exceptionJson); + s_logger.info("Serialize exception object into json: " + exceptionJson + ", job origin: " + job.getRelated()); _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, exceptionJson); } finally { CallContext.unregister(); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java index 0263d3d630c..24811451d9c 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java @@ -17,6 +17,8 @@ package org.apache.cloudstack.framework.jobs; import org.apache.log4j.Logger; + +import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao; import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; @@ -30,7 +32,7 @@ import com.cloud.exception.ResourceUnavailableException; public class AsyncJobExecutionContext { private AsyncJob _job; - + static private AsyncJobManager _jobMgr; static private AsyncJobJoinMapDao _joinMapDao; @@ -38,55 +40,55 @@ public class AsyncJobExecutionContext { _jobMgr = jobMgr; _joinMapDao = joinMapDao; } - + private static ManagedThreadLocal s_currentExectionContext = new ManagedThreadLocal(); public AsyncJobExecutionContext() { } - + public AsyncJobExecutionContext(AsyncJob job) { _job = job; } - + public SyncQueueItem getSyncSource() { return _job.getSyncSource(); } - + public void resetSyncSource() { _job.setSyncSource(null); } - + public AsyncJob getJob() { return _job; } - + public void setJob(AsyncJob job) { _job = job; } - + public boolean isJobDispatchedBy(String jobDispatcherName) { assert(jobDispatcherName != null); if(_job != null && _job.getDispatcher() != null && _job.getDispatcher().equals(jobDispatcherName)) return true; - + return false; } - + public void completeAsyncJob(JobInfo.Status jobStatus, int resultCode, String resultObject) { assert(_job != null); _jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject); } - + public void updateAsyncJobStatus(int processStatus, String resultObject) { assert(_job != null); _jobMgr.updateAsyncJobStatus(_job.getId(), processStatus, resultObject); } - + public void updateAsyncJobAttachment(String instanceType, Long instanceId) { assert(_job != null); _jobMgr.updateAsyncJobAttachment(_job.getId(), instanceType, instanceId); } - + public void logJobJournal(AsyncJob.JournalType journalType, String journalText, String journalObjJson) { assert(_job != null); _jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson); @@ -101,14 +103,14 @@ public class AsyncJobExecutionContext { assert(_job != null); _jobMgr.joinJob(_job.getId(), joinJobId); } - + public void joinJob(long joinJobId, String wakeupHandler, String wakeupDispatcher, String[] wakeupTopcisOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds) { assert(_job != null); _jobMgr.joinJob(_job.getId(), joinJobId, wakeupHandler, wakeupDispatcher, wakeupTopcisOnMessageBus, wakeupIntervalInMilliSeconds, timeoutInMilliSeconds); } - + // // check failure exception before we disjoin the worker job // TODO : it is ugly and this will become unnecessary after we switch to full-async mode @@ -116,7 +118,7 @@ public class AsyncJobExecutionContext { public void disjoinJob(long joinedJobId) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { assert(_job != null); - + AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId); if(record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) { Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult()); @@ -131,15 +133,15 @@ public class AsyncJobExecutionContext { throw new RuntimeException((Exception)exception); } } - + _jobMgr.disjoinJob(_job.getId(), joinedJobId); } - + public void completeJoin(JobInfo.Status joinStatus, String joinResult) { assert(_job != null); _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); } - + public void completeJobAndJoin(JobInfo.Status joinStatus, String joinResult) { assert(_job != null); _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); @@ -150,7 +152,7 @@ public class AsyncJobExecutionContext { AsyncJobExecutionContext context = s_currentExectionContext.get(); return context; } - + public static AsyncJobExecutionContext registerPseudoExecutionContext(long accountId, long userId) { AsyncJobExecutionContext context = s_currentExectionContext.get(); if (context == null) { @@ -167,9 +169,13 @@ public class AsyncJobExecutionContext { setCurrentExecutionContext(null); return context; } - + // This is intended to be package level access for AsyncJobManagerImpl only. public static void setCurrentExecutionContext(AsyncJobExecutionContext currentContext) { s_currentExectionContext.set(currentContext); } + + public static String getOriginJobContextId() { + return String.valueOf(CallContext.current().getContextId()); + } } diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java index 580a5b32ec7..7187e1ea14a 100644 --- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java +++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java @@ -2193,6 +2193,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkAttachVolume workInfo = new VmWorkAttachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), @@ -2239,6 +2240,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkDetachVolume workInfo = new VmWorkDetachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), @@ -2283,6 +2285,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkResizeVolume workInfo = new VmWorkResizeVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), @@ -2327,6 +2330,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkMigrateVolume workInfo = new VmWorkMigrateVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), @@ -2371,6 +2375,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkTakeVolumeSnapshot workInfo = new VmWorkTakeVolumeSnapshot( diff --git a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java index 2d53703130b..c9f712fee62 100644 --- a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java +++ b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java @@ -840,6 +840,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkCreateVMSnapshot workInfo = new VmWorkCreateVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(), @@ -883,6 +884,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkDeleteVMSnapshot workInfo = new VmWorkDeleteVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(), @@ -926,6 +928,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkRevertToVMSnapshot workInfo = new VmWorkRevertToVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(), @@ -969,6 +972,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); // save work context info (there are some duplications) VmWorkDeleteAllVMSnapshots workInfo = new VmWorkDeleteAllVMSnapshots(callingUser.getId(), callingAccount.getId(), vm.getId(),