CLOUDSTACK-669: covert VMsnapshot orchestration flows to make them be serialized with other VM operations

This commit is contained in:
Kelven Yang 2013-12-20 16:45:08 -08:00
parent a298f6fce9
commit bf9a554522
12 changed files with 1099 additions and 129 deletions

View File

@ -81,6 +81,7 @@
<map>
<entry key="VirtualMachineManagerImpl" value-ref="clusteredVirtualMachineManagerImpl" />
<entry key="VolumeApiServiceImpl" value-ref="volumeApiServiceImpl" />
<entry key="VMSnapshotManagerImpl" value-ref="vMSnapshotManagerImpl" />
</map>
</property>
</bean>

View File

@ -42,7 +42,6 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.deploy.DeploymentPlanner;
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
@ -119,6 +118,7 @@ import com.cloud.dc.dao.HostPodDao;
import com.cloud.deploy.DataCenterDeployment;
import com.cloud.deploy.DeployDestination;
import com.cloud.deploy.DeploymentPlan;
import com.cloud.deploy.DeploymentPlanner;
import com.cloud.deploy.DeploymentPlanner.ExcludeList;
import com.cloud.deploy.DeploymentPlanningManager;
import com.cloud.domain.dao.DomainDao;
@ -726,12 +726,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw new RuntimeException("Execution excetion", e);
}
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobException;
else if (jobException instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobException;
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobResult;
}
}
}
@ -739,7 +739,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public void orchestrateStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy, DeploymentPlanner planner)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
CallContext cctxt = CallContext.current();
Account account = cctxt.getCallingAccount();
@ -1248,14 +1248,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw new RuntimeException("Execution excetion", e);
}
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof AgentUnavailableException)
throw (AgentUnavailableException)jobException;
else if (jobException instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobException;
else if (jobException instanceof OperationTimedoutException)
throw (OperationTimedoutException)jobException;
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof AgentUnavailableException)
throw (AgentUnavailableException)jobResult;
else if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof OperationTimedoutException)
throw (OperationTimedoutException)jobResult;
}
}
}
@ -1540,10 +1540,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw new RuntimeException("Execution excetion", e);
}
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof RuntimeException)
throw (RuntimeException)jobException;
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof RuntimeException)
throw (RuntimeException)jobResult;
}
}
}
@ -1622,14 +1622,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw new RuntimeException("Execution excetion", e);
}
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobException;
else if (jobException instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobException;
else if (jobException instanceof RuntimeException)
throw (RuntimeException)jobException;
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobResult;
else if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof RuntimeException)
throw (RuntimeException)jobResult;
}
}
}
@ -1893,7 +1893,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw new RuntimeException("Execution excetion", e);
}
Throwable jobException = retrieveExecutionException(outcome.getJob());
Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobException;
@ -2172,14 +2172,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw new RuntimeException("Execution excetion", e);
}
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobException;
else if (jobException instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobException;
else if (jobException instanceof InsufficientCapacityException)
throw (InsufficientCapacityException)jobException;
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobResult;
else if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof InsufficientCapacityException)
throw (InsufficientCapacityException)jobResult;
}
}
}
@ -3099,7 +3099,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
NicProfile nic = (NicProfile)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult());
return nic;
} else {
Throwable jobException = retrieveExecutionException(outcome.getJob());
Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobException;
@ -3206,14 +3206,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
Boolean result = (Boolean)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult());
return result;
} else {
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobException;
else if (jobException instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobException;
else if (jobException instanceof RuntimeException)
throw (RuntimeException)jobException;
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobResult;
else if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof RuntimeException)
throw (RuntimeException)jobResult;
}
throw new RuntimeException("Job failed with un-handled exception");
@ -3444,12 +3444,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw new RuntimeException("Execution excetion", e);
}
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobException;
else if (jobException instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobException;
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobResult;
else if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
}
}
}
@ -3698,12 +3698,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) {
return _entityMgr.findById(VMInstanceVO.class, vm.getId());
} else {
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobException;
else if (jobException instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobException;
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobResult;
else if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
}
throw new RuntimeException("Failed with un-handled exception");
@ -4121,20 +4121,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
}
public Throwable retrieveExecutionException(AsyncJob job) {
assert (job != null);
assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER));
AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
if (jobVo != null && jobVo.getResult() != null) {
Object obj = JobSerializerHelper.fromSerializedString(job.getResult());
if (obj != null && obj instanceof Throwable)
return (Throwable)obj;
}
return null;
}
//
// TODO build a common pattern to reduce code duplication in following methods
// no time for this at current iteration

View File

@ -16,6 +16,7 @@
// under the License.
package org.apache.cloudstack.framework.jobs;
import java.io.Serializable;
import java.util.List;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
@ -41,8 +42,8 @@ public interface AsyncJobManager extends Manager {
void updateAsyncJobStatus(long jobId, int processStatus, String resultObject);
void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
void logJobJournal(long jobId, AsyncJob.JournalType journalType, String journalText, String journalObjJson);
void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
journalText, String journalObjJson);
/**
* A running thread inside management server can have a 1:1 linked pseudo job.
@ -81,8 +82,8 @@ public interface AsyncJobManager extends Manager {
* @param wakeupIntervalInMilliSeconds
* @param timeoutInMilliSeconds
*/
void joinJob(long jobId, long joinJobId, String wakeupHandler, String wakupDispatcher, String[] wakeupTopicsOnMessageBus, long wakeupIntervalInMilliSeconds,
long timeoutInMilliSeconds);
void joinJob(long jobId, long joinJobId, String wakeupHandler, String wakupDispatcher,
String[] wakeupTopicsOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds);
/**
* Dis-join two related jobs
@ -124,4 +125,7 @@ public interface AsyncJobManager extends Manager {
AsyncJob queryJob(long jobId, boolean updatePollTime);
String marshallResultObject(Serializable obj);
Object unmarshallResultObject(AsyncJob job);
}

View File

@ -17,6 +17,7 @@
package org.apache.cloudstack.framework.jobs.impl;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@ -611,6 +612,21 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return false;
}
@Override
public String marshallResultObject(Serializable obj) {
if (obj != null)
return JobSerializerHelper.toObjectSerializedString(obj);
return null;
}
@Override
public Object unmarshallResultObject(AsyncJob job) {
if(job.getResult() != null)
return JobSerializerHelper.fromObjectSerializedString(job.getResult());
return null;
}
private void checkQueue(long queueId) {
while (true) {
try {

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.storage;
import com.cloud.vm.VmWork;
public class VmWorkMigrateVolume extends VmWork {
private static final long serialVersionUID = -565778516928408602L;
private long volumeId;
private long destPoolId;
private boolean liveMigrate;
public VmWorkMigrateVolume(long userId, long accountId, long vmId, String handlerName, long volumeId, long destPoolId, boolean liveMigrate) {
super(userId, accountId, vmId, handlerName);
this.volumeId = volumeId;
this.destPoolId = destPoolId;
this.liveMigrate = liveMigrate;
}
public long getVolumeId() {
return volumeId;
}
public long getDestPoolId() {
return destPoolId;
}
public boolean isLiveMigrate() {
return liveMigrate;
}
}

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.storage;
import com.cloud.vm.VmWork;
public class VmWorkResizeVolume extends VmWork {
private static final long serialVersionUID = 6112366316907642498L;
private long volumeId;
private long currentSize;
private long newSize;
private Long newServiceOfferingId;
private boolean shrinkOk;
public VmWorkResizeVolume(long userId, long accountId, long vmId, String handlerName,
long volumeId, long currentSize, long newSize, Long newServiceOfferingId, boolean shrinkOk) {
super(userId, accountId, vmId, handlerName);
this.volumeId = volumeId;
this.currentSize = currentSize;
this.newSize = newSize;
this.newServiceOfferingId = newServiceOfferingId;
this.shrinkOk = shrinkOk;
}
public long getVolumeId() {
return volumeId;
}
public long getCurrentSize() {
return currentSize;
}
public long getNewSize() {
return newSize;
}
public Long getNewServiceOfferingId() {
return newServiceOfferingId;
}
public boolean isShrinkOk() {
return shrinkOk;
}
}

View File

@ -906,6 +906,43 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
- currentSize));
}
if (userVm != null) {
// serialize VM operation
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
} else {
Outcome<Volume> outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize,
newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
Volume vol = null;
try {
vol = outcome.get();
} catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
} catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
}
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof Throwable)
throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
}
return volume;
}
}
return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
}
private VolumeVO orchestrateResizeVolume(long volumeId, long currentSize, long newSize, Long newDiskOfferingId, boolean shrinkOk) {
VolumeVO volume = _volsDao.findById(volumeId);
UserVmVO userVm = _userVmDao.findById(volume.getInstanceId());
/*
* get a list of hosts to send the commands to, try the system the
* associated vm is running on first, then the last known place it ran.
@ -943,8 +980,8 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
volume = _volsDao.findById(volume.getId());
if (newDiskOffering != null) {
volume.setDiskOfferingId(cmd.getNewDiskOfferingId());
if (newDiskOfferingId != null) {
volume.setDiskOfferingId(newDiskOfferingId);
}
_volsDao.update(volume.getId(), volume);
// Log usage event for volumes belonging user VM's only
@ -1078,12 +1115,12 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
throw new RuntimeException("Execution excetion", e);
}
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobException;
else
throw new RuntimeException("Unexpected exception", jobException);
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof Throwable)
throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
}
return vol;
}
@ -1383,12 +1420,12 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
throw new RuntimeException("Execution excetion", e);
}
Throwable jobException = retrieveExecutionException(outcome.getJob());
if (jobException != null) {
if (jobException instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobException;
else
throw new RuntimeException("Unexpected exception", jobException);
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof Throwable)
throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
}
return vol;
}
@ -1517,6 +1554,48 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
throw new InvalidParameterValueException("Migration of volume from local storage pool is not supported");
}
if (vm != null) {
// serialize VM operation
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
} else {
Outcome<Volume> outcome = migrateVolumeThroughJobQueue(vm.getId(), vol.getId(), destPool.getId(), liveMigrateVolume);
try {
outcome.get();
} catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
} catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
}
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof Throwable)
throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
}
// retrieve the migrated new volume from job result
if (jobResult != null && jobResult instanceof Long) {
return _entityMgr.findById(VolumeVO.class, ((Long)jobResult).longValue());
}
return null;
}
}
return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
}
private Volume orchestrateMigrateVolume(long volumeId, long destPoolId, boolean liveMigrateVolume) {
VolumeVO vol = _volsDao.findById(volumeId);
assert (vol != null);
StoragePool destPool = (StoragePool)dataStoreMgr.getDataStore(destPoolId, DataStoreRole.Primary);
assert (destPool != null);
Volume newVol = null;
if (liveMigrateVolume) {
newVol = liveMigrateVolume(vol, destPool);
@ -1940,20 +2019,6 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
_storagePoolAllocators = storagePoolAllocators;
}
public Throwable retrieveExecutionException(AsyncJob job) {
assert (job != null);
assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER));
AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
if (jobVo != null && jobVo.getResult() != null) {
Object obj = JobSerializerHelper.fromSerializedString(job.getResult());
if (obj != null && obj instanceof Throwable)
return (Throwable)obj;
}
return null;
}
public class VmJobSyncOutcome extends OutcomeImpl<Volume> {
private long _volumeId;
@ -2065,6 +2130,98 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), volumeId);
}
public Outcome<Volume> resizeVolumeThroughJobQueue(final Long vmId, final long volumeId,
final long currentSize, final long newSize, final Long newServiceOfferingId, final boolean shrinkOk) {
final CallContext context = CallContext.current();
final User callingUser = context.getCallingUser();
final Account callingAccount = context.getCallingAccount();
final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
VmWorkJobVO workJob = null;
_vmInstanceDao.lockRow(vm.getId(), true);
workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
workJob.setCmd(VmWorkResizeVolume.class.getName());
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkResizeVolume workInfo = new VmWorkResizeVolume(callingUser.getId(), callingAccount.getId(), vm.getId(),
VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, currentSize, newSize, newServiceOfferingId, shrinkOk);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
// Transaction syntax sugar has a cost here
context.putContextParameter("workJob", workJob);
context.putContextParameter("jobId", new Long(workJob.getId()));
}
});
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
volumeId);
}
public Outcome<Volume> migrateVolumeThroughJobQueue(final Long vmId, final long volumeId,
final long destPoolId, final boolean liveMigrate) {
final CallContext context = CallContext.current();
final User callingUser = context.getCallingUser();
final Account callingAccount = context.getCallingAccount();
final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
VmWorkJobVO workJob = null;
_vmInstanceDao.lockRow(vm.getId(), true);
workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
workJob.setCmd(VmWorkMigrateVolume.class.getName());
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkMigrateVolume workInfo = new VmWorkMigrateVolume(callingUser.getId(), callingAccount.getId(), vm.getId(),
VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, destPoolId, liveMigrate);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
// Transaction syntax sugar has a cost here
context.putContextParameter("workJob", workJob);
context.putContextParameter("jobId", new Long(workJob.getId()));
}
});
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
volumeId);
}
@Override
public Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception {
VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
@ -2100,6 +2257,36 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
s_logger.debug("Done executing Detach-Volume within VM work job context. vmId: " + detachWork.getVmId() + ", volId: " + detachWork.getVolumeId());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
} else if (work instanceof VmWorkResizeVolume) {
VmWorkResizeVolume resizeWork = (VmWorkResizeVolume)work;
if (s_logger.isDebugEnabled())
s_logger.debug("Execute Resize-Volume within VM work job context. vmId: " + resizeWork.getVmId()
+ ", volId: " + resizeWork.getVolumeId() + ", size " + resizeWork.getCurrentSize() + " -> " + resizeWork.getNewSize());
orchestrateResizeVolume(resizeWork.getVolumeId(), resizeWork.getCurrentSize(), resizeWork.getNewSize(),
resizeWork.getNewServiceOfferingId(), resizeWork.isShrinkOk());
if (s_logger.isDebugEnabled())
s_logger.debug("Done executing Resize-Volume within VM work job context. vmId: " + resizeWork.getVmId()
+ ", volId: " + resizeWork.getVolumeId() + ", size " + resizeWork.getCurrentSize() + " -> " + resizeWork.getNewSize());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
} else if (work instanceof VmWorkMigrateVolume) {
VmWorkMigrateVolume migrateWork = (VmWorkMigrateVolume)work;
if (s_logger.isDebugEnabled())
s_logger.debug("Execute Migrate-Volume within VM work job context. vmId: " + migrateWork.getVmId()
+ ", volId: " + migrateWork.getVolumeId() + ", destPoolId: " + migrateWork.getDestPoolId() + ", live: " + migrateWork.isLiveMigrate());
Volume newVol = orchestrateMigrateVolume(migrateWork.getVolumeId(), migrateWork.getDestPoolId(), migrateWork.isLiveMigrate());
if (s_logger.isDebugEnabled())
s_logger.debug("Done executing Migrate-Volume within VM work job context. vmId: " + migrateWork.getVmId()
+ ", volId: " + migrateWork.getVolumeId() + ", destPoolId: " + migrateWork.getDestPoolId() + ", live: " + migrateWork.isLiveMigrate());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Long(newVol.getId())));
} else {
RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
String exceptionJson = JobSerializerHelper.toSerializedString(e);

View File

@ -35,7 +35,17 @@ import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
import org.apache.cloudstack.engine.subsystem.api.storage.VMSnapshotOptions;
import org.apache.cloudstack.engine.subsystem.api.storage.VMSnapshotStrategy;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.Outcome;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.event.ActionEvent;
import com.cloud.event.EventTypes;
@ -55,15 +65,22 @@ import com.cloud.storage.dao.SnapshotDao;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.User;
import com.cloud.user.dao.AccountDao;
import com.cloud.uservm.UserVm;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Pair;
import com.cloud.utils.Predicate;
import com.cloud.utils.Ternary;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.db.EntityManager;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.UserVmVO;
import com.cloud.vm.VMInstanceVO;
@ -71,40 +88,53 @@ import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachine.State;
import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.VirtualMachineProfile;
import com.cloud.vm.VmWork;
import com.cloud.vm.VmWorkConstants;
import com.cloud.vm.VmWorkJobHandler;
import com.cloud.vm.VmWorkSerializer;
import com.cloud.vm.dao.UserVmDao;
import com.cloud.vm.dao.VMInstanceDao;
import com.cloud.vm.snapshot.dao.VMSnapshotDao;
@Component
@Local(value = {VMSnapshotManager.class, VMSnapshotService.class})
public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotManager, VMSnapshotService {
@Local(value = { VMSnapshotManager.class, VMSnapshotService.class })
public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotManager, VMSnapshotService, VmWorkJobHandler {
private static final Logger s_logger = Logger.getLogger(VMSnapshotManagerImpl.class);
public static final String VM_WORK_JOB_HANDLER = VMSnapshotManagerImpl.class.getSimpleName();
String _name;
@Inject
VMSnapshotDao _vmSnapshotDao;
@Inject
VolumeDao _volumeDao;
@Inject
AccountDao _accountDao;
@Inject
UserVmDao _userVMDao;
@Inject
AccountManager _accountMgr;
@Inject
GuestOSDao _guestOSDao;
@Inject
SnapshotDao _snapshotDao;
@Inject
VirtualMachineManager _itMgr;
@Inject
ConfigurationDao _configDao;
@Inject
HypervisorCapabilitiesDao _hypervisorCapabilitiesDao;
VMInstanceDao _vmInstanceDao;
@Inject VMSnapshotDao _vmSnapshotDao;
@Inject VolumeDao _volumeDao;
@Inject AccountDao _accountDao;
@Inject UserVmDao _userVMDao;
@Inject AccountManager _accountMgr;
@Inject GuestOSDao _guestOSDao;
@Inject SnapshotDao _snapshotDao;
@Inject VirtualMachineManager _itMgr;
@Inject ConfigurationDao _configDao;
@Inject HypervisorCapabilitiesDao _hypervisorCapabilitiesDao;
@Inject
StorageStrategyFactory storageStrategyFactory;
@Inject
EntityManager _entityMgr;
@Inject
AsyncJobManager _jobMgr;
int _vmSnapshotMax;
int _wait;
// TODO
static final ConfigKey<Boolean> VmJobEnabled = new ConfigKey<Boolean>("Advanced",
Boolean.class, "vm.job.enabled", "false",
"True to enable new VM sync model. false to use the old way", false);
static final ConfigKey<Long> VmJobCheckInterval = new ConfigKey<Long>("Advanced",
Long.class, "vm.job.check.interval", "3000",
"Interval in milliseconds to check if the job is complete", false);
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_name = name;
@ -249,7 +279,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
throw new InvalidParameterValueException("Creating vm snapshot failed due to VM:" + vmId + " is not in the running or Stopped state");
}
if (snapshotMemory && userVmVo.getState() == VirtualMachine.State.Stopped) {
if(snapshotMemory && userVmVo.getState() == VirtualMachine.State.Stopped){
throw new InvalidParameterValueException("Can not snapshot memory when VM is in stopped state");
}
@ -327,6 +357,46 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
if (vmSnapshot == null) {
throw new CloudRuntimeException("VM snapshot id: " + vmSnapshotId + " can not be found");
}
// serialize VM operation
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
} else {
Outcome<VMSnapshot> outcome = createVMSnapshotThroughJobQueue(vmId, vmSnapshotId, quiescevm);
VMSnapshot result = null;
try {
result = outcome.get();
} catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
} catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
}
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof Throwable)
throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
}
return result;
}
}
private VMSnapshot orchestrateCreateVMSnapshot(Long vmId, Long vmSnapshotId, Boolean quiescevm) {
UserVmVO userVm = _userVMDao.findById(vmId);
if (userVm == null) {
throw new InvalidParameterValueException("Create vm to snapshot failed due to vm: " + vmId + " is not found");
}
VMSnapshotVO vmSnapshot = _vmSnapshotDao.findById(vmSnapshotId);
if (vmSnapshot == null) {
throw new CloudRuntimeException("VM snapshot id: " + vmSnapshotId + " can not be found");
}
VMSnapshotOptions options = new VMSnapshotOptions(quiescevm);
vmSnapshot.setOptions(options);
try {
@ -376,6 +446,62 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later");
}
// serialize VM operation
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateDeleteVMSnapshot(vmSnapshotId);
} else {
Outcome<VMSnapshot> outcome = deleteVMSnapshotThroughJobQueue(vmSnapshot.getVmId(), vmSnapshotId);
VMSnapshot result = null;
try {
result = outcome.get();
} catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
} catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
}
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof Throwable)
throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
}
if (jobResult instanceof Boolean)
return ((Boolean)jobResult).booleanValue();
return false;
}
}
public boolean orchestrateDeleteVMSnapshot(Long vmSnapshotId) {
Account caller = getCaller();
VMSnapshotVO vmSnapshot = _vmSnapshotDao.findById(vmSnapshotId);
if (vmSnapshot == null) {
throw new InvalidParameterValueException("unable to find the vm snapshot with id " + vmSnapshotId);
}
_accountMgr.checkAccess(caller, null, true, vmSnapshot);
// check VM snapshot states, only allow to delete vm snapshots in created and error state
if (VMSnapshot.State.Ready != vmSnapshot.getState() && VMSnapshot.State.Expunging != vmSnapshot.getState() && VMSnapshot.State.Error != vmSnapshot.getState()) {
throw new InvalidParameterValueException("Can't delete the vm snapshotshot " + vmSnapshotId + " due to it is not in Created or Error, or Expunging State");
}
// check if there are other active VM snapshot tasks
if (hasActiveVMSnapshotTasks(vmSnapshot.getVmId())) {
List<VMSnapshotVO> expungingSnapshots = _vmSnapshotDao.listByInstanceId(vmSnapshot.getVmId(), VMSnapshot.State.Expunging);
if (expungingSnapshots.size() > 0 && expungingSnapshots.get(0).getId() == vmSnapshot.getId())
s_logger.debug("Target VM snapshot already in expunging state, go on deleting it: " + vmSnapshot.getDisplayName());
else
throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later");
}
if (vmSnapshot.getState() == VMSnapshot.State.Allocated) {
return _vmSnapshotDao.remove(vmSnapshot.getId());
} else {
@ -413,6 +539,77 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
Account caller = getCaller();
_accountMgr.checkAccess(caller, null, true, vmSnapshotVo);
// VM should be in running or stopped states
if (userVm.getState() != VirtualMachine.State.Running
&& userVm.getState() != VirtualMachine.State.Stopped) {
throw new InvalidParameterValueException(
"VM Snapshot reverting failed due to vm is not in the state of Running or Stopped.");
}
// if snapshot is not created, error out
if (vmSnapshotVo.getState() != VMSnapshot.State.Ready) {
throw new InvalidParameterValueException(
"VM Snapshot reverting failed due to vm snapshot is not in the state of Created.");
}
// serialize VM operation
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateRevertToVMSnapshot(vmSnapshotId);
} else {
Outcome<VMSnapshot> outcome = revertToVMSnapshotThroughJobQueue(vmSnapshotVo.getVmId(), vmSnapshotId);
VMSnapshot result = null;
try {
result = outcome.get();
} catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
} catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
}
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof InsufficientCapacityException)
throw (InsufficientCapacityException)jobResult;
else if (jobResult instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobResult;
else if (jobResult instanceof Throwable)
throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
}
return userVm;
}
}
public UserVm orchestrateRevertToVMSnapshot(Long vmSnapshotId) throws InsufficientCapacityException, ResourceUnavailableException, ConcurrentOperationException {
// check if VM snapshot exists in DB
VMSnapshotVO vmSnapshotVo = _vmSnapshotDao.findById(vmSnapshotId);
if (vmSnapshotVo == null) {
throw new InvalidParameterValueException(
"unable to find the vm snapshot with id " + vmSnapshotId);
}
Long vmId = vmSnapshotVo.getVmId();
UserVmVO userVm = _userVMDao.findById(vmId);
// check if VM exists
if (userVm == null) {
throw new InvalidParameterValueException("Revert vm to snapshot: "
+ vmSnapshotId + " failed due to vm: " + vmId
+ " is not found");
}
// check if there are other active VM snapshot tasks
if (hasActiveVMSnapshotTasks(vmId)) {
throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later");
}
Account caller = getCaller();
_accountMgr.checkAccess(caller, null, true, vmSnapshotVo);
// VM should be in running or stopped states
if (userVm.getState() != VirtualMachine.State.Running && userVm.getState() != VirtualMachine.State.Stopped) {
throw new InvalidParameterValueException("VM Snapshot reverting failed due to vm is not in the state of Running or Stopped.");
@ -481,6 +678,38 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
@Override
public boolean deleteAllVMSnapshots(long vmId, VMSnapshot.Type type) {
// serialize VM operation
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateDeleteAllVMSnapshots(vmId, type);
} else {
Outcome<VirtualMachine> outcome = deleteAllVMSnapshotsThroughJobQueue(vmId, type);
try {
outcome.get();
} catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
} catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
}
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof Throwable)
throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
}
if (jobResult instanceof Boolean)
return (Boolean)jobResult;
return false;
}
}
private boolean orchestrateDeleteAllVMSnapshots(long vmId, VMSnapshot.Type type) {
boolean result = true;
List<VMSnapshotVO> listVmSnapshots = _vmSnapshotDao.findByVm(vmId);
if (listVmSnapshots == null || listVmSnapshots.isEmpty()) {
@ -501,14 +730,13 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
@Override
public boolean syncVMSnapshot(VMInstanceVO vm, Long hostId) {
try {
try{
UserVmVO userVm = _userVMDao.findById(vm.getId());
if (userVm == null)
return false;
List<VMSnapshotVO> vmSnapshotsInExpungingStates =
_vmSnapshotDao.listByInstanceId(vm.getId(), VMSnapshot.State.Expunging, VMSnapshot.State.Reverting, VMSnapshot.State.Creating);
List<VMSnapshotVO> vmSnapshotsInExpungingStates = _vmSnapshotDao.listByInstanceId(vm.getId(), VMSnapshot.State.Expunging, VMSnapshot.State.Reverting, VMSnapshot.State.Creating);
for (VMSnapshotVO vmSnapshotVO : vmSnapshotsInExpungingStates) {
VMSnapshotStrategy strategy = findVMSnapshotStrategy(vmSnapshotVO);
if (vmSnapshotVO.getState() == VMSnapshot.State.Expunging) {
@ -529,4 +757,299 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
return false;
}
public class VmJobSyncOutcome extends OutcomeImpl<VMSnapshot> {
private long _vmSnapshotId;
public VmJobSyncOutcome(final AsyncJob job, final long vmSnapshotId) {
super(VMSnapshot.class, job, VmJobCheckInterval.value(), new Predicate() {
@Override
public boolean checkCondition() {
AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
assert (jobVo != null);
if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS)
return true;
return false;
}
}, AsyncJob.Topics.JOB_STATE);
_vmSnapshotId = vmSnapshotId;
}
@Override
protected VMSnapshot retrieve() {
return _vmSnapshotDao.findById(_vmSnapshotId);
}
}
public class VmJobSyncVirtualMachineOutcome extends OutcomeImpl<VirtualMachine> {
long vmId;
public VmJobSyncVirtualMachineOutcome(final AsyncJob job, final long vmId) {
super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() {
@Override
public boolean checkCondition() {
AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
assert (jobVo != null);
if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS)
return true;
return false;
}
}, AsyncJob.Topics.JOB_STATE);
}
@Override
protected VirtualMachine retrieve() {
return _vmInstanceDao.findById(vmId);
}
}
public Outcome<VMSnapshot> createVMSnapshotThroughJobQueue(final Long vmId, final Long vmSnapshotId, final boolean quiesceVm) {
final CallContext context = CallContext.current();
final User callingUser = context.getCallingUser();
final Account callingAccount = context.getCallingAccount();
final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
VmWorkJobVO workJob = null;
_vmInstanceDao.lockRow(vm.getId(), true);
workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
workJob.setCmd(VmWorkCreateVMSnapshot.class.getName());
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkCreateVMSnapshot workInfo = new VmWorkCreateVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(),
VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, vmSnapshotId, quiesceVm);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
// Transaction syntax sugar has a cost here
context.putContextParameter("workJob", workJob);
context.putContextParameter("jobId", new Long(workJob.getId()));
}
});
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vmSnapshotId);
}
public Outcome<VMSnapshot> deleteVMSnapshotThroughJobQueue(final Long vmId, final Long vmSnapshotId) {
final CallContext context = CallContext.current();
final User callingUser = context.getCallingUser();
final Account callingAccount = context.getCallingAccount();
final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
VmWorkJobVO workJob = null;
_vmInstanceDao.lockRow(vm.getId(), true);
workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
workJob.setCmd(VmWorkDeleteVMSnapshot.class.getName());
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkDeleteVMSnapshot workInfo = new VmWorkDeleteVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(),
VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, vmSnapshotId);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
// Transaction syntax sugar has a cost here
context.putContextParameter("workJob", workJob);
context.putContextParameter("jobId", new Long(workJob.getId()));
}
});
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vmSnapshotId);
}
public Outcome<VMSnapshot> revertToVMSnapshotThroughJobQueue(final Long vmId, final Long vmSnapshotId) {
final CallContext context = CallContext.current();
final User callingUser = context.getCallingUser();
final Account callingAccount = context.getCallingAccount();
final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
VmWorkJobVO workJob = null;
_vmInstanceDao.lockRow(vm.getId(), true);
workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
workJob.setCmd(VmWorkRevertToVMSnapshot.class.getName());
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkRevertToVMSnapshot workInfo = new VmWorkRevertToVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(),
VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, vmSnapshotId);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
// Transaction syntax sugar has a cost here
context.putContextParameter("workJob", workJob);
context.putContextParameter("jobId", new Long(workJob.getId()));
}
});
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vmSnapshotId);
}
public Outcome<VirtualMachine> deleteAllVMSnapshotsThroughJobQueue(final Long vmId, final VMSnapshot.Type type) {
final CallContext context = CallContext.current();
final User callingUser = context.getCallingUser();
final Account callingAccount = context.getCallingAccount();
final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
VmWorkJobVO workJob = null;
_vmInstanceDao.lockRow(vm.getId(), true);
workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
workJob.setCmd(VmWorkDeleteAllVMSnapshots.class.getName());
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkDeleteAllVMSnapshots workInfo = new VmWorkDeleteAllVMSnapshots(callingUser.getId(), callingAccount.getId(), vm.getId(),
VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, type);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
// Transaction syntax sugar has a cost here
context.putContextParameter("workJob", workJob);
context.putContextParameter("jobId", new Long(workJob.getId()));
}
});
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vmId);
}
@Override
public Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception {
if (work instanceof VmWorkCreateVMSnapshot) {
VmWorkCreateVMSnapshot createWork = (VmWorkCreateVMSnapshot)work;
if (s_logger.isDebugEnabled())
s_logger.debug("Execute Create-VM-Snapshot within VM work job context. vmId: " + createWork.getVmId()
+ ", VM snapshotId: " + createWork.getVmSnapshotId() + "quiesce: " + createWork.isQuiesceVm());
VMSnapshot vmSnapshot = orchestrateCreateVMSnapshot(createWork.getVmId(), createWork.getVmSnapshotId(), createWork.isQuiesceVm());
if (s_logger.isDebugEnabled())
s_logger.debug("Execute Create-VM-Snapshot within VM work job context. vmId: " + createWork.getVmId()
+ ", VM snapshotId: " + createWork.getVmSnapshotId() + "quiesce: " + createWork.isQuiesceVm());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Long(vmSnapshot.getId())));
} else if (work instanceof VmWorkDeleteVMSnapshot) {
VmWorkDeleteVMSnapshot deleteWork = (VmWorkDeleteVMSnapshot)work;
if (s_logger.isDebugEnabled())
s_logger.debug("Execute Delete-VM-Snapshot within VM work job context. vmId: " + deleteWork.getVmId()
+ ", VM snapshotId: " + deleteWork.getVmSnapshotId());
boolean result = orchestrateDeleteVMSnapshot(deleteWork.getVmSnapshotId());
if (s_logger.isDebugEnabled())
s_logger.debug("Done executing Delete-VM-Snapshot within VM work job context. vmId: " + deleteWork.getVmId()
+ ", VM snapshotId: " + deleteWork.getVmSnapshotId());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Boolean(result)));
} else if (work instanceof VmWorkRevertToVMSnapshot) {
VmWorkRevertToVMSnapshot revertWork = (VmWorkRevertToVMSnapshot)work;
if (s_logger.isDebugEnabled())
s_logger.debug("Execute Revert-VM-Snapshot within VM work job context. vmId: " + revertWork.getVmId()
+ ", VM snapshotId: " + revertWork.getVmSnapshotId());
orchestrateRevertToVMSnapshot(revertWork.getVmSnapshotId());
if (s_logger.isDebugEnabled())
s_logger.debug("Done executing Revert-VM-Snapshot within VM work job context. vmId: " + revertWork.getVmId()
+ ", VM snapshotId: " + revertWork.getVmSnapshotId());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
} else if (work instanceof VmWorkDeleteAllVMSnapshots) {
VmWorkDeleteAllVMSnapshots deleteAllWork = (VmWorkDeleteAllVMSnapshots)work;
if (s_logger.isDebugEnabled())
s_logger.debug("Execute Delete-All-VM-Snapshot within VM work job context. vmId: " + deleteAllWork.getVmId());
boolean result = orchestrateDeleteAllVMSnapshots(deleteAllWork.getVmId(), deleteAllWork.getSnapshotType());
if (s_logger.isDebugEnabled())
s_logger.debug("Execute Delete-All-VM-Snapshot within VM work job context. vmId: " + deleteAllWork.getVmId());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Boolean(result)));
} else {
RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
String exceptionJson = JobSerializerHelper.toSerializedString(e);
s_logger.error("Serialize exception object into json: " + exceptionJson);
return new Pair<JobInfo.Status, String>(JobInfo.Status.FAILED, exceptionJson);
}
}
}

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.vm.snapshot;
import com.cloud.vm.VmWork;
public class VmWorkCreateVMSnapshot extends VmWork {
private static final long serialVersionUID = 124386202146049838L;
private Long vmSnapshotId;
private boolean quiesceVm;
public VmWorkCreateVMSnapshot(long userId, long accountId, long vmId, String handlerName, Long vmSnapshotId, boolean quiesceVm) {
super(userId, accountId, vmId, handlerName);
this.vmSnapshotId = vmSnapshotId;
this.quiesceVm = quiesceVm;
}
public Long getVmSnapshotId() {
return vmSnapshotId;
}
public boolean isQuiesceVm() {
return quiesceVm;
}
}

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.vm.snapshot;
import com.cloud.vm.VmWork;
public class VmWorkDeleteAllVMSnapshots extends VmWork {
private static final long serialVersionUID = -6010083039865471888L;
private VMSnapshot.Type type;
public VmWorkDeleteAllVMSnapshots(long userId, long accountId, long vmId, String handlerName, VMSnapshot.Type type) {
super(userId, accountId, vmId, handlerName);
this.type = type;
}
public VMSnapshot.Type getSnapshotType() {
return type;
}
}

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.vm.snapshot;
import com.cloud.vm.VmWork;
public class VmWorkDeleteVMSnapshot extends VmWork {
private static final long serialVersionUID = 7168101866614517508L;
private Long vmSnapshotId;
public VmWorkDeleteVMSnapshot(long userId, long accountId, long vmId, String handlerName, Long vmSnapshotId) {
super(userId, accountId, vmId, handlerName);
this.vmSnapshotId = vmSnapshotId;
}
public Long getVmSnapshotId() {
return vmSnapshotId;
}
}

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.vm.snapshot;
import com.cloud.vm.VmWork;
public class VmWorkRevertToVMSnapshot extends VmWork {
private static final long serialVersionUID = -3406543280278986843L;
private Long vmSnapshotId;
public VmWorkRevertToVMSnapshot(long userId, long accountId, long vmId, String handlerName, Long vmSnapshotId) {
super(userId, accountId, vmId, handlerName);
this.vmSnapshotId = vmSnapshotId;
}
public Long getVmSnapshotId() {
return vmSnapshotId;
}
}