diff --git a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java new file mode 100644 index 00000000000..20e40b7f84b --- /dev/null +++ b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +public interface VmWorkConstants { + public static final String VM_WORK_QUEUE = "VmWorkJobQueue"; + public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher"; + public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher"; +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobHandler.java b/engine/components-api/src/com/cloud/vm/VmWorkJobHandler.java similarity index 100% rename from engine/orchestration/src/com/cloud/vm/VmWorkJobHandler.java rename to engine/components-api/src/com/cloud/vm/VmWorkJobHandler.java diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkSerializer.java b/engine/components-api/src/com/cloud/vm/VmWorkSerializer.java similarity index 100% rename from engine/orchestration/src/com/cloud/vm/VmWorkSerializer.java rename to engine/components-api/src/com/cloud/vm/VmWorkSerializer.java diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml index 49cb9cdf3af..2e35ae5de61 100644 --- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml +++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml @@ -75,17 +75,18 @@ - + + - + diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 75ab47b35d2..5e8824ffff4 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -708,10 +708,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void advanceStart(String vmUuid, Map params, DeploymentPlan planToDeploy) throws InsufficientCapacityException, - ConcurrentOperationException, ResourceUnavailableException { + ConcurrentOperationException, ResourceUnavailableException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateStart(vmUuid, params, planToDeploy); } else { @@ -725,7 +725,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof ConcurrentOperationException) throw (ConcurrentOperationException)jobException; @@ -1228,10 +1228,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) - throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { + throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop); } else { @@ -1245,7 +1245,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof AgentUnavailableException) throw (AgentUnavailableException)jobException; @@ -1521,7 +1521,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void storageMigration(String vmUuid, StoragePool destPool) { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateStorageMigration(vmUuid, destPool); } else { @@ -1535,7 +1535,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof RuntimeException) throw (RuntimeException)jobException; @@ -1600,10 +1600,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void migrate(String vmUuid, long srcHostId, DeployDestination dest) - throws ResourceUnavailableException, ConcurrentOperationException { + throws ResourceUnavailableException, ConcurrentOperationException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateMigrate(vmUuid, srcHostId, dest); } else { @@ -1617,7 +1617,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -1871,10 +1871,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map volumeToPool) - throws ResourceUnavailableException, ConcurrentOperationException { + throws ResourceUnavailableException, ConcurrentOperationException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool); } else { @@ -1888,7 +1888,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -2150,10 +2150,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void advanceReboot(String vmUuid, Map params) - throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { + throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateReboot(vmUuid, params); } else { @@ -2167,7 +2167,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -2826,7 +2826,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } - if (VmJobEnabled.value()) { + if(VmJobEnabled.value()) { if (ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) { _syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport()); } @@ -3071,10 +3071,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) - throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { + throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance return orchestrateAddVmToNetwork(vm, network, requested); } else { @@ -3094,7 +3094,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac NicProfile nic = (NicProfile)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult()); return nic; } else { - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -3178,10 +3178,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public boolean removeNicFromVm(VirtualMachine vm, Nic nic) - throws ConcurrentOperationException, ResourceUnavailableException { + throws ConcurrentOperationException, ResourceUnavailableException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance return orchestrateRemoveNicFromVm(vm, nic); } else { @@ -3201,7 +3201,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Boolean result = (Boolean)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult()); return result; } else { - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -3281,6 +3281,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // TODO will serialize on the VM object later to resolve operation conflicts return orchestrateRemoveVmFromNetwork(vm, network, broadcastUri); } + @DB private boolean orchestrateRemoveVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException { CallContext cctx = CallContext.current(); @@ -3422,9 +3423,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) - throws ResourceUnavailableException, ConcurrentOperationException { + throws ResourceUnavailableException, ConcurrentOperationException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId); } else { @@ -3438,7 +3439,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -3669,11 +3670,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, - boolean reconfiguringOnExistingHost) - throws ResourceUnavailableException, ConcurrentOperationException { + boolean reconfiguringOnExistingHost) + throws ResourceUnavailableException, ConcurrentOperationException { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); - if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); } else { @@ -3692,7 +3693,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) { return _entityMgr.findById(VMInstanceVO.class, vm.getId()); } else { - Throwable jobException = retriveExecutionException(outcome.getJob()); + Throwable jobException = retrieveExecutionException(outcome.getJob()); if (jobException != null) { if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -3791,26 +3792,26 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Long vmId = (Long)args; List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vmId); + VirtualMachine.Type.Instance, vmId); if (pendingWorkJobs.size() == 0) { // there is no pending operation job VMInstanceVO vm = _vmDao.findById(vmId); if (vm != null) { switch (vm.getPowerState()) { - case PowerOn: - handlePowerOnReportWithNoPendingJobsOnVM(vm); - break; + case PowerOn: + handlePowerOnReportWithNoPendingJobsOnVM(vm); + break; - case PowerOff: - handlePowerOffReportWithNoPendingJobsOnVM(vm); - break; + case PowerOff: + handlePowerOffReportWithNoPendingJobsOnVM(vm); + break; - // PowerUnknown shouldn't be reported, it is a derived - // VM power state from host state (host un-reachable - case PowerUnknown: - default: - assert (false); - break; + // PowerUnknown shouldn't be reported, it is a derived + // VM power state from host state (host un-reachable + case PowerUnknown: + default: + assert (false); + break; } } else { s_logger.warn("VM " + vmId + " no longer exists when processing VM state report"); @@ -3823,98 +3824,98 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) { // - // 1) handle left-over transitional VM states - // 2) handle out of band VM live migration - // 3) handle out of sync stationary states, marking VM from Stopped to Running with - // alert messages + // 1) handle left-over transitional VM states + // 2) handle out of band VM live migration + // 3) handle out of sync stationary states, marking VM from Stopped to Running with + // alert messages // switch (vm.getState()) { - case Starting: - try { - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); - } catch (NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } + case Starting: + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } - // we need to alert admin or user about this risky state transition - _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), - VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + - ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset"); - break; + // we need to alert admin or user about this risky state transition + _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset"); + break; - case Running: - try { - if (vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue()) - s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId()); - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); - } catch (NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } - break; + case Running: + try { + if (vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue()) + s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId()); + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + break; - case Stopping: - case Stopped: - try { - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); - } catch (NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } - _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), - VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + - " -> Running) from out-of-context transition. VM network environment may need to be reset"); - break; + case Stopping: + case Stopped: + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + + " -> Running) from out-of-context transition. VM network environment may need to be reset"); + break; - case Destroyed: - case Expunging: - s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: " + case Destroyed: + case Expunging: + s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: " + vm.getId() + ", state: " + vm.getState()); - break; + break; - case Migrating: - try { - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); - } catch (NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } - break; + case Migrating: + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + break; - case Error: - default: - s_logger.info("Receive power on report when VM is in error or unexpected state. vm: " + case Error: + default: + s_logger.info("Receive power on report when VM is in error or unexpected state. vm: " + vm.getId() + ", state: " + vm.getState()); - break; + break; } } private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) { - // 1) handle left-over transitional VM states - // 2) handle out of sync stationary states, schedule force-stop to release resources + // 1) handle left-over transitional VM states + // 2) handle out of sync stationary states, schedule force-stop to release resources // switch (vm.getState()) { - case Starting: - case Stopping: - case Stopped: - case Migrating: - try { - stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId()); - } catch (NoTransitionException e) { - s_logger.warn("Unexpected VM state transition exception, race-condition?", e); - } - _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), - VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + - " -> Stopped) from out-of-context transition."); - // TODO: we need to forcely release all resource allocation - break; + case Starting: + case Stopping: + case Stopped: + case Migrating: + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + + " -> Stopped) from out-of-context transition."); + // TODO: we need to forcely release all resource allocation + break; - case Running: - case Destroyed: - case Expunging: - break; + case Running: + case Destroyed: + case Expunging: + break; - case Error: - default: - break; + case Error: + default: + break; } } @@ -3924,8 +3925,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // VMs in expunging state (this need to be handled specially) // // checking condition - // 1) no pending VmWork job - // 2) on hostId host and host is UP + // 1) no pending VmWork job + // 2) on hostId host and host is UP // // When host is UP, soon or later we will get a report from the host about the VM, // however, if VM is missing from the host report (it may happen in out of band changes @@ -3963,17 +3964,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // We now only alert administrator about this situation _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), - VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + - " state and its host is unreachable for too long"); + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + + " state and its host is unreachable for too long"); } } + // VMs that in transitional state without recent power state report private List listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) { String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + - "AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " + - "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + - "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; + "AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " + + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; List l = new ArrayList(); TransactionLegacy txn = null; @@ -4005,9 +4007,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // VMs that in transitional state and recently have power state update private List listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) { String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + - "AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " + - "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + - "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; + "AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " + + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; List l = new ArrayList(); TransactionLegacy txn = null; @@ -4036,9 +4038,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac private List listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) { String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " + - "AND i.power_state_update_time < ? AND i.host_id = h.id " + - "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + - "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; + "AND i.power_state_update_time < ? AND i.host_id = h.id " + + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; List l = new ArrayList(); TransactionLegacy txn = null; @@ -4114,9 +4116,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } - public Throwable retriveExecutionException(AsyncJob job) { + public Throwable retrieveExecutionException(AsyncJob job) { assert (job != null); - assert (job.getDispatcher().equals(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)); + assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER)); AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); if (jobVo != null && jobVo.getResult() != null) { @@ -4133,8 +4135,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // no time for this at current iteration // public Outcome startVmThroughJobQueue(final String vmUuid, - final Map params, - final DeploymentPlan planToDeploy) { + final Map params, + final DeploymentPlan planToDeploy) { final CallContext context = CallContext.current(); final User callingUser = context.getCallingUser(); @@ -4157,7 +4159,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } else { workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkStart.class.getName()); workJob.setAccountId(callingAccount.getId()); @@ -4172,7 +4174,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workInfo.setParams(params); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } // Transaction syntax sugar has a cost here @@ -4185,7 +4187,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), - VirtualMachine.PowerState.PowerOn, vm.getId(), null); + VirtualMachine.PowerState.PowerOn, vm.getId(), null); } public Outcome stopVmThroughJobQueue(final String vmUuid, final boolean cleanup) { @@ -4211,7 +4213,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } else { workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkStop.class.getName()); workJob.setAccountId(account.getId()); @@ -4224,7 +4226,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, cleanup); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); @@ -4236,11 +4238,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), - VirtualMachine.PowerState.PowerOff, vm.getId(), null); + VirtualMachine.PowerState.PowerOff, vm.getId(), null); } public Outcome rebootVmThroughJobQueue(final String vmUuid, - final Map params) { + final Map params) { final CallContext context = CallContext.current(); final Account account = context.getCallingAccount(); @@ -4264,7 +4266,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } else { workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkReboot.class.getName()); workJob.setAccountId(account.getId()); @@ -4277,7 +4279,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, params); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); @@ -4289,7 +4291,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), - vm.getId()); + vm.getId()); } public Outcome migrateVmThroughJobQueue(final String vmUuid, final long srcHostId, final DeployDestination dest) { @@ -4316,7 +4318,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkMigrate.class.getName()); workJob.setAccountId(account.getId()); @@ -4328,7 +4330,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); @@ -4355,6 +4357,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Transaction.execute(new TransactionCallbackNoReturn() { @Override public void doInTransactionWithoutResult(TransactionStatus status) { + _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( @@ -4369,7 +4372,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkMigrate.class.getName()); workJob.setAccountId(account.getId()); @@ -4382,7 +4385,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, destHostId, volumeToPool); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); @@ -4422,7 +4425,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkMigrate.class.getName()); workJob.setAccountId(account.getId()); @@ -4435,7 +4438,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest, newSvcOfferingId); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); @@ -4474,7 +4477,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkStorageMigration.class.getName()); workJob.setAccountId(account.getId()); @@ -4487,7 +4490,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); @@ -4524,7 +4527,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); workJob.setAccountId(account.getId()); @@ -4537,7 +4540,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network, requested); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); @@ -4574,7 +4577,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkRemoveNicFromVm.class.getName()); workJob.setAccountId(account.getId()); @@ -4587,7 +4590,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, nic); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); @@ -4624,7 +4627,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkRemoveVmFromNetwork.class.getName()); workJob.setAccountId(account.getId()); @@ -4637,7 +4640,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network, broadcastUri); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); @@ -4651,7 +4654,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } public Outcome reconfigureVmThroughJobQueue( - final String vmUuid, final ServiceOffering oldServiceOffering, final boolean reconfiguringOnExistingHost) { + final String vmUuid, final ServiceOffering oldServiceOffering, final boolean reconfiguringOnExistingHost) { final CallContext context = CallContext.current(); final User user = context.getCallingUser(); @@ -4676,7 +4679,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkReconfigure.class.getName()); workJob.setAccountId(account.getId()); @@ -4689,7 +4692,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, oldServiceOffering, reconfiguringOnExistingHost); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); } context.putContextParameter("workJob", workJob); context.putContextParameter("jobId", new Long(workJob.getId())); @@ -4698,7 +4701,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); } diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java index 7ed9f39411f..dea64daed4a 100644 --- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java +++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -36,10 +36,6 @@ import com.cloud.vm.dao.VMInstanceDao; public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatcher { private static final Logger s_logger = Logger.getLogger(VmWorkJobDispatcher.class); - public static final String VM_WORK_QUEUE = "VmWorkJobQueue"; - public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher"; - public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher"; - @Inject private VirtualMachineManagerImpl _vmMgr; @Inject private AsyncJobManager _asyncJobMgr; diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java index c65d9c5b3b3..7fd28db573f 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java @@ -110,13 +110,11 @@ public class OutcomeImpl implements Outcome { @Override public void execute(Task task) { // TODO Auto-generated method stub - } @Override public void execute(Task task, long wait, TimeUnit unit) { // TODO Auto-generated method stub - } public Predicate getPredicate() { diff --git a/server/src/com/cloud/storage/VmWorkAttachVolume.java b/server/src/com/cloud/storage/VmWorkAttachVolume.java new file mode 100644 index 00000000000..3cdfbb52a2d --- /dev/null +++ b/server/src/com/cloud/storage/VmWorkAttachVolume.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.storage; + +import com.cloud.vm.VmWork; + +public class VmWorkAttachVolume extends VmWork { + private static final long serialVersionUID = 553291814854451740L; + + private Long volumeId; + private Long deviceId; + + public VmWorkAttachVolume(long userId, long accountId, long vmId, String handlerName, Long volumeId, Long deviceId) { + super(userId, accountId, vmId, handlerName); + this.volumeId = volumeId; + this.deviceId = deviceId; + } + + public Long getVolumeId() { + return volumeId; + } + + public Long getDeviceId() { + return deviceId; + } +} diff --git a/server/src/com/cloud/storage/VmWorkDetachVolume.java b/server/src/com/cloud/storage/VmWorkDetachVolume.java new file mode 100644 index 00000000000..18262d254aa --- /dev/null +++ b/server/src/com/cloud/storage/VmWorkDetachVolume.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.storage; + +import com.cloud.vm.VmWork; + +public class VmWorkDetachVolume extends VmWork { + private static final long serialVersionUID = -8722243207385263101L; + + private Long volumeId; + + public VmWorkDetachVolume(long userId, long accountId, long vmId, String handlerName, Long volumeId) { + super(userId, accountId, vmId, handlerName); + this.volumeId = volumeId; + } + + public Long getVolumeId() { + return volumeId; + } +} diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java index d215858dfb4..2fa023ef7e0 100644 --- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java +++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java @@ -26,7 +26,6 @@ import java.util.concurrent.ExecutionException; import javax.inject.Inject; -import com.cloud.uuididentity.UUIDManager; import org.apache.log4j.Logger; import org.apache.cloudstack.api.BaseCmd; @@ -54,10 +53,17 @@ import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo; import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService; import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService.VolumeApiResult; import org.apache.cloudstack.framework.async.AsyncCallFuture; +import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.Outcome; +import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; +import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; +import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; +import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; +import org.apache.cloudstack.jobs.JobInfo; import org.apache.cloudstack.storage.command.AttachAnswer; import org.apache.cloudstack.storage.command.AttachCommand; import org.apache.cloudstack.storage.command.DettachCommand; @@ -131,28 +137,37 @@ import com.cloud.template.TemplateManager; import com.cloud.user.Account; import com.cloud.user.AccountManager; import com.cloud.user.ResourceLimitService; +import com.cloud.user.User; import com.cloud.user.VmDiskStatisticsVO; import com.cloud.user.dao.AccountDao; import com.cloud.user.dao.UserDao; import com.cloud.user.dao.VmDiskStatisticsDao; import com.cloud.utils.EnumUtils; import com.cloud.utils.NumbersUtil; +import com.cloud.utils.Pair; +import com.cloud.utils.Predicate; import com.cloud.utils.UriUtils; import com.cloud.utils.component.ManagerBase; import com.cloud.utils.db.DB; import com.cloud.utils.db.EntityManager; import com.cloud.utils.db.Transaction; import com.cloud.utils.db.TransactionCallback; +import com.cloud.utils.db.TransactionCallbackNoReturn; import com.cloud.utils.db.TransactionStatus; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.fsm.NoTransitionException; import com.cloud.utils.fsm.StateMachine2; +import com.cloud.uuididentity.UUIDManager; import com.cloud.vm.UserVmManager; import com.cloud.vm.UserVmVO; import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.VirtualMachineManager; +import com.cloud.vm.VmWork; +import com.cloud.vm.VmWorkConstants; +import com.cloud.vm.VmWorkJobHandler; +import com.cloud.vm.VmWorkSerializer; import com.cloud.vm.dao.ConsoleProxyDao; import com.cloud.vm.dao.DomainRouterDao; import com.cloud.vm.dao.SecondaryStorageVmDao; @@ -161,8 +176,11 @@ import com.cloud.vm.dao.VMInstanceDao; import com.cloud.vm.snapshot.VMSnapshotVO; import com.cloud.vm.snapshot.dao.VMSnapshotDao; -public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiService { +public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiService, VmWorkJobHandler { private final static Logger s_logger = Logger.getLogger(VolumeApiServiceImpl.class); + + public static final String VM_WORK_JOB_HANDLER = VolumeApiServiceImpl.class.getSimpleName(); + @Inject VolumeOrchestrationService _volumeMgr; @@ -310,6 +328,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic protected HypervisorCapabilitiesDao _hypervisorCapabilitiesDao; @Inject StorageManager storageMgr; + + @Inject + protected AsyncJobManager _jobMgr; + + // TODO + static final ConfigKey VmJobEnabled = new ConfigKey("Advanced", + Boolean.class, "vm.job.enabled", "false", + "True to enable new VM sync model. false to use the old way", false); + static final ConfigKey VmJobCheckInterval = new ConfigKey("Advanced", + Long.class, "vm.job.check.interval", "3000", + "Interval in milliseconds to check if the job is complete", false); + private int _customDiskOfferingMinSize = 1; private final int _customDiskOfferingMaxSize = 1024; private long _maxVolumeSizeInGb; @@ -1034,9 +1064,35 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic @Override public Volume attachVolumeToVM(AttachVolumeCmd command) { - Long vmId = command.getVirtualMachineId(); - Long volumeId = command.getId(); - Long deviceId = command.getDeviceId(); + + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); + } else { + Outcome outcome = attachVolumeToVmThroughJobQueue(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); + + Volume vol = null; + try { + vol = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retrieveExecutionException(outcome.getJob()); + if (jobException != null) { + if (jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else + throw new RuntimeException("Unexpected exception", jobException); + } + return vol; + } + } + + private Volume orchestrateAttachVolumeToVM(Long vmId, Long volumeId, Long deviceId) { return attachVolumeToVM(vmId, volumeId, deviceId); } @@ -1203,6 +1259,10 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic volume.setPath(path); } + if (displayVolume != null) { + volume.setDisplayVolume(displayVolume); + } + if (state != null) { try { Volume.State volumeState = Volume.State.valueOf(state); @@ -1305,6 +1365,38 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic _asyncMgr.updateAsyncJobStatus(job.getId(), BaseCmd.PROGRESS_INSTANCE_CREATED, volumeId.toString()); } + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateDetachVolumeFromVM(vmId, volumeId); + } else { + Outcome outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId); + + Volume vol = null; + try { + vol = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retrieveExecutionException(outcome.getJob()); + if (jobException != null) { + if (jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else + throw new RuntimeException("Unexpected exception", jobException); + } + return vol; + } + } + + private Volume orchestrateDetachVolumeFromVM(long vmId, long volumeId) { + + Volume volume = _volumeDao.findById(volumeId); + VMInstanceVO vm = _vmInstanceDao.findById(vmId); + String errorMsg = "Failed to detach volume: " + volume.getName() + " from VM: " + vm.getHostName(); boolean sendCommand = (vm.getState() == State.Running); @@ -1850,4 +1942,176 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic _storagePoolAllocators = storagePoolAllocators; } + public Throwable retrieveExecutionException(AsyncJob job) { + assert (job != null); + assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER)); + + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); + if (jobVo != null && jobVo.getResult() != null) { + Object obj = JobSerializerHelper.fromSerializedString(job.getResult()); + + if (obj != null && obj instanceof Throwable) + return (Throwable)obj; + } + return null; + } + + public class VmJobSyncOutcome extends OutcomeImpl { + private long _volumeId; + + public VmJobSyncOutcome(final AsyncJob job, final long volumeId) { + super(Volume.class, job, VmJobCheckInterval.value(), new Predicate() { + @Override + public boolean checkCondition() { + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); + assert (jobVo != null); + if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) + return true; + + return false; + } + }, AsyncJob.Topics.JOB_STATE); + _volumeId = volumeId; + } + + @Override + protected Volume retrieve() { + return _volumeDao.findById(_volumeId); + } + } + + public Outcome attachVolumeToVmThroughJobQueue(final Long vmId, final Long volumeId, final Long deviceId) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkAttachVolume.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkAttachVolume workInfo = new VmWorkAttachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, deviceId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + volumeId); + } + + public Outcome detachVolumeFromVmThroughJobQueue(final Long vmId, final Long volumeId) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkDetachVolume.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkDetachVolume workInfo = new VmWorkDetachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + volumeId); + } + + @Override + public Pair handleVmWorkJob(AsyncJob job, VmWork work) throws Exception { + VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId()); + if (vm == null) { + s_logger.info("Unable to find vm " + work.getVmId()); + } + assert (vm != null); + + if (work instanceof VmWorkAttachVolume) { + + VmWorkAttachVolume attachWork = (VmWorkAttachVolume)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Attach-Volume within VM work job context. vmId: " + attachWork.getVmId() + + ", volId: " + attachWork.getVolumeId() + ", deviceId: " + attachWork.getDeviceId()); + + orchestrateAttachVolumeToVM(attachWork.getVmId(), attachWork.getVolumeId(), attachWork.getDeviceId()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Done executing Attach-Volume within VM work job context. vmId: " + attachWork.getVmId() + + ", volId: " + attachWork.getVolumeId() + ", deviceId: " + attachWork.getDeviceId()); + + return new Pair(JobInfo.Status.SUCCEEDED, null); + } else if (work instanceof VmWorkDetachVolume) { + VmWorkDetachVolume detachWork = (VmWorkDetachVolume)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Detach-Volume within VM work job context. vmId: " + detachWork.getVmId() + + ", volId: " + detachWork.getVolumeId()); + + orchestrateDetachVolumeFromVM(detachWork.getVmId(), detachWork.getVolumeId()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Done executing Detach-Volume within VM work job context. vmId: " + detachWork.getVmId() + + ", volId: " + detachWork.getVolumeId()); + + return new Pair(JobInfo.Status.SUCCEEDED, null); + } else { + RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd()); + String exceptionJson = JobSerializerHelper.toSerializedString(e); + s_logger.error("Serialize exception object into json: " + exceptionJson); + return new Pair(JobInfo.Status.FAILED, exceptionJson); + } + } }