diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml index 2e35ae5de61..7445102cd3c 100644 --- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml +++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml @@ -81,6 +81,7 @@ + diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index a81fea86980..34bdbfad6c7 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -42,7 +42,6 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; -import com.cloud.deploy.DeploymentPlanner; import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao; import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService; @@ -119,6 +118,7 @@ import com.cloud.dc.dao.HostPodDao; import com.cloud.deploy.DataCenterDeployment; import com.cloud.deploy.DeployDestination; import com.cloud.deploy.DeploymentPlan; +import com.cloud.deploy.DeploymentPlanner; import com.cloud.deploy.DeploymentPlanner.ExcludeList; import com.cloud.deploy.DeploymentPlanningManager; import com.cloud.domain.dao.DomainDao; @@ -726,12 +726,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; - else if (jobException instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)jobException; + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobResult; } } } @@ -739,7 +739,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void orchestrateStart(String vmUuid, Map params, DeploymentPlan planToDeploy, DeploymentPlanner planner) - throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { + throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { CallContext cctxt = CallContext.current(); Account account = cctxt.getCallingAccount(); @@ -1248,14 +1248,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof AgentUnavailableException) - throw (AgentUnavailableException)jobException; - else if (jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; - else if (jobException instanceof OperationTimedoutException) - throw (OperationTimedoutException)jobException; + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof AgentUnavailableException) + throw (AgentUnavailableException)jobResult; + else if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof OperationTimedoutException) + throw (OperationTimedoutException)jobResult; } } } @@ -1540,10 +1540,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof RuntimeException) - throw (RuntimeException)jobException; + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof RuntimeException) + throw (RuntimeException)jobResult; } } } @@ -1622,14 +1622,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)jobException; - else if (jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; - else if (jobException instanceof RuntimeException) - throw (RuntimeException)jobException; + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobResult; + else if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof RuntimeException) + throw (RuntimeException)jobResult; } } } @@ -1893,7 +1893,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retrieveExecutionException(outcome.getJob()); + Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobException != null) { if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -2172,14 +2172,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)jobException; - else if (jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; - else if (jobException instanceof InsufficientCapacityException) - throw (InsufficientCapacityException)jobException; + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobResult; + else if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof InsufficientCapacityException) + throw (InsufficientCapacityException)jobResult; } } } @@ -3099,7 +3099,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac NicProfile nic = (NicProfile)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult()); return nic; } else { - Throwable jobException = retrieveExecutionException(outcome.getJob()); + Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob()); if (jobException != null) { if (jobException instanceof ResourceUnavailableException) throw (ResourceUnavailableException)jobException; @@ -3206,14 +3206,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Boolean result = (Boolean)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult()); return result; } else { - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)jobException; - else if (jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; - else if (jobException instanceof RuntimeException) - throw (RuntimeException)jobException; + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobResult; + else if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof RuntimeException) + throw (RuntimeException)jobResult; } throw new RuntimeException("Job failed with un-handled exception"); @@ -3444,12 +3444,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)jobException; - else if (jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobResult; + else if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; } } } @@ -3698,12 +3698,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) { return _entityMgr.findById(VMInstanceVO.class, vm.getId()); } else { - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)jobException; - else if (jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobResult; + else if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; } throw new RuntimeException("Failed with un-handled exception"); @@ -4121,20 +4121,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } - public Throwable retrieveExecutionException(AsyncJob job) { - assert (job != null); - assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER)); - - AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); - if (jobVo != null && jobVo.getResult() != null) { - Object obj = JobSerializerHelper.fromSerializedString(job.getResult()); - - if (obj != null && obj instanceof Throwable) - return (Throwable)obj; - } - return null; - } - // // TODO build a common pattern to reduce code duplication in following methods // no time for this at current iteration diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java index d1a4a9d5930..67733ed1c90 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java @@ -16,6 +16,7 @@ // under the License. package org.apache.cloudstack.framework.jobs; +import java.io.Serializable; import java.util.List; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; @@ -41,8 +42,8 @@ public interface AsyncJobManager extends Manager { void updateAsyncJobStatus(long jobId, int processStatus, String resultObject); void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId); - - void logJobJournal(long jobId, AsyncJob.JournalType journalType, String journalText, String journalObjJson); + void logJobJournal(long jobId, AsyncJob.JournalType journalType, String + journalText, String journalObjJson); /** * A running thread inside management server can have a 1:1 linked pseudo job. @@ -81,8 +82,8 @@ public interface AsyncJobManager extends Manager { * @param wakeupIntervalInMilliSeconds * @param timeoutInMilliSeconds */ - void joinJob(long jobId, long joinJobId, String wakeupHandler, String wakupDispatcher, String[] wakeupTopicsOnMessageBus, long wakeupIntervalInMilliSeconds, - long timeoutInMilliSeconds); + void joinJob(long jobId, long joinJobId, String wakeupHandler, String wakupDispatcher, + String[] wakeupTopicsOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds); /** * Dis-join two related jobs @@ -124,4 +125,7 @@ public interface AsyncJobManager extends Manager { AsyncJob queryJob(long jobId, boolean updatePollTime); + String marshallResultObject(Serializable obj); + + Object unmarshallResultObject(AsyncJob job); } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index d98d8329cc2..e9442ec6e06 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -17,6 +17,7 @@ package org.apache.cloudstack.framework.jobs.impl; +import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -611,6 +612,21 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, return false; } + @Override + public String marshallResultObject(Serializable obj) { + if (obj != null) + return JobSerializerHelper.toObjectSerializedString(obj); + + return null; + } + + @Override + public Object unmarshallResultObject(AsyncJob job) { + if(job.getResult() != null) + return JobSerializerHelper.fromObjectSerializedString(job.getResult()); + return null; + } + private void checkQueue(long queueId) { while (true) { try { diff --git a/server/src/com/cloud/storage/VmWorkMigrateVolume.java b/server/src/com/cloud/storage/VmWorkMigrateVolume.java new file mode 100644 index 00000000000..c83e02df3ba --- /dev/null +++ b/server/src/com/cloud/storage/VmWorkMigrateVolume.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.storage; + +import com.cloud.vm.VmWork; + +public class VmWorkMigrateVolume extends VmWork { + private static final long serialVersionUID = -565778516928408602L; + + private long volumeId; + private long destPoolId; + private boolean liveMigrate; + + public VmWorkMigrateVolume(long userId, long accountId, long vmId, String handlerName, long volumeId, long destPoolId, boolean liveMigrate) { + super(userId, accountId, vmId, handlerName); + this.volumeId = volumeId; + this.destPoolId = destPoolId; + this.liveMigrate = liveMigrate; + } + + public long getVolumeId() { + return volumeId; + } + + public long getDestPoolId() { + return destPoolId; + } + + public boolean isLiveMigrate() { + return liveMigrate; + } +} diff --git a/server/src/com/cloud/storage/VmWorkResizeVolume.java b/server/src/com/cloud/storage/VmWorkResizeVolume.java new file mode 100644 index 00000000000..3ccaecd2429 --- /dev/null +++ b/server/src/com/cloud/storage/VmWorkResizeVolume.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.storage; + +import com.cloud.vm.VmWork; + +public class VmWorkResizeVolume extends VmWork { + private static final long serialVersionUID = 6112366316907642498L; + + private long volumeId; + private long currentSize; + private long newSize; + private Long newServiceOfferingId; + private boolean shrinkOk; + + public VmWorkResizeVolume(long userId, long accountId, long vmId, String handlerName, + long volumeId, long currentSize, long newSize, Long newServiceOfferingId, boolean shrinkOk) { + + super(userId, accountId, vmId, handlerName); + + this.volumeId = volumeId; + this.currentSize = currentSize; + this.newSize = newSize; + this.newServiceOfferingId = newServiceOfferingId; + this.shrinkOk = shrinkOk; + } + + public long getVolumeId() { + return volumeId; + } + + public long getCurrentSize() { + return currentSize; + } + + public long getNewSize() { + return newSize; + } + + public Long getNewServiceOfferingId() { + return newServiceOfferingId; + } + + public boolean isShrinkOk() { + return shrinkOk; + } +} diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java index e1d1c7c4750..f65084afbcb 100644 --- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java +++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java @@ -906,6 +906,43 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic - currentSize)); } + if (userVm != null) { + // serialize VM operation + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateResizeVolume(volume.getId(), currentSize, newSize, + newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk); + } else { + Outcome outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize, + newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk); + + Volume vol = null; + try { + vol = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof Throwable) + throw new RuntimeException("Unexpected exception", (Throwable)jobResult); + } + return volume; + } + } + return orchestrateResizeVolume(volume.getId(), currentSize, newSize, + newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk); + } + + private VolumeVO orchestrateResizeVolume(long volumeId, long currentSize, long newSize, Long newDiskOfferingId, boolean shrinkOk) { + VolumeVO volume = _volsDao.findById(volumeId); + UserVmVO userVm = _userVmDao.findById(volume.getInstanceId()); /* * get a list of hosts to send the commands to, try the system the * associated vm is running on first, then the last known place it ran. @@ -943,8 +980,8 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic volume = _volsDao.findById(volume.getId()); - if (newDiskOffering != null) { - volume.setDiskOfferingId(cmd.getNewDiskOfferingId()); + if (newDiskOfferingId != null) { + volume.setDiskOfferingId(newDiskOfferingId); } _volsDao.update(volume.getId(), volume); // Log usage event for volumes belonging user VM's only @@ -1078,12 +1115,12 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; - else - throw new RuntimeException("Unexpected exception", jobException); + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof Throwable) + throw new RuntimeException("Unexpected exception", (Throwable)jobResult); } return vol; } @@ -1383,12 +1420,12 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic throw new RuntimeException("Execution excetion", e); } - Throwable jobException = retrieveExecutionException(outcome.getJob()); - if (jobException != null) { - if (jobException instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)jobException; - else - throw new RuntimeException("Unexpected exception", jobException); + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof Throwable) + throw new RuntimeException("Unexpected exception", (Throwable)jobResult); } return vol; } @@ -1517,6 +1554,48 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic throw new InvalidParameterValueException("Migration of volume from local storage pool is not supported"); } + if (vm != null) { + // serialize VM operation + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume); + } else { + Outcome outcome = migrateVolumeThroughJobQueue(vm.getId(), vol.getId(), destPool.getId(), liveMigrateVolume); + + try { + outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof Throwable) + throw new RuntimeException("Unexpected exception", (Throwable)jobResult); + } + + // retrieve the migrated new volume from job result + if (jobResult != null && jobResult instanceof Long) { + return _entityMgr.findById(VolumeVO.class, ((Long)jobResult).longValue()); + } + return null; + } + } + + return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume); + } + + private Volume orchestrateMigrateVolume(long volumeId, long destPoolId, boolean liveMigrateVolume) { + VolumeVO vol = _volsDao.findById(volumeId); + assert (vol != null); + StoragePool destPool = (StoragePool)dataStoreMgr.getDataStore(destPoolId, DataStoreRole.Primary); + assert (destPool != null); + Volume newVol = null; if (liveMigrateVolume) { newVol = liveMigrateVolume(vol, destPool); @@ -1940,20 +2019,6 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic _storagePoolAllocators = storagePoolAllocators; } - public Throwable retrieveExecutionException(AsyncJob job) { - assert (job != null); - assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER)); - - AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); - if (jobVo != null && jobVo.getResult() != null) { - Object obj = JobSerializerHelper.fromSerializedString(job.getResult()); - - if (obj != null && obj instanceof Throwable) - return (Throwable)obj; - } - return null; - } - public class VmJobSyncOutcome extends OutcomeImpl { private long _volumeId; @@ -2065,6 +2130,98 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), volumeId); } + public Outcome resizeVolumeThroughJobQueue(final Long vmId, final long volumeId, + final long currentSize, final long newSize, final Long newServiceOfferingId, final boolean shrinkOk) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkResizeVolume.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkResizeVolume workInfo = new VmWorkResizeVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, currentSize, newSize, newServiceOfferingId, shrinkOk); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + volumeId); + } + + public Outcome migrateVolumeThroughJobQueue(final Long vmId, final long volumeId, + final long destPoolId, final boolean liveMigrate) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkMigrateVolume.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkMigrateVolume workInfo = new VmWorkMigrateVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, destPoolId, liveMigrate); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + volumeId); + } + @Override public Pair handleVmWorkJob(AsyncJob job, VmWork work) throws Exception { VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId()); @@ -2100,6 +2257,36 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic s_logger.debug("Done executing Detach-Volume within VM work job context. vmId: " + detachWork.getVmId() + ", volId: " + detachWork.getVolumeId()); return new Pair(JobInfo.Status.SUCCEEDED, null); + } else if (work instanceof VmWorkResizeVolume) { + VmWorkResizeVolume resizeWork = (VmWorkResizeVolume)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Resize-Volume within VM work job context. vmId: " + resizeWork.getVmId() + + ", volId: " + resizeWork.getVolumeId() + ", size " + resizeWork.getCurrentSize() + " -> " + resizeWork.getNewSize()); + + orchestrateResizeVolume(resizeWork.getVolumeId(), resizeWork.getCurrentSize(), resizeWork.getNewSize(), + resizeWork.getNewServiceOfferingId(), resizeWork.isShrinkOk()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Done executing Resize-Volume within VM work job context. vmId: " + resizeWork.getVmId() + + ", volId: " + resizeWork.getVolumeId() + ", size " + resizeWork.getCurrentSize() + " -> " + resizeWork.getNewSize()); + + return new Pair(JobInfo.Status.SUCCEEDED, null); + + } else if (work instanceof VmWorkMigrateVolume) { + VmWorkMigrateVolume migrateWork = (VmWorkMigrateVolume)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Migrate-Volume within VM work job context. vmId: " + migrateWork.getVmId() + + ", volId: " + migrateWork.getVolumeId() + ", destPoolId: " + migrateWork.getDestPoolId() + ", live: " + migrateWork.isLiveMigrate()); + + Volume newVol = orchestrateMigrateVolume(migrateWork.getVolumeId(), migrateWork.getDestPoolId(), migrateWork.isLiveMigrate()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Done executing Migrate-Volume within VM work job context. vmId: " + migrateWork.getVmId() + + ", volId: " + migrateWork.getVolumeId() + ", destPoolId: " + migrateWork.getDestPoolId() + ", live: " + migrateWork.isLiveMigrate()); + + return new Pair(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Long(newVol.getId()))); } else { RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd()); String exceptionJson = JobSerializerHelper.toSerializedString(e); diff --git a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java index 93c845a9d26..1b66ff8da14 100644 --- a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java +++ b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java @@ -35,7 +35,17 @@ import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory; import org.apache.cloudstack.engine.subsystem.api.storage.VMSnapshotOptions; import org.apache.cloudstack.engine.subsystem.api.storage.VMSnapshotStrategy; +import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.Outcome; +import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; +import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; +import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; +import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.event.ActionEvent; import com.cloud.event.EventTypes; @@ -55,15 +65,22 @@ import com.cloud.storage.dao.SnapshotDao; import com.cloud.storage.dao.VolumeDao; import com.cloud.user.Account; import com.cloud.user.AccountManager; +import com.cloud.user.User; import com.cloud.user.dao.AccountDao; import com.cloud.uservm.UserVm; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; +import com.cloud.utils.Pair; +import com.cloud.utils.Predicate; import com.cloud.utils.Ternary; import com.cloud.utils.component.ManagerBase; +import com.cloud.utils.db.EntityManager; import com.cloud.utils.db.Filter; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.db.TransactionCallbackNoReturn; +import com.cloud.utils.db.TransactionStatus; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.vm.UserVmVO; import com.cloud.vm.VMInstanceVO; @@ -71,40 +88,53 @@ import com.cloud.vm.VirtualMachine; import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.VirtualMachineManager; import com.cloud.vm.VirtualMachineProfile; +import com.cloud.vm.VmWork; +import com.cloud.vm.VmWorkConstants; +import com.cloud.vm.VmWorkJobHandler; +import com.cloud.vm.VmWorkSerializer; import com.cloud.vm.dao.UserVmDao; +import com.cloud.vm.dao.VMInstanceDao; import com.cloud.vm.snapshot.dao.VMSnapshotDao; @Component -@Local(value = {VMSnapshotManager.class, VMSnapshotService.class}) -public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotManager, VMSnapshotService { +@Local(value = { VMSnapshotManager.class, VMSnapshotService.class }) +public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotManager, VMSnapshotService, VmWorkJobHandler { private static final Logger s_logger = Logger.getLogger(VMSnapshotManagerImpl.class); + + public static final String VM_WORK_JOB_HANDLER = VMSnapshotManagerImpl.class.getSimpleName(); + String _name; @Inject - VMSnapshotDao _vmSnapshotDao; - @Inject - VolumeDao _volumeDao; - @Inject - AccountDao _accountDao; - @Inject - UserVmDao _userVMDao; - @Inject - AccountManager _accountMgr; - @Inject - GuestOSDao _guestOSDao; - @Inject - SnapshotDao _snapshotDao; - @Inject - VirtualMachineManager _itMgr; - @Inject - ConfigurationDao _configDao; - @Inject - HypervisorCapabilitiesDao _hypervisorCapabilitiesDao; + VMInstanceDao _vmInstanceDao; + @Inject VMSnapshotDao _vmSnapshotDao; + @Inject VolumeDao _volumeDao; + @Inject AccountDao _accountDao; + @Inject UserVmDao _userVMDao; + @Inject AccountManager _accountMgr; + @Inject GuestOSDao _guestOSDao; + @Inject SnapshotDao _snapshotDao; + @Inject VirtualMachineManager _itMgr; + @Inject ConfigurationDao _configDao; + @Inject HypervisorCapabilitiesDao _hypervisorCapabilitiesDao; @Inject StorageStrategyFactory storageStrategyFactory; + @Inject + EntityManager _entityMgr; + @Inject + AsyncJobManager _jobMgr; + int _vmSnapshotMax; int _wait; + // TODO + static final ConfigKey VmJobEnabled = new ConfigKey("Advanced", + Boolean.class, "vm.job.enabled", "false", + "True to enable new VM sync model. false to use the old way", false); + static final ConfigKey VmJobCheckInterval = new ConfigKey("Advanced", + Long.class, "vm.job.check.interval", "3000", + "Interval in milliseconds to check if the job is complete", false); + @Override public boolean configure(String name, Map params) throws ConfigurationException { _name = name; @@ -249,7 +279,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana throw new InvalidParameterValueException("Creating vm snapshot failed due to VM:" + vmId + " is not in the running or Stopped state"); } - if (snapshotMemory && userVmVo.getState() == VirtualMachine.State.Stopped) { + if(snapshotMemory && userVmVo.getState() == VirtualMachine.State.Stopped){ throw new InvalidParameterValueException("Can not snapshot memory when VM is in stopped state"); } @@ -327,6 +357,46 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana if (vmSnapshot == null) { throw new CloudRuntimeException("VM snapshot id: " + vmSnapshotId + " can not be found"); } + + // serialize VM operation + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm); + } else { + Outcome outcome = createVMSnapshotThroughJobQueue(vmId, vmSnapshotId, quiescevm); + + VMSnapshot result = null; + try { + result = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof Throwable) + throw new RuntimeException("Unexpected exception", (Throwable)jobResult); + } + + return result; + } + } + + private VMSnapshot orchestrateCreateVMSnapshot(Long vmId, Long vmSnapshotId, Boolean quiescevm) { + UserVmVO userVm = _userVMDao.findById(vmId); + if (userVm == null) { + throw new InvalidParameterValueException("Create vm to snapshot failed due to vm: " + vmId + " is not found"); + } + VMSnapshotVO vmSnapshot = _vmSnapshotDao.findById(vmSnapshotId); + if (vmSnapshot == null) { + throw new CloudRuntimeException("VM snapshot id: " + vmSnapshotId + " can not be found"); + } + VMSnapshotOptions options = new VMSnapshotOptions(quiescevm); vmSnapshot.setOptions(options); try { @@ -376,6 +446,62 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later"); } + // serialize VM operation + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateDeleteVMSnapshot(vmSnapshotId); + } else { + Outcome outcome = deleteVMSnapshotThroughJobQueue(vmSnapshot.getVmId(), vmSnapshotId); + + VMSnapshot result = null; + try { + result = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof Throwable) + throw new RuntimeException("Unexpected exception", (Throwable)jobResult); + } + + if (jobResult instanceof Boolean) + return ((Boolean)jobResult).booleanValue(); + + return false; + } + } + + public boolean orchestrateDeleteVMSnapshot(Long vmSnapshotId) { + Account caller = getCaller(); + + VMSnapshotVO vmSnapshot = _vmSnapshotDao.findById(vmSnapshotId); + if (vmSnapshot == null) { + throw new InvalidParameterValueException("unable to find the vm snapshot with id " + vmSnapshotId); + } + + _accountMgr.checkAccess(caller, null, true, vmSnapshot); + + // check VM snapshot states, only allow to delete vm snapshots in created and error state + if (VMSnapshot.State.Ready != vmSnapshot.getState() && VMSnapshot.State.Expunging != vmSnapshot.getState() && VMSnapshot.State.Error != vmSnapshot.getState()) { + throw new InvalidParameterValueException("Can't delete the vm snapshotshot " + vmSnapshotId + " due to it is not in Created or Error, or Expunging State"); + } + + // check if there are other active VM snapshot tasks + if (hasActiveVMSnapshotTasks(vmSnapshot.getVmId())) { + List expungingSnapshots = _vmSnapshotDao.listByInstanceId(vmSnapshot.getVmId(), VMSnapshot.State.Expunging); + if (expungingSnapshots.size() > 0 && expungingSnapshots.get(0).getId() == vmSnapshot.getId()) + s_logger.debug("Target VM snapshot already in expunging state, go on deleting it: " + vmSnapshot.getDisplayName()); + else + throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later"); + } + if (vmSnapshot.getState() == VMSnapshot.State.Allocated) { return _vmSnapshotDao.remove(vmSnapshot.getId()); } else { @@ -413,6 +539,77 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana Account caller = getCaller(); _accountMgr.checkAccess(caller, null, true, vmSnapshotVo); + // VM should be in running or stopped states + if (userVm.getState() != VirtualMachine.State.Running + && userVm.getState() != VirtualMachine.State.Stopped) { + throw new InvalidParameterValueException( + "VM Snapshot reverting failed due to vm is not in the state of Running or Stopped."); + } + + // if snapshot is not created, error out + if (vmSnapshotVo.getState() != VMSnapshot.State.Ready) { + throw new InvalidParameterValueException( + "VM Snapshot reverting failed due to vm snapshot is not in the state of Created."); + } + + // serialize VM operation + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateRevertToVMSnapshot(vmSnapshotId); + } else { + Outcome outcome = revertToVMSnapshotThroughJobQueue(vmSnapshotVo.getVmId(), vmSnapshotId); + + VMSnapshot result = null; + try { + result = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof InsufficientCapacityException) + throw (InsufficientCapacityException)jobResult; + else if (jobResult instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobResult; + else if (jobResult instanceof Throwable) + throw new RuntimeException("Unexpected exception", (Throwable)jobResult); + } + + return userVm; + } + } + + public UserVm orchestrateRevertToVMSnapshot(Long vmSnapshotId) throws InsufficientCapacityException, ResourceUnavailableException, ConcurrentOperationException { + + // check if VM snapshot exists in DB + VMSnapshotVO vmSnapshotVo = _vmSnapshotDao.findById(vmSnapshotId); + if (vmSnapshotVo == null) { + throw new InvalidParameterValueException( + "unable to find the vm snapshot with id " + vmSnapshotId); + } + Long vmId = vmSnapshotVo.getVmId(); + UserVmVO userVm = _userVMDao.findById(vmId); + // check if VM exists + if (userVm == null) { + throw new InvalidParameterValueException("Revert vm to snapshot: " + + vmSnapshotId + " failed due to vm: " + vmId + + " is not found"); + } + + // check if there are other active VM snapshot tasks + if (hasActiveVMSnapshotTasks(vmId)) { + throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later"); + } + + Account caller = getCaller(); + _accountMgr.checkAccess(caller, null, true, vmSnapshotVo); + // VM should be in running or stopped states if (userVm.getState() != VirtualMachine.State.Running && userVm.getState() != VirtualMachine.State.Stopped) { throw new InvalidParameterValueException("VM Snapshot reverting failed due to vm is not in the state of Running or Stopped."); @@ -481,6 +678,38 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana @Override public boolean deleteAllVMSnapshots(long vmId, VMSnapshot.Type type) { + // serialize VM operation + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateDeleteAllVMSnapshots(vmId, type); + } else { + Outcome outcome = deleteAllVMSnapshotsThroughJobQueue(vmId, type); + + try { + outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob()); + if (jobResult != null) { + if (jobResult instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobResult; + else if (jobResult instanceof Throwable) + throw new RuntimeException("Unexpected exception", (Throwable)jobResult); + } + + if (jobResult instanceof Boolean) + return (Boolean)jobResult; + + return false; + } + } + + private boolean orchestrateDeleteAllVMSnapshots(long vmId, VMSnapshot.Type type) { boolean result = true; List listVmSnapshots = _vmSnapshotDao.findByVm(vmId); if (listVmSnapshots == null || listVmSnapshots.isEmpty()) { @@ -501,14 +730,13 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana @Override public boolean syncVMSnapshot(VMInstanceVO vm, Long hostId) { - try { + try{ UserVmVO userVm = _userVMDao.findById(vm.getId()); if (userVm == null) return false; - List vmSnapshotsInExpungingStates = - _vmSnapshotDao.listByInstanceId(vm.getId(), VMSnapshot.State.Expunging, VMSnapshot.State.Reverting, VMSnapshot.State.Creating); + List vmSnapshotsInExpungingStates = _vmSnapshotDao.listByInstanceId(vm.getId(), VMSnapshot.State.Expunging, VMSnapshot.State.Reverting, VMSnapshot.State.Creating); for (VMSnapshotVO vmSnapshotVO : vmSnapshotsInExpungingStates) { VMSnapshotStrategy strategy = findVMSnapshotStrategy(vmSnapshotVO); if (vmSnapshotVO.getState() == VMSnapshot.State.Expunging) { @@ -529,4 +757,299 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana return false; } + public class VmJobSyncOutcome extends OutcomeImpl { + private long _vmSnapshotId; + + public VmJobSyncOutcome(final AsyncJob job, final long vmSnapshotId) { + super(VMSnapshot.class, job, VmJobCheckInterval.value(), new Predicate() { + @Override + public boolean checkCondition() { + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); + assert (jobVo != null); + if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) + return true; + + return false; + } + }, AsyncJob.Topics.JOB_STATE); + _vmSnapshotId = vmSnapshotId; + } + + @Override + protected VMSnapshot retrieve() { + return _vmSnapshotDao.findById(_vmSnapshotId); + } + } + + public class VmJobSyncVirtualMachineOutcome extends OutcomeImpl { + long vmId; + + public VmJobSyncVirtualMachineOutcome(final AsyncJob job, final long vmId) { + super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() { + @Override + public boolean checkCondition() { + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); + assert (jobVo != null); + if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) + return true; + + return false; + } + }, AsyncJob.Topics.JOB_STATE); + } + + @Override + protected VirtualMachine retrieve() { + return _vmInstanceDao.findById(vmId); + } + } + + public Outcome createVMSnapshotThroughJobQueue(final Long vmId, final Long vmSnapshotId, final boolean quiesceVm) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkCreateVMSnapshot.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkCreateVMSnapshot workInfo = new VmWorkCreateVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(), + VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, vmSnapshotId, quiesceVm); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + vmSnapshotId); + } + + public Outcome deleteVMSnapshotThroughJobQueue(final Long vmId, final Long vmSnapshotId) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkDeleteVMSnapshot.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkDeleteVMSnapshot workInfo = new VmWorkDeleteVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(), + VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, vmSnapshotId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + vmSnapshotId); + } + + public Outcome revertToVMSnapshotThroughJobQueue(final Long vmId, final Long vmSnapshotId) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkRevertToVMSnapshot.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkRevertToVMSnapshot workInfo = new VmWorkRevertToVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(), + VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, vmSnapshotId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + vmSnapshotId); + } + + public Outcome deleteAllVMSnapshotsThroughJobQueue(final Long vmId, final VMSnapshot.Type type) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmInstanceDao.findById(vmId); + + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmInstanceDao.lockRow(vm.getId(), true); + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkDeleteAllVMSnapshots.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkDeleteAllVMSnapshots workInfo = new VmWorkDeleteAllVMSnapshots(callingUser.getId(), callingAccount.getId(), vm.getId(), + VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, type); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(workJob.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + vmId); + } + + @Override + public Pair handleVmWorkJob(AsyncJob job, VmWork work) throws Exception { + + if (work instanceof VmWorkCreateVMSnapshot) { + VmWorkCreateVMSnapshot createWork = (VmWorkCreateVMSnapshot)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Create-VM-Snapshot within VM work job context. vmId: " + createWork.getVmId() + + ", VM snapshotId: " + createWork.getVmSnapshotId() + "quiesce: " + createWork.isQuiesceVm()); + + VMSnapshot vmSnapshot = orchestrateCreateVMSnapshot(createWork.getVmId(), createWork.getVmSnapshotId(), createWork.isQuiesceVm()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Create-VM-Snapshot within VM work job context. vmId: " + createWork.getVmId() + + ", VM snapshotId: " + createWork.getVmSnapshotId() + "quiesce: " + createWork.isQuiesceVm()); + + return new Pair(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Long(vmSnapshot.getId()))); + } else if (work instanceof VmWorkDeleteVMSnapshot) { + VmWorkDeleteVMSnapshot deleteWork = (VmWorkDeleteVMSnapshot)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Delete-VM-Snapshot within VM work job context. vmId: " + deleteWork.getVmId() + + ", VM snapshotId: " + deleteWork.getVmSnapshotId()); + + boolean result = orchestrateDeleteVMSnapshot(deleteWork.getVmSnapshotId()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Done executing Delete-VM-Snapshot within VM work job context. vmId: " + deleteWork.getVmId() + + ", VM snapshotId: " + deleteWork.getVmSnapshotId()); + + return new Pair(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Boolean(result))); + + } else if (work instanceof VmWorkRevertToVMSnapshot) { + VmWorkRevertToVMSnapshot revertWork = (VmWorkRevertToVMSnapshot)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Revert-VM-Snapshot within VM work job context. vmId: " + revertWork.getVmId() + + ", VM snapshotId: " + revertWork.getVmSnapshotId()); + + orchestrateRevertToVMSnapshot(revertWork.getVmSnapshotId()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Done executing Revert-VM-Snapshot within VM work job context. vmId: " + revertWork.getVmId() + + ", VM snapshotId: " + revertWork.getVmSnapshotId()); + + return new Pair(JobInfo.Status.SUCCEEDED, null); + + } else if (work instanceof VmWorkDeleteAllVMSnapshots) { + VmWorkDeleteAllVMSnapshots deleteAllWork = (VmWorkDeleteAllVMSnapshots)work; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Delete-All-VM-Snapshot within VM work job context. vmId: " + deleteAllWork.getVmId()); + + boolean result = orchestrateDeleteAllVMSnapshots(deleteAllWork.getVmId(), deleteAllWork.getSnapshotType()); + + if (s_logger.isDebugEnabled()) + s_logger.debug("Execute Delete-All-VM-Snapshot within VM work job context. vmId: " + deleteAllWork.getVmId()); + + return new Pair(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Boolean(result))); + + } else { + + RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd()); + String exceptionJson = JobSerializerHelper.toSerializedString(e); + s_logger.error("Serialize exception object into json: " + exceptionJson); + return new Pair(JobInfo.Status.FAILED, exceptionJson); + } + } } diff --git a/server/src/com/cloud/vm/snapshot/VmWorkCreateVMSnapshot.java b/server/src/com/cloud/vm/snapshot/VmWorkCreateVMSnapshot.java new file mode 100644 index 00000000000..33718029303 --- /dev/null +++ b/server/src/com/cloud/vm/snapshot/VmWorkCreateVMSnapshot.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm.snapshot; + +import com.cloud.vm.VmWork; + +public class VmWorkCreateVMSnapshot extends VmWork { + private static final long serialVersionUID = 124386202146049838L; + + private Long vmSnapshotId; + private boolean quiesceVm; + + public VmWorkCreateVMSnapshot(long userId, long accountId, long vmId, String handlerName, Long vmSnapshotId, boolean quiesceVm) { + super(userId, accountId, vmId, handlerName); + + this.vmSnapshotId = vmSnapshotId; + this.quiesceVm = quiesceVm; + } + + public Long getVmSnapshotId() { + return vmSnapshotId; + } + + public boolean isQuiesceVm() { + return quiesceVm; + } +} diff --git a/server/src/com/cloud/vm/snapshot/VmWorkDeleteAllVMSnapshots.java b/server/src/com/cloud/vm/snapshot/VmWorkDeleteAllVMSnapshots.java new file mode 100644 index 00000000000..ce20dfc984d --- /dev/null +++ b/server/src/com/cloud/vm/snapshot/VmWorkDeleteAllVMSnapshots.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm.snapshot; + +import com.cloud.vm.VmWork; + +public class VmWorkDeleteAllVMSnapshots extends VmWork { + private static final long serialVersionUID = -6010083039865471888L; + + private VMSnapshot.Type type; + + public VmWorkDeleteAllVMSnapshots(long userId, long accountId, long vmId, String handlerName, VMSnapshot.Type type) { + super(userId, accountId, vmId, handlerName); + + this.type = type; + } + + public VMSnapshot.Type getSnapshotType() { + return type; + } +} diff --git a/server/src/com/cloud/vm/snapshot/VmWorkDeleteVMSnapshot.java b/server/src/com/cloud/vm/snapshot/VmWorkDeleteVMSnapshot.java new file mode 100644 index 00000000000..1a80e39406d --- /dev/null +++ b/server/src/com/cloud/vm/snapshot/VmWorkDeleteVMSnapshot.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm.snapshot; + +import com.cloud.vm.VmWork; + +public class VmWorkDeleteVMSnapshot extends VmWork { + private static final long serialVersionUID = 7168101866614517508L; + + private Long vmSnapshotId; + + public VmWorkDeleteVMSnapshot(long userId, long accountId, long vmId, String handlerName, Long vmSnapshotId) { + super(userId, accountId, vmId, handlerName); + + this.vmSnapshotId = vmSnapshotId; + } + + public Long getVmSnapshotId() { + return vmSnapshotId; + } +} diff --git a/server/src/com/cloud/vm/snapshot/VmWorkRevertToVMSnapshot.java b/server/src/com/cloud/vm/snapshot/VmWorkRevertToVMSnapshot.java new file mode 100644 index 00000000000..f7beee5c4e5 --- /dev/null +++ b/server/src/com/cloud/vm/snapshot/VmWorkRevertToVMSnapshot.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm.snapshot; + +import com.cloud.vm.VmWork; + +public class VmWorkRevertToVMSnapshot extends VmWork { + private static final long serialVersionUID = -3406543280278986843L; + + private Long vmSnapshotId; + + public VmWorkRevertToVMSnapshot(long userId, long accountId, long vmId, String handlerName, Long vmSnapshotId) { + super(userId, accountId, vmId, handlerName); + + this.vmSnapshotId = vmSnapshotId; + } + + public Long getVmSnapshotId() { + return vmSnapshotId; + } +}