diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 981b4471466..df51a3c5266 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -347,7 +347,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Long.class, "vm.job.timeout", "600000", "Time in milliseconds to wait before attempting to cancel a job", false); static final ConfigKey VmJobStateReportInterval = new ConfigKey("Advanced", - Integer.class, "vm.job.report.interval", "60", + Integer.class, "vm.job.report.interval", "60", "Interval to send application level pings to make sure the connection is still working", false); ScheduledExecutorService _executor = null; @@ -740,18 +740,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void advanceStart(String vmUuid, Map params, DeploymentPlanner planner) - throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { + throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { advanceStart(vmUuid, params, null, planner); } @Override public void advanceStart(String vmUuid, Map params, DeploymentPlan planToDeploy, DeploymentPlanner planner) throws InsufficientCapacityException, - ConcurrentOperationException, ResourceUnavailableException { + ConcurrentOperationException, ResourceUnavailableException { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { VirtualMachine vm = _vmDao.findByUuid(vmUuid); @@ -763,16 +763,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (VmJobEnabled.value()) _workJobDao.expunge(placeHolder.getId()); } - } else { - Outcome outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy); + } else { + Outcome outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy); - try { - VirtualMachine vm = outcome.get(); - } catch (InterruptedException e) { - throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { - throw new RuntimeException("Execution excetion", e); - } + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobResult != null) { @@ -780,8 +780,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw (ConcurrentOperationException)jobResult; else if (jobResult instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobResult; - } - } + } + } } @@ -789,7 +789,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac public void orchestrateStart(String vmUuid, Map params, DeploymentPlan planToDeploy, DeploymentPlanner planner) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { - CallContext cctxt = CallContext.current(); + CallContext cctxt = CallContext.current(); Account account = cctxt.getCallingAccount(); User caller = cctxt.getCallingUser(); @@ -1287,11 +1287,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) - throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { + throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { @@ -1305,16 +1305,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _workJobDao.expunge(placeHolder.getId()); } - } else { - Outcome outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop); + } else { + Outcome outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop); - try { - VirtualMachine vm = outcome.get(); - } catch (InterruptedException e) { - throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { - throw new RuntimeException("Execution excetion", e); - } + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobResult != null) { @@ -1324,8 +1324,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw (ConcurrentOperationException)jobResult; else if (jobResult instanceof OperationTimedoutException) throw (OperationTimedoutException)jobResult; - } - } + } + } } private void orchestrateStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { @@ -1593,9 +1593,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void storageMigration(String vmUuid, StoragePool destPool) { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { VirtualMachine vm = _vmDao.findByUuid(vmUuid); @@ -1607,23 +1607,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (VmJobEnabled.value()) _workJobDao.expunge(placeHolder.getId()); } - } else { - Outcome outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool); + } else { + Outcome outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool); - try { - VirtualMachine vm = outcome.get(); - } catch (InterruptedException e) { - throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { - throw new RuntimeException("Execution excetion", e); - } + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobResult != null) { if (jobResult instanceof RuntimeException) throw (RuntimeException)jobResult; - } - } + } + } } private void orchestrateStorageMigration(String vmUuid, StoragePool destPool) { @@ -1683,11 +1683,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void migrate(String vmUuid, long srcHostId, DeployDestination dest) - throws ResourceUnavailableException, ConcurrentOperationException { + throws ResourceUnavailableException, ConcurrentOperationException { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { VirtualMachine vm = _vmDao.findByUuid(vmUuid); @@ -1699,27 +1699,27 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (VmJobEnabled.value()) _workJobDao.expunge(placeHolder.getId()); } - } else { - Outcome outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest); + } else { + Outcome outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest); - try { - VirtualMachine vm = outcome.get(); - } catch (InterruptedException e) { - throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { - throw new RuntimeException("Execution excetion", e); - } + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); - if(jobResult != null) { - if(jobResult instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)jobResult; - else if(jobResult instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobResult; - else if(jobResult instanceof RuntimeException) - throw (RuntimeException)jobResult; - } - } + if (jobResult != null) { + if (jobResult instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobResult; + else if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof RuntimeException) + throw (RuntimeException)jobResult; + } + } } private void orchestrateMigrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException { @@ -1964,11 +1964,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map volumeToPool) - throws ResourceUnavailableException, ConcurrentOperationException { + throws ResourceUnavailableException, ConcurrentOperationException { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { @@ -1982,29 +1982,29 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _workJobDao.expunge(placeHolder.getId()); } - } else { + } else { Outcome outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool); - try { + try { VirtualMachine vm = outcome.get(); - } catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { + } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); - } + } Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob()); - if(jobException != null) { - if(jobException instanceof ResourceUnavailableException) + if (jobException != null) { + if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; - else if(jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; + else if (jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; } - } + } } private void orchestrateMigrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map volumeToPool) throws ResourceUnavailableException, - ConcurrentOperationException { + ConcurrentOperationException { VMInstanceVO vm = _vmDao.findByUuid(vmUuid); @@ -2259,11 +2259,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void advanceReboot(String vmUuid, Map params) - throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { + throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { VirtualMachine vm = _vmDao.findByUuid(vmUuid); @@ -2275,16 +2275,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (VmJobEnabled.value()) _workJobDao.expunge(placeHolder.getId()); } - } else { - Outcome outcome = rebootVmThroughJobQueue(vmUuid, params); + } else { + Outcome outcome = rebootVmThroughJobQueue(vmUuid, params); - try { - VirtualMachine vm = outcome.get(); - } catch (InterruptedException e) { - throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { - throw new RuntimeException("Execution excetion", e); - } + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobResult != null) { @@ -2294,8 +2294,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw (ConcurrentOperationException)jobResult; else if (jobResult instanceof InsufficientCapacityException) throw (InsufficientCapacityException)jobResult; - } - } + } + } } private void orchestrateReboot(String vmUuid, Map params) throws InsufficientCapacityException, ConcurrentOperationException, @@ -2948,9 +2948,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } if(VmJobEnabled.value()) { - if(ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) { - _syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport()); - } + if (ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) { + _syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport()); + } } // take the chance to scan VMs that are stuck in transitional states @@ -2979,10 +2979,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } if(s_logger.isDebugEnabled()) - s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId()); + s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId()); if(VmJobEnabled.value()) { - _syncMgr.resetHostSyncState(agent.getId()); + _syncMgr.resetHostSyncState(agent.getId()); } if (forRebalance) { @@ -3193,11 +3193,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) - throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { + throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { placeHolder = createPlaceHolderWork(vm.getId()); @@ -3208,16 +3208,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (VmJobEnabled.value()) _workJobDao.expunge(placeHolder.getId()); } - } else { + } else { Outcome outcome = addVmToNetworkThroughJobQueue(vm, network, requested); - try { + try { outcome.get(); - } catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { + } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); - } + } Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobException != null) { @@ -3234,7 +3234,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } throw new RuntimeException("Unexpected job execution result"); - } + } } private NicProfile orchestrateAddVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException, @@ -3305,11 +3305,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public boolean removeNicFromVm(VirtualMachine vm, Nic nic) - throws ConcurrentOperationException, ResourceUnavailableException { + throws ConcurrentOperationException, ResourceUnavailableException { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { placeHolder = createPlaceHolderWork(vm.getId()); @@ -3321,16 +3321,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _workJobDao.expunge(placeHolder.getId()); } - } else { + } else { Outcome outcome = removeNicFromVmThroughJobQueue(vm, nic); - try { + try { outcome.get(); - } catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { + } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); - } + } Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobResult != null) { @@ -3344,8 +3344,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return (Boolean)jobResult; } - throw new RuntimeException("Job failed with un-handled exception"); - } + throw new RuntimeException("Job failed with un-handled exception"); + } } private boolean orchestrateRemoveNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException { @@ -3408,8 +3408,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override @DB public boolean removeVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException { - // TODO will serialize on the VM object later to resolve operation conflicts - return orchestrateRemoveVmFromNetwork(vm, network, broadcastUri); + // TODO will serialize on the VM object later to resolve operation conflicts + return orchestrateRemoveVmFromNetwork(vm, network, broadcastUri); } @DB @@ -3552,10 +3552,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) - throws ResourceUnavailableException, ConcurrentOperationException { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + throws ResourceUnavailableException, ConcurrentOperationException { + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { VirtualMachine vm = _vmDao.findByUuid(vmUuid); @@ -3567,16 +3567,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (VmJobEnabled.value()) _workJobDao.expunge(placeHolder.getId()); } - } else { - Outcome outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId); + } else { + Outcome outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId); - try { - VirtualMachine vm = outcome.get(); - } catch (InterruptedException e) { - throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { - throw new RuntimeException("Execution excetion", e); - } + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobResult != null) { @@ -3584,14 +3584,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw (ResourceUnavailableException)jobResult; else if (jobResult instanceof ConcurrentOperationException) throw (ConcurrentOperationException)jobResult; - } - } + } + } } private void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) - throws ResourceUnavailableException, ConcurrentOperationException { + throws ResourceUnavailableException, ConcurrentOperationException { - VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + VMInstanceVO vm = _vmDao.findByUuid(vmUuid); s_logger.info("Migrating " + vm + " to " + dest); vm.getServiceOfferingId(); @@ -3775,7 +3775,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm, ReservationContext context, DeployDestination dest) throws ConcurrentOperationException, - ResourceUnavailableException { + ResourceUnavailableException { boolean result = true; VMInstanceVO router = _vmDao.findById(vm.getId()); @@ -3809,12 +3809,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, - boolean reconfiguringOnExistingHost) + boolean reconfiguringOnExistingHost) throws ResourceUnavailableException, InsufficientServerCapacityException, ConcurrentOperationException { - AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { - // avoid re-entrance + // avoid re-entrance VmWorkJobVO placeHolder = null; if (VmJobEnabled.value()) { VirtualMachine vm = _vmDao.findByUuid(vmUuid); @@ -3826,17 +3826,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (VmJobEnabled.value()) _workJobDao.expunge(placeHolder.getId()); } - } else { - Outcome outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); + } else { + Outcome outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); - VirtualMachine vm = null; - try { - vm = outcome.get(); - } catch (InterruptedException e) { - throw new RuntimeException("Operation is interrupted", e); - } catch (java.util.concurrent.ExecutionException e) { - throw new RuntimeException("Execution excetion", e); - } + VirtualMachine vm = null; + try { + vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobResult != null) { @@ -3853,7 +3853,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } return (VMInstanceVO)vm; - } + } } private VMInstanceVO orchestrateReConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException, @@ -3937,278 +3937,282 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @MessageHandler(topic = Topics.VM_POWER_STATE) private void HandlePowerStateReport(String subject, String senderAddress, Object args) { - assert(args != null); - Long vmId = (Long)args; + assert (args != null); + Long vmId = (Long)args; - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vmId); - if(pendingWorkJobs.size() == 0) { - // there is no pending operation job + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vmId); + if (pendingWorkJobs.size() == 0) { + // there is no pending operation job VMInstanceVO vm = _vmDao.findById(vmId); - if(vm != null) { - switch(vm.getPowerState()) { - case PowerOn : - handlePowerOnReportWithNoPendingJobsOnVM(vm); - break; + if (vm != null) { + switch (vm.getPowerState()) { + case PowerOn: + handlePowerOnReportWithNoPendingJobsOnVM(vm); + break; - case PowerOff : - handlePowerOffReportWithNoPendingJobsOnVM(vm); - break; + case PowerOff: + handlePowerOffReportWithNoPendingJobsOnVM(vm); + break; - // PowerUnknown shouldn't be reported, it is a derived + // PowerUnknown shouldn't be reported, it is a derived // VM power state from host state (host un-reachable) - case PowerUnknown : - default : - assert(false); - break; - } - } else { - s_logger.warn("VM " + vmId + " no longer exists when processing VM state report"); - } - } else { + case PowerUnknown: + default: + assert (false); + break; + } + } else { + s_logger.warn("VM " + vmId + " no longer exists when processing VM state report"); + } + } else { // reset VM power state tracking so that we won't lost signal when VM has // been translated to _vmDao.resetVmPowerStateTracking(vmId); - } + } } private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) { - // - // 1) handle left-over transitional VM states - // 2) handle out of band VM live migration - // 3) handle out of sync stationary states, marking VM from Stopped to Running with - // alert messages - // - switch(vm.getState()) { - case Starting : - try { - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); - } catch(NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } + // + // 1) handle left-over transitional VM states + // 2) handle out of band VM live migration + // 3) handle out of sync stationary states, marking VM from Stopped to Running with + // alert messages + // + switch (vm.getState()) { + case Starting: + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } - // we need to alert admin or user about this risky state transition - _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), - VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset"); - break; + // we need to alert admin or user about this risky state transition + _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset"); + break; - case Running : - try { - if(vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue()) - s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId()); - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); - } catch(NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } - break; + case Running: + try { + if (vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue()) + s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId()); + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + break; - case Stopping : - case Stopped : - try { - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); - } catch(NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } - _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), - VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset"); - break; + case Stopping: + case Stopped: + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + + " -> Running) from out-of-context transition. VM network environment may need to be reset"); + break; - case Destroyed : - case Expunging : - s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: " - + vm.getId() + ", state: " + vm.getState()); - break; + case Destroyed: + case Expunging: + s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: " + + vm.getId() + ", state: " + vm.getState()); + break; - case Migrating : - try { - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); - } catch(NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } - break; + case Migrating: + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + break; - case Error : - default : - s_logger.info("Receive power on report when VM is in error or unexpected state. vm: " - + vm.getId() + ", state: " + vm.getState()); - break; - } + case Error: + default: + s_logger.info("Receive power on report when VM is in error or unexpected state. vm: " + + vm.getId() + ", state: " + vm.getState()); + break; + } } private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) { - // 1) handle left-over transitional VM states - // 2) handle out of sync stationary states, schedule force-stop to release resources - // - switch(vm.getState()) { - case Starting : - case Stopping : + // 1) handle left-over transitional VM states + // 2) handle out of sync stationary states, schedule force-stop to release resources + // + switch (vm.getState()) { + case Starting: + case Stopping: case Running: - case Stopped : - case Migrating : - try { - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId()); - } catch(NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } - _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), - VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition."); - // TODO: we need to forcely release all resource allocation - break; + case Stopped: + case Migrating: + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + + " -> Stopped) from out-of-context transition."); + // TODO: we need to forcely release all resource allocation + break; - case Destroyed : - case Expunging : - break; + case Destroyed: + case Expunging: + break; - case Error : - default : - break; - } + case Error: + default: + break; + } } private void scanStalledVMInTransitionStateOnUpHost(long hostId) { - // - // Check VM that is stuck in Starting, Stopping, Migrating states, we won't check - // VMs in expunging state (this need to be handled specially) - // - // checking condition - // 1) no pending VmWork job - // 2) on hostId host and host is UP - // - // When host is UP, soon or later we will get a report from the host about the VM, - // however, if VM is missing from the host report (it may happen in out of band changes - // or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic - // - // Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP - // and a VM stalls for status update, we will consider them to be powered off - // (which is relatively safe to do so) + // + // Check VM that is stuck in Starting, Stopping, Migrating states, we won't check + // VMs in expunging state (this need to be handled specially) + // + // checking condition + // 1) no pending VmWork job + // 2) on hostId host and host is UP + // + // When host is UP, soon or later we will get a report from the host about the VM, + // however, if VM is missing from the host report (it may happen in out of band changes + // or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic + // + // Therefore, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP + // and a VM stalls for status update, we will consider them to be powered off + // (which is relatively safe to do so) - long stallThresholdInMs = VmJobStateReportInterval.value() + (VmJobStateReportInterval.value() >> 1); - Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs); - List mostlikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime); - for(Long vmId : mostlikelyStoppedVMs) { - VMInstanceVO vm = _vmDao.findById(vmId); - assert(vm != null); - handlePowerOffReportWithNoPendingJobsOnVM(vm); - } + long stallThresholdInMs = VmJobStateReportInterval.value() + (VmJobStateReportInterval.value() >> 1); + Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs); + List mostlikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime); + for (Long vmId : mostlikelyStoppedVMs) { + VMInstanceVO vm = _vmDao.findById(vmId); + assert (vm != null); + handlePowerOffReportWithNoPendingJobsOnVM(vm); + } - List vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime); - for(Long vmId : vmsWithRecentReport) { - VMInstanceVO vm = _vmDao.findById(vmId); - assert(vm != null); - if(vm.getPowerState() == PowerState.PowerOn) - handlePowerOnReportWithNoPendingJobsOnVM(vm); - else - handlePowerOffReportWithNoPendingJobsOnVM(vm); - } + List vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime); + for (Long vmId : vmsWithRecentReport) { + VMInstanceVO vm = _vmDao.findById(vmId); + assert (vm != null); + if (vm.getPowerState() == PowerState.PowerOn) + handlePowerOnReportWithNoPendingJobsOnVM(vm); + else + handlePowerOffReportWithNoPendingJobsOnVM(vm); + } } private void scanStalledVMInTransitionStateOnDisconnectedHosts() { - Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - VmOpWaitInterval.value()*1000); - List stuckAndUncontrollableVMs = listStalledVMInTransitionStateOnDisconnectedHosts(cutTime); - for(Long vmId : stuckAndUncontrollableVMs) { - VMInstanceVO vm = _vmDao.findById(vmId); + Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - VmOpWaitInterval.value() * 1000); + List stuckAndUncontrollableVMs = listStalledVMInTransitionStateOnDisconnectedHosts(cutTime); + for (Long vmId : stuckAndUncontrollableVMs) { + VMInstanceVO vm = _vmDao.findById(vmId); - // We now only alert administrator about this situation - _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), - VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + " state and its host is unreachable for too long"); - } + // We now only alert administrator about this situation + _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + + " state and its host is unreachable for too long"); + } } // VMs that in transitional state without recent power state report private List listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) { - String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + + String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + "AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " + - "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + - "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; - List l = new ArrayList(); - TransactionLegacy txn = null; - try { - txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); + List l = new ArrayList(); + TransactionLegacy txn = null; + try { + txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); - PreparedStatement pstmt = null; - try { - pstmt = txn.prepareAutoCloseStatement(sql); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); - pstmt.setLong(1, hostId); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); - pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); - ResultSet rs = pstmt.executeQuery(); - while(rs.next()) { - l.add(rs.getLong(1)); - } - } catch (SQLException e) { - } catch (Throwable e) { - } + pstmt.setLong(1, hostId); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); + pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) { + l.add(rs.getLong(1)); + } + } catch (SQLException e) { + } catch (Throwable e) { + } - } finally { - if(txn != null) - txn.close(); - } + } finally { + if (txn != null) + txn.close(); + } return l; } // VMs that in transitional state and recently have power state update private List listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) { - String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + + String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + "AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " + - "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + - "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; - List l = new ArrayList(); - TransactionLegacy txn = null; - try { - txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); - PreparedStatement pstmt = null; - try { - pstmt = txn.prepareAutoCloseStatement(sql); + List l = new ArrayList(); + TransactionLegacy txn = null; + try { + txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); - pstmt.setLong(1, hostId); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); - pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); - ResultSet rs = pstmt.executeQuery(); - while(rs.next()) { - l.add(rs.getLong(1)); - } - } catch (SQLException e) { - } catch (Throwable e) { - } - return l; - } finally { - if(txn != null) - txn.close(); - } + pstmt.setLong(1, hostId); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); + pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) { + l.add(rs.getLong(1)); + } + } catch (SQLException e) { + } catch (Throwable e) { + } + return l; + } finally { + if (txn != null) + txn.close(); + } } private List listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) { - String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " + + String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " + "AND i.power_state_update_time < ? AND i.host_id = h.id " + - "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + - "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; - List l = new ArrayList(); - TransactionLegacy txn = null; - try { - txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); - PreparedStatement pstmt = null; - try { - pstmt = txn.prepareAutoCloseStatement(sql); + List l = new ArrayList(); + TransactionLegacy txn = null; + try { + txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); - pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal()); - ResultSet rs = pstmt.executeQuery(); - while(rs.next()) { - l.add(rs.getLong(1)); - } - } catch (SQLException e) { - } catch (Throwable e) { - } - return l; - } finally { - if(txn != null) - txn.close(); - } + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); + pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal()); + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) { + l.add(rs.getLong(1)); + } + } catch (SQLException e) { + } catch (Throwable e) { + } + return l; + } finally { + if (txn != null) + txn.close(); + } } // @@ -4244,9 +4248,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() { @Override public boolean checkCondition() { - AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); - assert(jobVo != null); - if(jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); + assert (jobVo != null); + if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) return true; return false; @@ -4266,58 +4270,58 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // no time for this at current iteration // public Outcome startVmThroughJobQueue(final String vmUuid, - final Map params, - final DeploymentPlan planToDeploy) { + final Map params, + final DeploymentPlan planToDeploy) { - final CallContext context = CallContext.current(); + final CallContext context = CallContext.current(); final User callingUser = context.getCallingUser(); final Account callingAccount = context.getCallingAccount(); final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - VmWorkJobVO workJob = null; + VmWorkJobVO workJob = null; - _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance, - vm.getId(), VmWorkStart.class.getName()); + _vmDao.lockRow(vm.getId(), true); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance, + vm.getId(), VmWorkStart.class.getName()); - if (pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - workJob = new VmWorkJobVO(context.getContextId()); + if (pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkStart.class.getName()); + workJob.setCmd(VmWorkStart.class.getName()); - workJob.setAccountId(callingAccount.getId()); - workJob.setUserId(callingUser.getId()); - workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) + // save work context info (there are some duplications) VmWorkStart workInfo = new VmWorkStart(callingUser.getId(), callingAccount.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER); - workInfo.setPlan(planToDeploy); - workInfo.setParams(params); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workInfo.setPlan(planToDeploy); + workInfo.setParams(params); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmStateSyncOutcome((VmWorkJobVO)result[0], - VirtualMachine.PowerState.PowerOn, vm.getId(), null); + VirtualMachine.PowerState.PowerOn, vm.getId(), null); } public Outcome stopVmThroughJobQueue(final String vmUuid, final boolean cleanup) { @@ -4328,51 +4332,51 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockRow(vm.getId(), true); + _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - vm.getType(), vm.getId(), - VmWorkStop.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + vm.getType(), vm.getId(), + VmWorkStop.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - workJob = new VmWorkJobVO(context.getContextId()); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkStop.class.getName()); + workJob.setCmd(VmWorkStop.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setStep(VmWorkJobVO.Step.Prepare); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setStep(VmWorkJobVO.Step.Prepare); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) + // save work context info (there are some duplications) VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, cleanup); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmStateSyncOutcome((VmWorkJobVO)result[0], - VirtualMachine.PowerState.PowerOff, vm.getId(), null); + VirtualMachine.PowerState.PowerOff, vm.getId(), null); } public Outcome rebootVmThroughJobQueue(final String vmUuid, - final Map params) { + final Map params) { final CallContext context = CallContext.current(); final Account account = context.getCallingAccount(); @@ -4381,47 +4385,47 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockRow(vm.getId(), true); + _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkReboot.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkReboot.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - workJob = new VmWorkJobVO(context.getContextId()); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkReboot.class.getName()); + workJob.setCmd(VmWorkReboot.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setStep(VmWorkJobVO.Step.Prepare); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setStep(VmWorkJobVO.Step.Prepare); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) + // save work context info (there are some duplications) VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, params); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], - vm.getId()); + vm.getId()); } public Outcome migrateVmThroughJobQueue(final String vmUuid, final long srcHostId, final DeployDestination dest) { @@ -4432,52 +4436,52 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockRow(vm.getId(), true); + _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkMigrate.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrate.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob = new VmWorkJobVO(context.getContextId()); + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkMigrate.class.getName()); + workJob.setCmd(VmWorkMigrate.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) + // save work context info (there are some duplications) VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmStateSyncOutcome((VmWorkJobVO)result[0], - VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId()); + VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId()); } public Outcome migrateVmWithStorageThroughJobQueue( - final String vmUuid, final long srcHostId, final long destHostId, - final Map volumeToPool) { + final String vmUuid, final long srcHostId, final long destHostId, + final Map volumeToPool) { final CallContext context = CallContext.current(); final User user = context.getCallingUser(); @@ -4486,52 +4490,52 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockRow(vm.getId(), true); + _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkMigrateWithStorage.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrateWithStorage.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob = new VmWorkJobVO(context.getContextId()); + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkMigrate.class.getName()); + workJob.setCmd(VmWorkMigrate.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) - VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), + // save work context info (there are some duplications) + VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, destHostId, volumeToPool); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmStateSyncOutcome((VmWorkJobVO)result[0], - VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId); + VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId); } public Outcome migrateVmForScaleThroughJobQueue( - final String vmUuid, final long srcHostId, final DeployDestination dest, final Long newSvcOfferingId) { + final String vmUuid, final long srcHostId, final DeployDestination dest, final Long newSvcOfferingId) { final CallContext context = CallContext.current(); final User user = context.getCallingUser(); @@ -4540,52 +4544,52 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockRow(vm.getId(), true); + _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkMigrateForScale.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrateForScale.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob = new VmWorkJobVO(context.getContextId()); + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkMigrate.class.getName()); + workJob.setCmd(VmWorkMigrate.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) - VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), + // save work context info (there are some duplications) + VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest, newSvcOfferingId); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); } public Outcome migrateVmStorageThroughJobQueue( - final String vmUuid, final StoragePool destPool) { + final String vmUuid, final StoragePool destPool) { final CallContext context = CallContext.current(); final User user = context.getCallingUser(); @@ -4594,205 +4598,205 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockRow(vm.getId(), true); + _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkStorageMigration.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkStorageMigration.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob = new VmWorkJobVO(context.getContextId()); + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkStorageMigration.class.getName()); + workJob.setCmd(VmWorkStorageMigration.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) - VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), + // save work context info (there are some duplications) + VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool.getId()); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); } public Outcome addVmToNetworkThroughJobQueue( - final VirtualMachine vm, final Network network, final NicProfile requested) { + final VirtualMachine vm, final Network network, final NicProfile requested) { final CallContext context = CallContext.current(); final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockRow(vm.getId(), true); + _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkAddVmToNetwork.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkAddVmToNetwork.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob = new VmWorkJobVO(context.getContextId()); + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); + workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) - VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), + // save work context info (there are some duplications) + VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network.getId(), requested); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); } public Outcome removeNicFromVmThroughJobQueue( - final VirtualMachine vm, final Nic nic) { - - final CallContext context = CallContext.current(); - final User user = context.getCallingUser(); - final Account account = context.getCallingAccount(); - - Object[] result = Transaction.execute(new TransactionCallback () { - @Override - public Object[] doInTransaction(TransactionStatus status) { - - _vmDao.lockRow(vm.getId(), true); - - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkRemoveNicFromVm.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - - workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkRemoveNicFromVm.class.getName()); - - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - - // save work context info (there are some duplications) - VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, nic.getId()); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } - return new Object[] { workJob, new Long(workJob.getId()) }; - } - }); - - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - - return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); - } - - public Outcome removeVmFromNetworkThroughJobQueue( - final VirtualMachine vm, final Network network, final URI broadcastUri) { + final VirtualMachine vm, final Nic nic) { final CallContext context = CallContext.current(); final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockRow(vm.getId(), true); + _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkRemoveVmFromNetwork.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkRemoveNicFromVm.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob = new VmWorkJobVO(context.getContextId()); + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkRemoveVmFromNetwork.class.getName()); + workJob.setCmd(VmWorkRemoveNicFromVm.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) - VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network, broadcastUri); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + // save work context info (there are some duplications) + VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, nic.getId()); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); + } + + public Outcome removeVmFromNetworkThroughJobQueue( + final VirtualMachine vm, final Network network, final URI broadcastUri) { + + final CallContext context = CallContext.current(); + final User user = context.getCallingUser(); + final Account account = context.getCallingAccount(); + + Object[] result = Transaction.execute(new TransactionCallback() { + @Override + public Object[] doInTransaction(TransactionStatus status) { + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkRemoveVmFromNetwork.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkRemoveVmFromNetwork.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); + + // save work context info (there are some duplications) + VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network, broadcastUri); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } + return new Object[] {workJob, new Long(workJob.getId())}; + } + }); + + final long jobId = (Long)result[1]; + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); } public Outcome reconfigureVmThroughJobQueue( - final String vmUuid, final ServiceOffering newServiceOffering, final boolean reconfiguringOnExistingHost) { + final String vmUuid, final ServiceOffering newServiceOffering, final boolean reconfiguringOnExistingHost) { final CallContext context = CallContext.current(); final User user = context.getCallingUser(); @@ -4801,45 +4805,45 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Object[] result = Transaction.execute(new TransactionCallback() { - @Override + @Override public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockRow(vm.getId(), true); + _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkReconfigure.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkReconfigure.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob = new VmWorkJobVO(context.getContextId()); + workJob = new VmWorkJobVO(context.getContextId()); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkReconfigure.class.getName()); + workJob.setCmd(VmWorkReconfigure.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); + workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); - // save work context info (there are some duplications) - VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), + // save work context info (there are some duplications) + VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, newServiceOffering.getId(), reconfiguringOnExistingHost); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + } return new Object[] {workJob, new Long(workJob.getId())}; - } - }); + } + }); final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java index 61670bf7043..7b6eed75a00 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java @@ -23,9 +23,14 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO; import com.cloud.utils.db.GenericDao; public interface SyncQueueItemDao extends GenericDao { - public SyncQueueItemVO getNextQueueItem(long queueId); - public List getNextQueueItems(int maxItems); - public List getActiveQueueItems(Long msid, boolean exclusive); - public List getBlockedQueueItems(long thresholdMs, boolean exclusive); - public Long getQueueItemIdByContentIdAndType(long contentId, String contentType); + public SyncQueueItemVO getNextQueueItem(long queueId); + public int getActiveQueueItemCount(long queueId); + + public List getNextQueueItems(int maxItems); + + public List getActiveQueueItems(Long msid, boolean exclusive); + + public List getBlockedQueueItems(long thresholdMs, boolean exclusive); + + public Long getQueueItemIdByContentIdAndType(long contentId, String contentType); } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java index 2f04a7cc890..41f14190f36 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java @@ -36,6 +36,7 @@ import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.GenericSearchBuilder; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.SearchCriteria.Func; import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.TransactionLegacy; @@ -43,7 +44,8 @@ import com.cloud.utils.db.TransactionLegacy; public class SyncQueueItemDaoImpl extends GenericDaoBase implements SyncQueueItemDao { private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class); final GenericSearchBuilder queueIdSearch; - + final GenericSearchBuilder queueActiveItemSearch; + public SyncQueueItemDaoImpl() { super(); queueIdSearch = createSearchBuilder(Long.class); @@ -51,37 +53,52 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ); queueIdSearch.selectFields(queueIdSearch.entity().getId()); queueIdSearch.done(); + + queueActiveItemSearch = createSearchBuilder(Integer.class); + queueActiveItemSearch.and("queueId", queueActiveItemSearch.entity().getQueueId(), Op.EQ); + queueActiveItemSearch.and("processNumber", queueActiveItemSearch.entity().getLastProcessNumber(), Op.NNULL); + queueActiveItemSearch.select(null, Func.COUNT, queueActiveItemSearch.entity().getId()); + queueActiveItemSearch.done(); } - @Override - public SyncQueueItemVO getNextQueueItem(long queueId) { - - SearchBuilder sb = createSearchBuilder(); + @Override + public SyncQueueItemVO getNextQueueItem(long queueId) { + + SearchBuilder sb = createSearchBuilder(); sb.and("queueId", sb.entity().getQueueId(), SearchCriteria.Op.EQ); - sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL); + sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL); sb.done(); - - SearchCriteria sc = sb.create(); - sc.setParameters("queueId", queueId); - - Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L); + + SearchCriteria sc = sb.create(); + sc.setParameters("queueId", queueId); + + Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L); List l = listBy(sc, filter); if(l != null && l.size() > 0) - return l.get(0); - - return null; - } + return l.get(0); - @Override - public List getNextQueueItems(int maxItems) { - List l = new ArrayList(); - - String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " + - " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " + + return null; + } + + @Override + public int getActiveQueueItemCount(long queueId) { + SearchCriteria sc = queueActiveItemSearch.create(); + sc.setParameters("queueId", queueId); + + List count = customSearch(sc, null); + return count.get(0); + } + + @Override + public List getNextQueueItems(int maxItems) { + List l = new ArrayList(); + + String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " + + " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " + " WHERE i.queue_proc_number IS NULL " + - " GROUP BY q.id " + - " ORDER BY i.id " + - " LIMIT 0, ?"; + " GROUP BY q.id " + + " ORDER BY i.id " + + " LIMIT 0, ?"; TransactionLegacy txn = TransactionLegacy.currentTxn(); PreparedStatement pstmt = null; @@ -90,54 +107,54 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase pstmt.setInt(1, maxItems); ResultSet rs = pstmt.executeQuery(); while(rs.next()) { - SyncQueueItemVO item = new SyncQueueItemVO(); - item.setId(rs.getLong(1)); - item.setQueueId(rs.getLong(2)); - item.setContentType(rs.getString(3)); - item.setContentId(rs.getLong(4)); - item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5))); - l.add(item); + SyncQueueItemVO item = new SyncQueueItemVO(); + item.setId(rs.getLong(1)); + item.setQueueId(rs.getLong(2)); + item.setContentType(rs.getString(3)); + item.setContentId(rs.getLong(4)); + item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5))); + l.add(item); } } catch (SQLException e) { - s_logger.error("Unexpected sql excetpion, ", e); + s_logger.error("Unexpected sql excetpion, ", e); } catch (Throwable e) { - s_logger.error("Unexpected excetpion, ", e); + s_logger.error("Unexpected excetpion, ", e); } - return l; - } - - @Override - public List getActiveQueueItems(Long msid, boolean exclusive) { - SearchBuilder sb = createSearchBuilder(); + return l; + } + + @Override + public List getActiveQueueItems(Long msid, boolean exclusive) { + SearchBuilder sb = createSearchBuilder(); sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(), - SearchCriteria.Op.EQ); + SearchCriteria.Op.EQ); sb.done(); - - SearchCriteria sc = sb.create(); - sc.setParameters("lastProcessMsid", msid); - - Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null); - - if(exclusive) - return lockRows(sc, filter, true); + + SearchCriteria sc = sb.create(); + sc.setParameters("lastProcessMsid", msid); + + Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null); + + if (exclusive) + return lockRows(sc, filter, true); return listBy(sc, filter); - } + } @Override public List getBlockedQueueItems(long thresholdMs, boolean exclusive) { Date cutTime = DateUtil.currentGMTTime(); - + SearchBuilder sbItem = createSearchBuilder(); sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL); sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL); sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL); sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT); - + sbItem.done(); - + SearchCriteria sc = sbItem.create(); sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs)); - + if(exclusive) return lockRows(sc, null, true); return listBy(sc, null); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index a77f864653e..63c365b406d 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -24,7 +24,6 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; @@ -88,12 +87,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class); - private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds - private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds + private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds + private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds private static final int MAX_ONETIME_SCHEDULE_SIZE = 50; private static final int HEARTBEAT_INTERVAL = 2000; - private static final int GC_INTERVAL = 10000; // 10 seconds + private static final int GC_INTERVAL = 10000; // 10 seconds @Inject private SyncQueueItemDao _queueItemDao; @@ -362,38 +361,38 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, // I removed the temporary solution already. I think my changes should fix the deadlock. /* - ------------------------ - LATEST DETECTED DEADLOCK - ------------------------ - 130625 20:03:10 - *** (1) TRANSACTION: - TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS thread id 139837829175040 fetching rows, thread declared inside InnoDB 494 - mysql tables in use 2, locked 1 - LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1 - MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 cloud preparing - UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 9) - *** (1) WAITING FOR THIS LOCK TO BE GRANTED: - RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not gap waiting - Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 - 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; + ------------------------ + LATEST DETECTED DEADLOCK + ------------------------ + 130625 20:03:10 + *** (1) TRANSACTION: + TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS thread id 139837829175040 fetching rows, thread declared inside InnoDB 494 + mysql tables in use 2, locked 1 + LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1 + MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 cloud preparing + UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 9) + *** (1) WAITING FOR THIS LOCK TO BE GRANTED: + RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not gap waiting + Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 + 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; - *** (2) TRANSACTION: - TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS thread id 139837671909120 fetching rows, thread declared inside InnoDB 492 - mysql tables in use 2, locked 1 - 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1 - MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 cloud preparing - UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 8) - *** (2) HOLDS THE LOCK(S): - RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap - Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 - 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; + *** (2) TRANSACTION: + TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS thread id 139837671909120 fetching rows, thread declared inside InnoDB 492 + mysql tables in use 2, locked 1 + 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1 + MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 cloud preparing + UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 8) + *** (2) HOLDS THE LOCK(S): + RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap + Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 + 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; - *** (2) WAITING FOR THIS LOCK TO BE GRANTED: - RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap waiting - Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 - 0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; + *** (2) WAITING FOR THIS LOCK TO BE GRANTED: + RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap waiting + Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 + 0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; - *** WE ROLL BACK TRANSACTION (2) + *** WE ROLL BACK TRANSACTION (2) */ _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid()); @@ -406,23 +405,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } SyncQueueVO queue = null; - - // to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks - // we retry five times until we throw an exception - Random random = new Random(); - - for (int i = 0; i < 5; i++) { - queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit); - if (queue != null) { - break; - } - - try { - Thread.sleep(1000 + random.nextInt(5000)); - } catch (InterruptedException e) { - } - } - + queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit); if (queue == null) throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?"); } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java index 7fb02454c88..9d3bf80b7dd 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java @@ -23,6 +23,7 @@ import java.util.List; import javax.inject.Inject; import org.apache.log4j.Logger; + import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao; import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao; @@ -146,18 +147,18 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage processNumber = new Long(1); else processNumber = processNumber + 1; - + Date dt = DateUtil.currentGMTTime(); queueVO.setLastProcessNumber(processNumber); queueVO.setLastUpdated(dt); queueVO.setQueueSize(queueVO.getQueueSize() + 1); _syncQueueDao.update(queueVO.getId(), queueVO); - + itemVO.setLastProcessMsid(msid); itemVO.setLastProcessNumber(processNumber); itemVO.setLastProcessTime(dt); _syncQueueItemDao.update(item.getId(), itemVO); - + resultList.add(item); } } @@ -183,9 +184,9 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId); if(itemVO != null) { SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); - + _syncQueueItemDao.expunge(itemVO.getId()); - + // if item is active, reset queue information if (itemVO.getLastProcessMsid() != null) { queueVO.setLastUpdated(DateUtil.currentGMTTime()); @@ -239,18 +240,15 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage } private boolean queueReadyToProcess(SyncQueueVO queueVO) { - return true; - - // - // TODO - // - // Need to disable concurrency disable at queue level due to the need to support - // job wake-up dispatching task - // - // Concurrency control is better done at higher level and leave the job scheduling/serializing simpler - // - - // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit(); + int nActiveItems = _syncQueueItemDao.getActiveQueueItemCount(queueVO.getId()); + if (nActiveItems < queueVO.getQueueSizeLimit()) + return true; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Queue (queue id, sync type, sync id) - (" + queueVO.getId() + + "," + queueVO.getSyncObjType() + ", " + queueVO.getSyncObjId() + + ") is reaching concurrency limit " + queueVO.getQueueSizeLimit()); + return false; } @Override