CLOUDSTACK-669: Convert volume snapshot orchestration flow to make it be serialized with other VM operations

This commit is contained in:
Kelven Yang 2013-12-27 16:40:20 -08:00
parent bf9a554522
commit 8c93bd6080
5 changed files with 228 additions and 29 deletions

View File

@ -24,13 +24,13 @@ public enum DataStoreRole {
Primary("primary"), Image("image"), ImageCache("imagecache"), Backup("backup");
public boolean isImageStore() {
return (this.role.equalsIgnoreCase("image") || this.role.equalsIgnoreCase("imagecache")) ? true : false;
return (role.equalsIgnoreCase("image") || role.equalsIgnoreCase("imagecache")) ? true : false;
}
private final String role;
DataStoreRole(String type) {
this.role = type;
role = type;
}
public static DataStoreRole getRole(String role) {

View File

@ -4097,10 +4097,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
}
public class VmJobSyncOutcome extends OutcomeImpl<VirtualMachine> {
public class VmJobVirtualMachineOutcome extends OutcomeImpl<VirtualMachine> {
private long _vmId;
public VmJobSyncOutcome(final AsyncJob job, final long vmId) {
public VmJobVirtualMachineOutcome(final AsyncJob job, final long vmId) {
super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() {
@Override
public boolean checkCondition() {
@ -4281,8 +4281,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vm.getId());
return new VmJobVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vm.getId());
}
public Outcome<VirtualMachine> migrateVmThroughJobQueue(final String vmUuid, final long srcHostId, final DeployDestination dest) {
@ -4439,7 +4439,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
return new VmJobVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
}
public Outcome<VirtualMachine> migrateVmStorageThroughJobQueue(
@ -4491,7 +4491,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
return new VmJobVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
}
public Outcome<VirtualMachine> addVmToNetworkThroughJobQueue(
@ -4541,7 +4541,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
return new VmJobVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
}
public Outcome<VirtualMachine> removeNicFromVmThroughJobQueue(
@ -4591,7 +4591,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
return new VmJobVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
}
public Outcome<VirtualMachine> removeVmFromNetworkThroughJobQueue(
@ -4641,7 +4641,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
return new VmJobVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
}
public Outcome<VirtualMachine> reconfigureVmThroughJobQueue(
@ -4690,9 +4690,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
});
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
return new VmJobVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
}
@Override

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.storage;
import com.cloud.vm.VmWork;
public class VmWorkTakeVolumeSnapshot extends VmWork {
private static final long serialVersionUID = 341816293003023823L;
private Long volumeId;
private Long policyId;
private Long snapshotId;
private boolean quiesceVm;
public VmWorkTakeVolumeSnapshot(long userId, long accountId, long vmId, String handlerName,
Long volumeId, Long policyId, Long snapshotId, boolean quiesceVm) {
super(userId, accountId, vmId, handlerName);
this.volumeId = volumeId;
this.policyId = policyId;
this.snapshotId = snapshotId;
this.quiesceVm = quiesceVm;
}
public Long getVolumeId() {
return volumeId;
}
public Long getPolicyId() {
return policyId;
}
public Long getSnapshotId() {
return snapshotId;
}
public boolean isQuiesceVm() {
return quiesceVm;
}
}

View File

@ -1631,6 +1631,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
@Override
public Snapshot takeSnapshot(Long volumeId, Long policyId, Long snapshotId, Account account, boolean quiescevm) throws ResourceAllocationException {
VolumeInfo volume = volFactory.getVolume(volumeId);
if (volume == null) {
throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't exist");
@ -1640,6 +1641,63 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
throw new InvalidParameterValueException("VolumeId: " + volumeId + " is not in " + Volume.State.Ready + " state but " + volume.getState() + ". Cannot take snapshot.");
}
VMInstanceVO vm = null;
if (volume.getInstanceId() != null)
vm = _vmInstanceDao.findById(volume.getInstanceId());
if (vm != null) {
// serialize VM operation
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm);
} else {
Outcome<Snapshot> outcome = takeVolumeSnapshotThroughJobQueue(vm.getId(), volumeId, policyId, snapshotId, account.getId(), quiescevm);
try {
outcome.get();
} catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
} catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
}
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof ResourceAllocationException)
throw (ResourceAllocationException)jobResult;
else if (jobResult instanceof Throwable)
throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
}
return _snapshotDao.findById(snapshotId);
}
} else {
CreateSnapshotPayload payload = new CreateSnapshotPayload();
payload.setSnapshotId(snapshotId);
payload.setSnapshotPolicyId(policyId);
payload.setAccount(account);
payload.setQuiescevm(quiescevm);
volume.addPayload(payload);
return volService.takeSnapshot(volume);
}
}
private Snapshot orchestrateTakeVolumeSnapshot(Long volumeId, Long policyId, Long snapshotId, Account account, boolean quiescevm)
throws ResourceAllocationException {
VolumeInfo volume = volFactory.getVolume(volumeId);
if (volume == null) {
throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't exist");
}
if (volume.getState() != Volume.State.Ready) {
throw new InvalidParameterValueException("VolumeId: " + volumeId + " is not in " + Volume.State.Ready + " state but " + volume.getState() + ". Cannot take snapshot.");
}
CreateSnapshotPayload payload = new CreateSnapshotPayload();
payload.setSnapshotId(snapshotId);
payload.setSnapshotPolicyId(policyId);
@ -2019,10 +2077,10 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
_storagePoolAllocators = storagePoolAllocators;
}
public class VmJobSyncOutcome extends OutcomeImpl<Volume> {
public class VmJobVolumeOutcome extends OutcomeImpl<Volume> {
private long _volumeId;
public VmJobSyncOutcome(final AsyncJob job, final long volumeId) {
public VmJobVolumeOutcome(final AsyncJob job, final long volumeId) {
super(Volume.class, job, VmJobCheckInterval.value(), new Predicate() {
@Override
public boolean checkCondition() {
@ -2043,6 +2101,30 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
}
}
public class VmJobSnapshotOutcome extends OutcomeImpl<Snapshot> {
private long _snapshotId;
public VmJobSnapshotOutcome(final AsyncJob job, final long snapshotId) {
super(Snapshot.class, job, VmJobCheckInterval.value(), new Predicate() {
@Override
public boolean checkCondition() {
AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
assert (jobVo != null);
if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS)
return true;
return false;
}
}, AsyncJob.Topics.JOB_STATE);
_snapshotId = snapshotId;
}
@Override
protected Snapshot retrieve() {
return _snapshotDao.findById(_snapshotId);
}
}
public Outcome<Volume> attachVolumeToVmThroughJobQueue(final Long vmId, final Long volumeId, final Long deviceId) {
final CallContext context = CallContext.current();
@ -2084,7 +2166,8 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), volumeId);
return new VmJobVolumeOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
volumeId);
}
public Outcome<Volume> detachVolumeFromVmThroughJobQueue(final Long vmId, final Long volumeId) {
@ -2127,7 +2210,8 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), volumeId);
return new VmJobVolumeOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
volumeId);
}
public Outcome<Volume> resizeVolumeThroughJobQueue(final Long vmId, final long volumeId,
@ -2172,7 +2256,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmJobVolumeOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
volumeId);
}
@ -2218,10 +2302,57 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmJobVolumeOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
volumeId);
}
public Outcome<Snapshot> takeVolumeSnapshotThroughJobQueue(final Long vmId, final Long volumeId,
final Long policyId, final Long snapshotId, final Long accountId, final boolean quiesceVm) {
final CallContext context = CallContext.current();
final User callingUser = context.getCallingUser();
final Account callingAccount = context.getCallingAccount();
final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
VmWorkJobVO workJob = null;
_vmInstanceDao.lockRow(vm.getId(), true);
workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
workJob.setCmd(VmWorkTakeVolumeSnapshot.class.getName());
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkTakeVolumeSnapshot workInfo = new VmWorkTakeVolumeSnapshot(
callingUser.getId(), accountId != null ? accountId : callingAccount.getId(), vm.getId(),
VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, policyId, snapshotId, quiesceVm);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
// Transaction syntax sugar has a cost here
context.putContextParameter("workJob", workJob);
context.putContextParameter("jobId", new Long(workJob.getId()));
}
});
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSnapshotOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
snapshotId);
}
@Override
public Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception {
VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
@ -2287,6 +2418,22 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
+ ", volId: " + migrateWork.getVolumeId() + ", destPoolId: " + migrateWork.getDestPoolId() + ", live: " + migrateWork.isLiveMigrate());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Long(newVol.getId())));
} else if (work instanceof VmWorkTakeVolumeSnapshot) {
VmWorkTakeVolumeSnapshot snapshotWork = (VmWorkTakeVolumeSnapshot)work;
if (s_logger.isDebugEnabled())
s_logger.debug("Execute Take-Volume-Snapshot within VM work job context. vmId: " + snapshotWork.getVmId()
+ ", volId: " + snapshotWork.getVolumeId() + ", policyId: " + snapshotWork.getPolicyId() + ", quiesceVm: " + snapshotWork.isQuiesceVm());
Account account = _accountDao.findById(snapshotWork.getAccountId());
orchestrateTakeVolumeSnapshot(snapshotWork.getVolumeId(), snapshotWork.getPolicyId(), snapshotWork.getSnapshotId(),
account, snapshotWork.isQuiesceVm());
if (s_logger.isDebugEnabled())
s_logger.debug("Done executing Take-Volume-Snapshot within VM work job context. vmId: " + snapshotWork.getVmId()
+ ", volId: " + snapshotWork.getVolumeId() + ", policyId: " + snapshotWork.getPolicyId() + ", quiesceVm: " + snapshotWork.isQuiesceVm());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(snapshotWork.getSnapshotId()));
} else {
RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
String exceptionJson = JobSerializerHelper.toSerializedString(e);

View File

@ -129,7 +129,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
// TODO
static final ConfigKey<Boolean> VmJobEnabled = new ConfigKey<Boolean>("Advanced",
Boolean.class, "vm.job.enabled", "false",
Boolean.class, "vm.job.enabled", "true",
"True to enable new VM sync model. false to use the old way", false);
static final ConfigKey<Long> VmJobCheckInterval = new ConfigKey<Long>("Advanced",
Long.class, "vm.job.check.interval", "3000",
@ -757,10 +757,10 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
return false;
}
public class VmJobSyncOutcome extends OutcomeImpl<VMSnapshot> {
public class VmJobVMSnapshotOutcome extends OutcomeImpl<VMSnapshot> {
private long _vmSnapshotId;
public VmJobSyncOutcome(final AsyncJob job, final long vmSnapshotId) {
public VmJobVMSnapshotOutcome(final AsyncJob job, final long vmSnapshotId) {
super(VMSnapshot.class, job, VmJobCheckInterval.value(), new Predicate() {
@Override
public boolean checkCondition() {
@ -781,10 +781,10 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
}
}
public class VmJobSyncVirtualMachineOutcome extends OutcomeImpl<VirtualMachine> {
public class VmJobVirtualMachineOutcome extends OutcomeImpl<VirtualMachine> {
long vmId;
public VmJobSyncVirtualMachineOutcome(final AsyncJob job, final long vmId) {
public VmJobVirtualMachineOutcome(final AsyncJob job, final long vmId) {
super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() {
@Override
public boolean checkCondition() {
@ -845,7 +845,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmJobVMSnapshotOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vmSnapshotId);
}
@ -890,7 +890,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmJobVMSnapshotOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vmSnapshotId);
}
@ -935,7 +935,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmJobVMSnapshotOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vmSnapshotId);
}
@ -980,7 +980,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmJobVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vmId);
}