diff --git a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java new file mode 100644 index 00000000000..20e40b7f84b --- /dev/null +++ b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +public interface VmWorkConstants { + public static final String VM_WORK_QUEUE = "VmWorkJobQueue"; + public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher"; + public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher"; +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobHandler.java b/engine/components-api/src/com/cloud/vm/VmWorkJobHandler.java similarity index 100% rename from engine/orchestration/src/com/cloud/vm/VmWorkJobHandler.java rename to engine/components-api/src/com/cloud/vm/VmWorkJobHandler.java diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkSerializer.java b/engine/components-api/src/com/cloud/vm/VmWorkSerializer.java similarity index 100% rename from engine/orchestration/src/com/cloud/vm/VmWorkSerializer.java rename to engine/components-api/src/com/cloud/vm/VmWorkSerializer.java diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml index 49cb9cdf3af..2e35ae5de61 100644 --- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml +++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml @@ -75,17 +75,18 @@ - + + - + diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 4a5c2cc383a..124ae81b265 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -211,7 +211,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac public static final String VM_WORK_JOB_HANDLER = VirtualMachineManagerImpl.class.getSimpleName(); private static final String VM_SYNC_ALERT_SUBJECT = "VM state sync alert"; - + @Inject DataStoreManager dataStoreMgr; @Inject @@ -310,7 +310,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Inject protected VirtualMachinePowerStateSync _syncMgr; @Inject protected VmWorkJobDao _workJobDao; @Inject protected AsyncJobManager _jobMgr; - + Map _vmGurus = new HashMap(); protected StateMachine2 _stateMachine; @@ -330,10 +330,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac "On destroy, force-stop takes this value ", true); static final ConfigKey ClusterDeltaSyncInterval = new ConfigKey("Advanced", Integer.class, "sync.interval", "60", "Cluster Delta sync interval in seconds", false); - + + // TODO, remove it after transient period is over static final ConfigKey VmJobEnabled = new ConfigKey("Advanced", Boolean.class, "vm.job.enabled", "false", "True to enable new VM sync model. false to use the old way", false); + static final ConfigKey VmJobCheckInterval = new ConfigKey("Advanced", Long.class, "vm.job.check.interval", "3000", "Interval in milliseconds to check if the job is complete", false); @@ -386,17 +388,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (s_logger.isDebugEnabled()) { s_logger.debug("Allocating nics for " + vmFinal); } - + try { _networkMgr.allocate(vmProfile, auxiliaryNetworks); } catch (ConcurrentOperationException e) { throw new CloudRuntimeException("Concurrent operation while trying to allocate resources for the VM", e); } - + if (s_logger.isDebugEnabled()) { s_logger.debug("Allocating disks for " + vmFinal); } - + if (template.getFormat() == ImageFormat.ISO) { volumeMgr.allocateRawVolume(Type.ROOT, "ROOT-" + vmFinal.getId(), rootDiskOffering.first(), rootDiskOffering.second(), vmFinal, template, owner); } else if (template.getFormat() == ImageFormat.BAREMETAL) { @@ -404,7 +406,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } else { volumeMgr.allocateTemplatedVolume(Type.ROOT, "ROOT-" + vmFinal.getId(), rootDiskOffering.first(), template, vmFinal, owner); } - + for (Map.Entry offering : dataDiskOfferingsFinal.entrySet()) { volumeMgr.allocateRawVolume(Type.DATADISK, "DATA-" + vmFinal.getId(), offering.getKey(), offering.getValue(), vmFinal, template, owner); } @@ -624,7 +626,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return new Ternary(null, null, work); } }); - + work = result.third(); if (result.first() != null) return result; @@ -696,21 +698,21 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void advanceStart(String vmUuid, Map params) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { - + advanceStart(vmUuid, params, null); } @Override public void advanceStart(String vmUuid, Map params, DeploymentPlan planToDeploy) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { - + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateStart(vmUuid, params, planToDeploy); } else { Outcome outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy); - + try { VirtualMachine vm = outcome.get(); } catch (InterruptedException e) { @@ -718,8 +720,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); } - - Throwable jobException = retriveExecutionException(outcome.getJob()); + + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof ConcurrentOperationException) throw (ConcurrentOperationException)jobException; @@ -728,10 +730,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } } - + private void orchestrateStart(String vmUuid, Map params, DeploymentPlan planToDeploy) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { - + CallContext cctxt = CallContext.current(); Account account = cctxt.getCallingAccount(); User caller = cctxt.getCallingUser(); @@ -1225,14 +1227,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { - + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop); } else { Outcome outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop); - + try { VirtualMachine vm = outcome.get(); } catch (InterruptedException e) { @@ -1240,8 +1242,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); } - - Throwable jobException = retriveExecutionException(outcome.getJob()); + + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof AgentUnavailableException) throw (AgentUnavailableException)jobException; @@ -1513,16 +1515,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return true; } - + @Override public void storageMigration(String vmUuid, StoragePool destPool) { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateStorageMigration(vmUuid, destPool); } else { Outcome outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool); - + try { VirtualMachine vm = outcome.get(); } catch (InterruptedException e) { @@ -1530,8 +1532,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); } - - Throwable jobException = retriveExecutionException(outcome.getJob()); + + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof RuntimeException) throw (RuntimeException)jobException; @@ -1597,14 +1599,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void migrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException { - + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateMigrate(vmUuid, srcHostId, dest); } else { Outcome outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest); - + try { VirtualMachine vm = outcome.get(); } catch (InterruptedException e) { @@ -1612,8 +1614,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); } - - Throwable jobException = retriveExecutionException(outcome.getJob()); + + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -1624,7 +1626,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } } - + private void orchestrateMigrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException { VMInstanceVO vm = _vmDao.findByUuid(vmUuid); if (vm == null) { @@ -1868,14 +1870,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map volumeToPool) throws ResourceUnavailableException, ConcurrentOperationException { - + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool); } else { Outcome outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool); - + try { VirtualMachine vm = outcome.get(); } catch (InterruptedException e) { @@ -1884,7 +1886,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -1893,10 +1895,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } } - + private void orchestrateMigrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map volumeToPool) throws ResourceUnavailableException, ConcurrentOperationException { - + VMInstanceVO vm = _vmDao.findByUuid(vmUuid); HostVO srcHost = _hostDao.findById(srcHostId); @@ -2143,18 +2145,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new CloudRuntimeException("Unable to reboot a VM due to concurrent operation", e); } } - + @Override public void advanceReboot(String vmUuid, Map params) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { - + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateReboot(vmUuid, params); } else { Outcome outcome = rebootVmThroughJobQueue(vmUuid, params); - + try { VirtualMachine vm = outcome.get(); } catch (InterruptedException e) { @@ -2162,8 +2164,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); } - - Throwable jobException = retriveExecutionException(outcome.getJob()); + + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -2174,7 +2176,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } } - + private void orchestrateReboot(String vmUuid, Map params) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { VMInstanceVO vm = _vmDao.findByUuid(vmUuid); @@ -2823,13 +2825,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } } - + if(VmJobEnabled.value()) { if(ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) { _syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport()); } } - + // take the chance to scan VMs that are stuck in transitional states // and are missing from the report scanStalledVMInTransitionStateOnUpHost(agentId); @@ -2854,14 +2856,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (!(cmd instanceof StartupRoutingCommand)) { return; } - + if(s_logger.isDebugEnabled()) s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId()); if(VmJobEnabled.value()) { _syncMgr.resetHostSyncState(agent.getId()); } - + if (forRebalance) { s_logger.debug("Not processing listener " + this + " as connect happens on rebalance process"); return; @@ -2968,7 +2970,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac this.vm = vm; hostUuid = host; this.platform = platform; - + } public AgentVmInfo(String name, VMInstanceVO vm, State state, String host) { @@ -2982,7 +2984,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac public String getHostUuid() { return hostUuid; } - + public String getPlatform() { return platform; } @@ -3066,18 +3068,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac vmForUpdate.setServiceOfferingId(newSvcOff.getId()); return _vmDao.update(vmId, vmForUpdate); } - + @Override public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { - + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance return orchestrateAddVmToNetwork(vm, network,requested); } else { Outcome outcome = addVmToNetworkThroughJobQueue(vm, network, requested); - + try { outcome.get(); } catch (InterruptedException e) { @@ -3085,14 +3087,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); } - + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId()); if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) { - + NicProfile nic = (NicProfile)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult()); return nic; } else { - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -3107,7 +3109,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } } - + private NicProfile orchestrateAddVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { CallContext cctx = CallContext.current(); @@ -3173,18 +3175,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac NicTO nicTO = hvGuru.toNicTO(nic); return nicTO; } - + @Override public boolean removeNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException { - + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance return orchestrateRemoveNicFromVm(vm, nic); } else { Outcome outcome = removeNicFromVmThroughJobQueue(vm, nic); - + try { outcome.get(); } catch (InterruptedException e) { @@ -3194,12 +3196,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId()); - + if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) { Boolean result = (Boolean)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult()); return result; } else { - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -3208,7 +3210,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac else if(jobException instanceof RuntimeException) throw (RuntimeException)jobException; } - + throw new RuntimeException("Job failed with un-handled exception"); } } @@ -3277,7 +3279,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // TODO will serialize on the VM object later to resolve operation conflicts return orchestrateRemoveVmFromNetwork(vm, network, broadcastUri); } - + @DB private boolean orchestrateRemoveVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException { CallContext cctx = CallContext.current(); @@ -3415,17 +3417,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw e; } } - + @Override public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId); } else { Outcome outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId); - + try { VirtualMachine vm = outcome.get(); } catch (InterruptedException e) { @@ -3433,8 +3435,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } catch (java.util.concurrent.ExecutionException e) { throw new RuntimeException("Execution excetion", e); } - - Throwable jobException = retriveExecutionException(outcome.getJob()); + + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -3446,7 +3448,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac private void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException { - + VMInstanceVO vm = _vmDao.findByUuid(vmUuid); s_logger.info("Migrating " + vm + " to " + dest); @@ -3662,19 +3664,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return result; } - + @Override public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException, ConcurrentOperationException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); } else { Outcome outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); - + VirtualMachine vm = null; try { vm = outcome.get(); @@ -3688,19 +3690,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) { return _entityMgr.findById(VMInstanceVO.class, vm.getId()); } else { - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if(jobException != null) { if(jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; else if(jobException instanceof ConcurrentOperationException) throw (ConcurrentOperationException)jobException; } - + throw new RuntimeException("Failed with un-handled exception"); } } } - + private VMInstanceVO orchestrateReConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException, ConcurrentOperationException { VMInstanceVO vm = _vmDao.findByUuid(vmUuid); @@ -3775,16 +3777,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _storagePoolAllocators = storagePoolAllocators; } - + // // PowerState report handling for out-of-band changes and handling of left-over transitional VM states // - + @MessageHandler(topic = Topics.VM_POWER_STATE) private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) { assert(args != null); Long vmId = (Long)args; - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vmId); if(pendingWorkJobs.size() == 0) { @@ -3795,7 +3797,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac case PowerOn : handlePowerOnReportWithNoPendingJobsOnVM(vm); break; - + case PowerOff : handlePowerOffReportWithNoPendingJobsOnVM(vm); break; @@ -3815,7 +3817,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // we will skip it for nows } } - + private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) { // // 1) handle left-over transitional VM states @@ -3830,12 +3832,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } catch(NoTransitionException e) { s_logger.warn("Unexpected VM state transition exception, race-condition?", e); } - + // we need to alert admin or user about this risky state transition _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset"); break; - + case Running : try { if(vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue()) @@ -3845,7 +3847,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac s_logger.warn("Unexpected VM state transition exception, race-condition?", e); } break; - + case Stopping : case Stopped : try { @@ -3856,13 +3858,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset"); break; - + case Destroyed : case Expunging : s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: " + vm.getId() + ", state: " + vm.getState()); break; - + case Migrating : try { stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); @@ -3870,7 +3872,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac s_logger.warn("Unexpected VM state transition exception, race-condition?", e); } break; - + case Error : default : s_logger.info("Receive power on report when VM is in error or unexpected state. vm: " @@ -3878,7 +3880,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac break; } } - + private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) { // 1) handle left-over transitional VM states @@ -3898,18 +3900,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition."); // TODO: we need to forcely release all resource allocation break; - + case Running : case Destroyed : case Expunging : break; - + case Error : default : break; } } - + private void scanStalledVMInTransitionStateOnUpHost(long hostId) { // // Check VM that is stuck in Starting, Stopping, Migrating states, we won't check @@ -3926,7 +3928,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP // and a VM stalls for status update, we will consider them to be powered off // (which is relatively safe to do so) - + long stallThresholdInMs = VmJobStateReportInterval.value() + (VmJobStateReportInterval.value() >> 1); Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs); List mostlikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime); @@ -3935,7 +3937,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac assert(vm != null); handlePowerOffReportWithNoPendingJobsOnVM(vm); } - + List vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime); for(Long vmId : vmsWithRecentReport) { VMInstanceVO vm = _vmDao.findById(vmId); @@ -3946,36 +3948,36 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac handlePowerOffReportWithNoPendingJobsOnVM(vm); } } - + private void scanStalledVMInTransitionStateOnDisconnectedHosts() { Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - VmOpWaitInterval.value()*1000); List stuckAndUncontrollableVMs = listStalledVMInTransitionStateOnDisconnectedHosts(cutTime); for(Long vmId : stuckAndUncontrollableVMs) { VMInstanceVO vm = _vmDao.findById(vmId); - + // We now only alert administrator about this situation _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + " state and its host is unreachable for too long"); } } - - + + // VMs that in transitional state without recent power state report private List listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) { String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + "AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; - + List l = new ArrayList(); TransactionLegacy txn = null; try { txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); - + PreparedStatement pstmt = null; try { pstmt = txn.prepareAutoCloseStatement(sql); - + pstmt.setLong(1, hostId); pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); @@ -3986,21 +3988,21 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } catch (SQLException e) { } catch (Throwable e) { } - + } finally { if(txn != null) txn.close(); } return l; } - + // VMs that in transitional state and recently have power state update private List listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) { String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + "AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; - + List l = new ArrayList(); TransactionLegacy txn = null; try { @@ -4008,7 +4010,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac PreparedStatement pstmt = null; try { pstmt = txn.prepareAutoCloseStatement(sql); - + pstmt.setLong(1, hostId); pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); @@ -4025,13 +4027,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac txn.close(); } } - + private List listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) { String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " + "AND i.power_state_update_time < ? AND i.host_id = h.id " + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; - + List l = new ArrayList(); TransactionLegacy txn = null; try { @@ -4039,7 +4041,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac PreparedStatement pstmt = null; try { pstmt = txn.prepareAutoCloseStatement(sql); - + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal()); ResultSet rs = pstmt.executeQuery(); @@ -4055,11 +4057,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac txn.close(); } } - + // // VM operation based on new sync model // - + public class VmStateSyncOutcome extends OutcomeImpl { private long _vmId; @@ -4093,7 +4095,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac assert(jobVo != null); if(jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) return true; - + return false; } }, AsyncJob.Topics.JOB_STATE); @@ -4105,21 +4107,21 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return _vmDao.findById(_vmId); } } - - public Throwable retriveExecutionException(AsyncJob job) { + + public Throwable retrieveExecutionException(AsyncJob job) { assert(job != null); - assert(job.getDispatcher().equals(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)); - + assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER)); + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); if(jobVo != null && jobVo.getResult() != null) { Object obj = JobSerializerHelper.fromSerializedString(job.getResult()); - + if(obj != null && obj instanceof Throwable) return (Throwable)obj; } return null; } - + // // TODO build a common pattern to reduce code duplication in following methods // no time for this at current iteration @@ -4127,7 +4129,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac public Outcome startVmThroughJobQueue(final String vmUuid, final Map params, final DeploymentPlan planToDeploy) { - + final CallContext context = CallContext.current(); final User callingUser = context.getCallingUser(); final Account callingAccount = context.getCallingAccount(); @@ -4138,18 +4140,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void doInTransactionWithoutResult(TransactionStatus status) { VmWorkJobVO workJob = null; - + _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance, vm.getId(), VmWorkStart.class.getName()); - + if (pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkStart.class.getName()); workJob.setAccountId(callingAccount.getId()); @@ -4164,61 +4166,61 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workInfo.setParams(params); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } - + // Transaction syntax sugar has a cost here context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } }); - + final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), VirtualMachine.PowerState.PowerOn, vm.getId(), null); } - + public Outcome stopVmThroughJobQueue(final String vmUuid, final boolean cleanup) { final CallContext context = CallContext.current(); final Account account = context.getCallingAccount(); final User user = context.getCallingUser(); final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - + Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkStop.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkStop.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setStep(VmWorkJobVO.Step.Prepare); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, cleanup); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } - + context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } @@ -4226,52 +4228,52 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), VirtualMachine.PowerState.PowerOff, vm.getId(), null); } - + public Outcome rebootVmThroughJobQueue(final String vmUuid, final Map params) { - + final CallContext context = CallContext.current(); final Account account = context.getCallingAccount(); final User user = context.getCallingUser(); final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - + Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkReboot.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkReboot.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setStep(VmWorkJobVO.Step.Prepare); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, params); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } - + context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } @@ -4279,11 +4281,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); } - + public Outcome migrateVmThroughJobQueue(final String vmUuid, final long srcHostId, final DeployDestination dest) { final CallContext context = CallContext.current(); final User user = context.getCallingUser(); @@ -4294,51 +4296,51 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { - + _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkMigrate.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { - + workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkMigrate.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } }); - + final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId()); } - + public Outcome migrateVmWithStorageThroughJobQueue( final String vmUuid, final long srcHostId, final long destHostId, final Map volumeToPool) { - + final CallContext context = CallContext.current(); final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); @@ -4348,51 +4350,51 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { - + _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkMigrateWithStorage.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { - + workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkMigrate.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, destHostId, volumeToPool); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } }); - + final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId); } - + public Outcome migrateVmForScaleThroughJobQueue( final String vmUuid, final long srcHostId, final DeployDestination dest, final Long newSvcOfferingId) { - + final CallContext context = CallContext.current(); final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); @@ -4402,50 +4404,50 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { - + _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkMigrateForScale.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { - + workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkMigrate.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest, newSvcOfferingId); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } }); - + final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); } - + public Outcome migrateVmStorageThroughJobQueue( final String vmUuid, final StoragePool destPool) { - + final CallContext context = CallContext.current(); final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); @@ -4455,50 +4457,50 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { - + _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkStorageMigration.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { - + workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkStorageMigration.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } }); - + final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); } - + public Outcome addVmToNetworkThroughJobQueue( final VirtualMachine vm, final Network network, final NicProfile requested) { - + final CallContext context = CallContext.current(); final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); @@ -4506,50 +4508,50 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { - + _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkAddVmToNetwork.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { - + workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network, requested); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } }); - + final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); } - + public Outcome removeNicFromVmThroughJobQueue( final VirtualMachine vm, final Nic nic) { - + final CallContext context = CallContext.current(); final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); @@ -4557,50 +4559,50 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { - + _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkRemoveNicFromVm.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { - + workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkRemoveNicFromVm.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, nic); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } }); - + final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); } public Outcome removeVmFromNetworkThroughJobQueue( final VirtualMachine vm, final Network network, final URI broadcastUri) { - + final CallContext context = CallContext.current(); final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); @@ -4608,50 +4610,50 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { - + _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkRemoveVmFromNetwork.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { - + workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkRemoveVmFromNetwork.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network, broadcastUri); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } }); - + final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); } - + public Outcome reconfigureVmThroughJobQueue( final String vmUuid, final ServiceOffering oldServiceOffering, final boolean reconfiguringOnExistingHost) { - + final CallContext context = CallContext.current(); final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); @@ -4661,47 +4663,47 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Transaction.execute(new TransactionCallbackNoReturn () { @Override public void doInTransactionWithoutResult(TransactionStatus status) { - + _vmDao.lockRow(vm.getId(), true); - + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), VmWorkReconfigure.class.getName()); - + VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { assert (pendingWorkJobs.size() == 1); workJob = pendingWorkJobs.get(0); } else { - + workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkReconfigure.class.getName()); - + workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setVmType(vm.getType()); workJob.setVmInstanceId(vm.getId()); - + // save work context info (there are some duplications) VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, oldServiceOffering, reconfiguringOnExistingHost); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); } }); - + final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); } - + @Override public Pair handleVmWorkJob(AsyncJob job, VmWork work) throws Exception { diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java index 00eb1ed8d8c..2ad548e56b2 100644 --- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java +++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -36,10 +36,6 @@ import com.cloud.vm.dao.VMInstanceDao; public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatcher { private static final Logger s_logger = Logger.getLogger(VmWorkJobDispatcher.class); - public static final String VM_WORK_QUEUE = "VmWorkJobQueue"; - public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher"; - public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher"; - @Inject private VirtualMachineManagerImpl _vmMgr; @Inject private AsyncJobManager _asyncJobMgr; @Inject private VMInstanceDao _instanceDao; @@ -103,79 +99,6 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch Pair result = handler.handleVmWorkJob(job, work); _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second()); -/* - VMInstanceVO vm = _instanceDao.findById(work.getVmId()); - if (vm == null) { - s_logger.info("Unable to find vm " + work.getVmId()); - } - assert(vm != null); - if(work instanceof VmWorkStart) { - VmWorkStart workStart = (VmWorkStart)work; - _vmMgr.orchestrateStart(vm.getUuid(), workStart.getParams(), workStart.getPlan()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); - } else if(work instanceof VmWorkStop) { - VmWorkStop workStop = (VmWorkStop)work; - _vmMgr.orchestrateStop(vm.getUuid(), workStop.isCleanup()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); - } else if(work instanceof VmWorkMigrate) { - VmWorkMigrate workMigrate = (VmWorkMigrate)work; - _vmMgr.orchestrateMigrate(vm.getUuid(), workMigrate.getSrcHostId(), workMigrate.getDeployDestination()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); - } else if(work instanceof VmWorkMigrateWithStorage) { - VmWorkMigrateWithStorage workMigrateWithStorage = (VmWorkMigrateWithStorage)work; - _vmMgr.orchestrateMigrateWithStorage(vm.getUuid(), - workMigrateWithStorage.getSrcHostId(), - workMigrateWithStorage.getDestHostId(), - workMigrateWithStorage.getVolumeToPool()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); - } else if(work instanceof VmWorkMigrateForScale) { - VmWorkMigrateForScale workMigrateForScale = (VmWorkMigrateForScale)work; - _vmMgr.orchestrateMigrateForScale(vm.getUuid(), - workMigrateForScale.getSrcHostId(), - workMigrateForScale.getDeployDestination(), - workMigrateForScale.getNewServiceOfferringId()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); - } else if(work instanceof VmWorkReboot) { - VmWorkReboot workReboot = (VmWorkReboot)work; - _vmMgr.orchestrateReboot(vm.getUuid(), workReboot.getParams()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); - } else if(work instanceof VmWorkAddVmToNetwork) { - VmWorkAddVmToNetwork workAddVmToNetwork = (VmWorkAddVmToNetwork)work; - NicProfile nic = _vmMgr.orchestrateAddVmToNetwork(vm, workAddVmToNetwork.getNetwork(), - workAddVmToNetwork.getRequestedNicProfile()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, - JobSerializerHelper.toObjectSerializedString(nic)); - } else if(work instanceof VmWorkRemoveNicFromVm) { - VmWorkRemoveNicFromVm workRemoveNicFromVm = (VmWorkRemoveNicFromVm)work; - boolean result = _vmMgr.orchestrateRemoveNicFromVm(vm, workRemoveNicFromVm.getNic()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, - JobSerializerHelper.toObjectSerializedString(new Boolean(result))); - } else if(work instanceof VmWorkRemoveVmFromNetwork) { - VmWorkRemoveVmFromNetwork workRemoveVmFromNetwork = (VmWorkRemoveVmFromNetwork)work; - boolean result = _vmMgr.orchestrateRemoveVmFromNetwork(vm, - workRemoveVmFromNetwork.getNetwork(), workRemoveVmFromNetwork.getBroadcastUri()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, - JobSerializerHelper.toObjectSerializedString(new Boolean(result))); - } else if(work instanceof VmWorkReconfigure) { - VmWorkReconfigure workReconfigure = (VmWorkReconfigure)work; - _vmMgr.reConfigureVm(vm.getUuid(), workReconfigure.getNewServiceOffering(), - workReconfigure.isSameHost()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); - } else if(work instanceof VmWorkStorageMigration) { - VmWorkStorageMigration workStorageMigration = (VmWorkStorageMigration)work; - _vmMgr.orchestrateStorageMigration(vm.getUuid(), workStorageMigration.getDestStoragePool()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); - } else { - assert(false); - s_logger.error("Unhandled VM work command: " + job.getCmd()); - - RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd()); - String exceptionJson = JobSerializerHelper.toSerializedString(e); - s_logger.error("Serialize exception object into json: " + exceptionJson); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, exceptionJson); - } - */ - } catch(Throwable e) { s_logger.error("Unable to complete " + job, e); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java index 03c652c388a..d957811c83f 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java @@ -109,13 +109,11 @@ public class OutcomeImpl implements Outcome { @Override public void execute(Task task) { // TODO Auto-generated method stub - } @Override public void execute(Task task, long wait, TimeUnit unit) { // TODO Auto-generated method stub - } public Predicate getPredicate() { diff --git a/server/src/com/cloud/storage/VmWorkAttachVolume.java b/server/src/com/cloud/storage/VmWorkAttachVolume.java new file mode 100644 index 00000000000..3cdfbb52a2d --- /dev/null +++ b/server/src/com/cloud/storage/VmWorkAttachVolume.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.storage; + +import com.cloud.vm.VmWork; + +public class VmWorkAttachVolume extends VmWork { + private static final long serialVersionUID = 553291814854451740L; + + private Long volumeId; + private Long deviceId; + + public VmWorkAttachVolume(long userId, long accountId, long vmId, String handlerName, Long volumeId, Long deviceId) { + super(userId, accountId, vmId, handlerName); + this.volumeId = volumeId; + this.deviceId = deviceId; + } + + public Long getVolumeId() { + return volumeId; + } + + public Long getDeviceId() { + return deviceId; + } +} diff --git a/server/src/com/cloud/storage/VmWorkDetachVolume.java b/server/src/com/cloud/storage/VmWorkDetachVolume.java new file mode 100644 index 00000000000..18262d254aa --- /dev/null +++ b/server/src/com/cloud/storage/VmWorkDetachVolume.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.storage; + +import com.cloud.vm.VmWork; + +public class VmWorkDetachVolume extends VmWork { + private static final long serialVersionUID = -8722243207385263101L; + + private Long volumeId; + + public VmWorkDetachVolume(long userId, long accountId, long vmId, String handlerName, Long volumeId) { + super(userId, accountId, vmId, handlerName); + this.volumeId = volumeId; + } + + public Long getVolumeId() { + return volumeId; + } +} diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java index 264f2bd3030..dd6358a1a5d 100644 --- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java +++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java @@ -18,9 +18,9 @@ package com.cloud.storage; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.HashMap; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -53,10 +53,17 @@ import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo; import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService; import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService.VolumeApiResult; import org.apache.cloudstack.framework.async.AsyncCallFuture; +import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.Outcome; +import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; +import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; +import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; +import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; +import org.apache.cloudstack.jobs.JobInfo; import org.apache.cloudstack.storage.command.AttachAnswer; import org.apache.cloudstack.storage.command.AttachCommand; import org.apache.cloudstack.storage.command.DettachCommand; @@ -130,18 +137,22 @@ import com.cloud.template.TemplateManager; import com.cloud.user.Account; import com.cloud.user.AccountManager; import com.cloud.user.ResourceLimitService; +import com.cloud.user.User; import com.cloud.user.VmDiskStatisticsVO; import com.cloud.user.dao.AccountDao; import com.cloud.user.dao.UserDao; import com.cloud.user.dao.VmDiskStatisticsDao; import com.cloud.utils.EnumUtils; import com.cloud.utils.NumbersUtil; +import com.cloud.utils.Pair; +import com.cloud.utils.Predicate; import com.cloud.utils.UriUtils; import com.cloud.utils.component.ManagerBase; import com.cloud.utils.db.DB; import com.cloud.utils.db.EntityManager; import com.cloud.utils.db.Transaction; import com.cloud.utils.db.TransactionCallback; +import com.cloud.utils.db.TransactionCallbackNoReturn; import com.cloud.utils.db.TransactionStatus; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.fsm.NoTransitionException; @@ -152,6 +163,10 @@ import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.VirtualMachineManager; +import com.cloud.vm.VmWork; +import com.cloud.vm.VmWorkConstants; +import com.cloud.vm.VmWorkJobHandler; +import com.cloud.vm.VmWorkSerializer; import com.cloud.vm.dao.ConsoleProxyDao; import com.cloud.vm.dao.DomainRouterDao; import com.cloud.vm.dao.SecondaryStorageVmDao; @@ -160,8 +175,11 @@ import com.cloud.vm.dao.VMInstanceDao; import com.cloud.vm.snapshot.VMSnapshotVO; import com.cloud.vm.snapshot.dao.VMSnapshotDao; -public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiService { +public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiService, VmWorkJobHandler { private final static Logger s_logger = Logger.getLogger(VolumeApiServiceImpl.class); + + public static final String VM_WORK_JOB_HANDLER = VolumeApiServiceImpl.class.getSimpleName(); + @Inject VolumeOrchestrationService _volumeMgr; @@ -307,6 +325,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic protected HypervisorCapabilitiesDao _hypervisorCapabilitiesDao; @Inject StorageManager storageMgr; + + @Inject + protected AsyncJobManager _jobMgr; + + // TODO + static final ConfigKey VmJobEnabled = new ConfigKey("Advanced", + Boolean.class, "vm.job.enabled", "false", + "True to enable new VM sync model. false to use the old way", false); + static final ConfigKey VmJobCheckInterval = new ConfigKey("Advanced", + Long.class, "vm.job.check.interval", "3000", + "Interval in milliseconds to check if the job is complete", false); + private int _customDiskOfferingMinSize = 1; private final int _customDiskOfferingMaxSize = 1024; private long _maxVolumeSizeInGb; @@ -379,8 +409,8 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic userSpecifiedName = getRandomVolumeName(); } if ((!url.toLowerCase().endsWith("vhd")) && (!url.toLowerCase().endsWith("vhd.zip")) && (!url.toLowerCase().endsWith("vhd.bz2")) && - (!url.toLowerCase().endsWith("vhdx")) && (!url.toLowerCase().endsWith("vhdx.zip")) && - (!url.toLowerCase().endsWith("vhdx.gz")) && (!url.toLowerCase().endsWith("vhdx.bz2")) && + (!url.toLowerCase().endsWith("vhdx")) && (!url.toLowerCase().endsWith("vhdx.zip")) && + (!url.toLowerCase().endsWith("vhdx.gz")) && (!url.toLowerCase().endsWith("vhdx.bz2")) && (!url.toLowerCase().endsWith("vhd.gz")) && (!url.toLowerCase().endsWith("qcow2")) && (!url.toLowerCase().endsWith("qcow2.zip")) && (!url.toLowerCase().endsWith("qcow2.bz2")) && (!url.toLowerCase().endsWith("qcow2.gz")) && (!url.toLowerCase().endsWith("ova")) && (!url.toLowerCase().endsWith("ova.zip")) && (!url.toLowerCase().endsWith("ova.bz2")) && (!url.toLowerCase().endsWith("ova.gz")) && @@ -1026,12 +1056,37 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic @Override public Volume attachVolumeToVM(AttachVolumeCmd command) { - Long vmId = command.getVirtualMachineId(); - Long volumeId = command.getId(); - Long deviceId = command.getDeviceId(); - return attachVolumeToVM(vmId, volumeId, deviceId); + + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); + } else { + Outcome outcome = attachVolumeToVmThroughJobQueue(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); + + Volume vol = null; + try { + vol = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retrieveExecutionException(outcome.getJob()); + if (jobException != null) { + if (jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else + throw new RuntimeException("Unexpected exception", jobException); + } + return vol; + } } + private Volume orchestrateAttachVolumeToVM(Long vmId, Long volumeId, Long deviceId) { + return attachVolumeToVM(vmId, volumeId, deviceId); + } @ActionEvent(eventType = EventTypes.EVENT_VOLUME_ATTACH, eventDescription = "attaching volume", async = true) public Volume attachVolumeToVM(Long vmId, Long volumeId, Long deviceId) { @@ -1214,7 +1269,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic @ActionEvent(eventType = EventTypes.EVENT_VOLUME_UPDATE, eventDescription = "updating volume", async = true) public Volume updateVolume(long volumeId, String path, String state, Long storageId, Boolean displayVolume) { VolumeVO volume = _volumeDao.findById(volumeId); - + if (path != null) { volume.setPath(path); } @@ -1222,7 +1277,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic if (displayVolume != null) { volume.setDisplayVolume(displayVolume); } - + if (state != null) { try { Volume.State volumeState = Volume.State.valueOf(state); @@ -1232,7 +1287,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic throw new InvalidParameterValueException("Invalid volume state specified"); } } - + if (storageId != null) { StoragePool pool = _storagePoolDao.findById(storageId); if (pool.getDataCenterId() != volume.getDataCenterId()) { @@ -1240,7 +1295,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic } volume.setPoolId(pool.getId()); } - + _volumeDao.update(volumeId, volume); return volume; @@ -1315,6 +1370,38 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic _asyncMgr.updateAsyncJobStatus(job.getId(), BaseCmd.PROGRESS_INSTANCE_CREATED, volumeId.toString()); } + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateDetachVolumeFromVM(vmId, volumeId); + } else { + Outcome outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId); + + Volume vol = null; + try { + vol = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retrieveExecutionException(outcome.getJob()); + if (jobException != null) { + if (jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else + throw new RuntimeException("Unexpected exception", jobException); + } + return vol; + } + } + + private Volume orchestrateDetachVolumeFromVM(long vmId, long volumeId) { + + Volume volume = _volumeDao.findById(volumeId); + VMInstanceVO vm = _vmInstanceDao.findById(vmId); + String errorMsg = "Failed to detach volume: " + volume.getName() + " from VM: " + vm.getHostName(); boolean sendCommand = (vm.getState() == State.Running); @@ -1861,4 +1948,176 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic _storagePoolAllocators = storagePoolAllocators; } + public Throwable retrieveExecutionException(AsyncJob job) { + assert (job != null); + assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER)); + + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); + if (jobVo != null && jobVo.getResult() != null) { + Object obj = JobSerializerHelper.fromSerializedString(job.getResult()); + + if (obj != null && obj instanceof Throwable) + return (Throwable)obj; + } + return null; + } + + public class VmJobSyncOutcome extends OutcomeImpl { + private long _volumeId; + + public VmJobSyncOutcome(final AsyncJob job, final long volumeId) { + super(Volume.class, job, VmJobCheckInterval.value(), new Predicate() { + @Override + public boolean checkCondition() { + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); + assert (jobVo != null); + if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) + return true; + + return false; + } + }, AsyncJob.Topics.JOB_STATE); + _volumeId = volumeId; + } + + @Override + protected Volume retrieve() { + return _volumeDao.findById(_volumeId); + } + } + + public Outcome attachVolumeToVmThroughJobQueue(final Long vmId, final Long volumeId, final Long deviceId) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkAttachVolume.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkAttachVolume workInfo = new VmWorkAttachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, deviceId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + volumeId); + } + + public Outcome detachVolumeFromVmThroughJobQueue(final Long vmId, final Long volumeId) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkDetachVolume.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkDetachVolume workInfo = new VmWorkDetachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + volumeId); + } + + @Override + public Pair handleVmWorkJob(AsyncJob job, VmWork work) throws Exception { + VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId()); + if (vm == null) { + s_logger.info("Unable to find vm " + work.getVmId()); + } + assert (vm != null); + + if (work instanceof VmWorkAttachVolume) { + + VmWorkAttachVolume attachWork = (VmWorkAttachVolume)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Attach-Volume within VM work job context. vmId: " + attachWork.getVmId() + + ", volId: " + attachWork.getVolumeId() + ", deviceId: " + attachWork.getDeviceId()); + + orchestrateAttachVolumeToVM(attachWork.getVmId(), attachWork.getVolumeId(), attachWork.getDeviceId()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Done executing Attach-Volume within VM work job context. vmId: " + attachWork.getVmId() + + ", volId: " + attachWork.getVolumeId() + ", deviceId: " + attachWork.getDeviceId()); + + return new Pair(JobInfo.Status.SUCCEEDED, null); + } else if (work instanceof VmWorkDetachVolume) { + VmWorkDetachVolume detachWork = (VmWorkDetachVolume)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Detach-Volume within VM work job context. vmId: " + detachWork.getVmId() + + ", volId: " + detachWork.getVolumeId()); + + orchestrateDetachVolumeFromVM(detachWork.getVmId(), detachWork.getVolumeId()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Done executing Detach-Volume within VM work job context. vmId: " + detachWork.getVmId() + + ", volId: " + detachWork.getVolumeId()); + + return new Pair(JobInfo.Status.SUCCEEDED, null); + } else { + RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd()); + String exceptionJson = JobSerializerHelper.toSerializedString(e); + s_logger.error("Serialize exception object into json: " + exceptionJson); + return new Pair(JobInfo.Status.FAILED, exceptionJson); + } + } }