mirror of https://github.com/apache/cloudstack.git
CLOUDSTACK-669: convert volume attach/detach flows to make them be serialized with other VM operations
This commit is contained in:
parent
d0abf3fcc2
commit
ed2125ec34
|
|
@ -0,0 +1,23 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package com.cloud.vm;
|
||||
|
||||
public interface VmWorkConstants {
|
||||
public static final String VM_WORK_QUEUE = "VmWorkJobQueue";
|
||||
public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher";
|
||||
public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher";
|
||||
}
|
||||
|
|
@ -75,17 +75,18 @@
|
|||
|
||||
<bean id= "vmWorkJobDispatcher" class="com.cloud.vm.VmWorkJobDispatcher">
|
||||
<property name="name">
|
||||
<util:constant static-field="com.cloud.vm.VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER"/>
|
||||
<util:constant static-field="com.cloud.vm.VmWorkConstants.VM_WORK_JOB_DISPATCHER"/>
|
||||
</property>
|
||||
<property name="handlers">
|
||||
<map>
|
||||
<entry key="VirtualMachineManagerImpl" value-ref="clusteredVirtualMachineManagerImpl" />
|
||||
<entry key="VolumeApiServiceImpl" value-ref="volumeApiServiceImpl" />
|
||||
</map>
|
||||
</property>
|
||||
</bean>
|
||||
<bean id= "vmWorkJobWakeupDispatcher" class="com.cloud.vm.VmWorkJobWakeupDispatcher">
|
||||
<property name="name">
|
||||
<util:constant static-field="com.cloud.vm.VmWorkJobDispatcher.VM_WORK_JOB_WAKEUP_DISPATCHER"/>
|
||||
<util:constant static-field="com.cloud.vm.VmWorkConstants.VM_WORK_JOB_WAKEUP_DISPATCHER"/>
|
||||
</property>
|
||||
</bean>
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -36,10 +36,6 @@ import com.cloud.vm.dao.VMInstanceDao;
|
|||
public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatcher {
|
||||
private static final Logger s_logger = Logger.getLogger(VmWorkJobDispatcher.class);
|
||||
|
||||
public static final String VM_WORK_QUEUE = "VmWorkJobQueue";
|
||||
public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher";
|
||||
public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher";
|
||||
|
||||
@Inject private VirtualMachineManagerImpl _vmMgr;
|
||||
@Inject private AsyncJobManager _asyncJobMgr;
|
||||
@Inject private VMInstanceDao _instanceDao;
|
||||
|
|
@ -103,79 +99,6 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
|
|||
Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(job, work);
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
|
||||
|
||||
/*
|
||||
VMInstanceVO vm = _instanceDao.findById(work.getVmId());
|
||||
if (vm == null) {
|
||||
s_logger.info("Unable to find vm " + work.getVmId());
|
||||
}
|
||||
assert(vm != null);
|
||||
if(work instanceof VmWorkStart) {
|
||||
VmWorkStart workStart = (VmWorkStart)work;
|
||||
_vmMgr.orchestrateStart(vm.getUuid(), workStart.getParams(), workStart.getPlan());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
|
||||
} else if(work instanceof VmWorkStop) {
|
||||
VmWorkStop workStop = (VmWorkStop)work;
|
||||
_vmMgr.orchestrateStop(vm.getUuid(), workStop.isCleanup());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
|
||||
} else if(work instanceof VmWorkMigrate) {
|
||||
VmWorkMigrate workMigrate = (VmWorkMigrate)work;
|
||||
_vmMgr.orchestrateMigrate(vm.getUuid(), workMigrate.getSrcHostId(), workMigrate.getDeployDestination());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
|
||||
} else if(work instanceof VmWorkMigrateWithStorage) {
|
||||
VmWorkMigrateWithStorage workMigrateWithStorage = (VmWorkMigrateWithStorage)work;
|
||||
_vmMgr.orchestrateMigrateWithStorage(vm.getUuid(),
|
||||
workMigrateWithStorage.getSrcHostId(),
|
||||
workMigrateWithStorage.getDestHostId(),
|
||||
workMigrateWithStorage.getVolumeToPool());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
|
||||
} else if(work instanceof VmWorkMigrateForScale) {
|
||||
VmWorkMigrateForScale workMigrateForScale = (VmWorkMigrateForScale)work;
|
||||
_vmMgr.orchestrateMigrateForScale(vm.getUuid(),
|
||||
workMigrateForScale.getSrcHostId(),
|
||||
workMigrateForScale.getDeployDestination(),
|
||||
workMigrateForScale.getNewServiceOfferringId());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
|
||||
} else if(work instanceof VmWorkReboot) {
|
||||
VmWorkReboot workReboot = (VmWorkReboot)work;
|
||||
_vmMgr.orchestrateReboot(vm.getUuid(), workReboot.getParams());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
|
||||
} else if(work instanceof VmWorkAddVmToNetwork) {
|
||||
VmWorkAddVmToNetwork workAddVmToNetwork = (VmWorkAddVmToNetwork)work;
|
||||
NicProfile nic = _vmMgr.orchestrateAddVmToNetwork(vm, workAddVmToNetwork.getNetwork(),
|
||||
workAddVmToNetwork.getRequestedNicProfile());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0,
|
||||
JobSerializerHelper.toObjectSerializedString(nic));
|
||||
} else if(work instanceof VmWorkRemoveNicFromVm) {
|
||||
VmWorkRemoveNicFromVm workRemoveNicFromVm = (VmWorkRemoveNicFromVm)work;
|
||||
boolean result = _vmMgr.orchestrateRemoveNicFromVm(vm, workRemoveNicFromVm.getNic());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0,
|
||||
JobSerializerHelper.toObjectSerializedString(new Boolean(result)));
|
||||
} else if(work instanceof VmWorkRemoveVmFromNetwork) {
|
||||
VmWorkRemoveVmFromNetwork workRemoveVmFromNetwork = (VmWorkRemoveVmFromNetwork)work;
|
||||
boolean result = _vmMgr.orchestrateRemoveVmFromNetwork(vm,
|
||||
workRemoveVmFromNetwork.getNetwork(), workRemoveVmFromNetwork.getBroadcastUri());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0,
|
||||
JobSerializerHelper.toObjectSerializedString(new Boolean(result)));
|
||||
} else if(work instanceof VmWorkReconfigure) {
|
||||
VmWorkReconfigure workReconfigure = (VmWorkReconfigure)work;
|
||||
_vmMgr.reConfigureVm(vm.getUuid(), workReconfigure.getNewServiceOffering(),
|
||||
workReconfigure.isSameHost());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
|
||||
} else if(work instanceof VmWorkStorageMigration) {
|
||||
VmWorkStorageMigration workStorageMigration = (VmWorkStorageMigration)work;
|
||||
_vmMgr.orchestrateStorageMigration(vm.getUuid(), workStorageMigration.getDestStoragePool());
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
|
||||
} else {
|
||||
assert(false);
|
||||
s_logger.error("Unhandled VM work command: " + job.getCmd());
|
||||
|
||||
RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
|
||||
String exceptionJson = JobSerializerHelper.toSerializedString(e);
|
||||
s_logger.error("Serialize exception object into json: " + exceptionJson);
|
||||
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, exceptionJson);
|
||||
}
|
||||
*/
|
||||
|
||||
} catch(Throwable e) {
|
||||
s_logger.error("Unable to complete " + job, e);
|
||||
|
||||
|
|
|
|||
|
|
@ -109,13 +109,11 @@ public class OutcomeImpl<T> implements Outcome<T> {
|
|||
@Override
|
||||
public void execute(Task<T> task) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Task<T> task, long wait, TimeUnit unit) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
public Predicate getPredicate() {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package com.cloud.storage;
|
||||
|
||||
import com.cloud.vm.VmWork;
|
||||
|
||||
public class VmWorkAttachVolume extends VmWork {
|
||||
private static final long serialVersionUID = 553291814854451740L;
|
||||
|
||||
private Long volumeId;
|
||||
private Long deviceId;
|
||||
|
||||
public VmWorkAttachVolume(long userId, long accountId, long vmId, String handlerName, Long volumeId, Long deviceId) {
|
||||
super(userId, accountId, vmId, handlerName);
|
||||
this.volumeId = volumeId;
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public Long getVolumeId() {
|
||||
return volumeId;
|
||||
}
|
||||
|
||||
public Long getDeviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package com.cloud.storage;
|
||||
|
||||
import com.cloud.vm.VmWork;
|
||||
|
||||
public class VmWorkDetachVolume extends VmWork {
|
||||
private static final long serialVersionUID = -8722243207385263101L;
|
||||
|
||||
private Long volumeId;
|
||||
|
||||
public VmWorkDetachVolume(long userId, long accountId, long vmId, String handlerName, Long volumeId) {
|
||||
super(userId, accountId, vmId, handlerName);
|
||||
this.volumeId = volumeId;
|
||||
}
|
||||
|
||||
public Long getVolumeId() {
|
||||
return volumeId;
|
||||
}
|
||||
}
|
||||
|
|
@ -18,9 +18,9 @@ package com.cloud.storage;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
|
|
@ -53,10 +53,17 @@ import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
|
|||
import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService.VolumeApiResult;
|
||||
import org.apache.cloudstack.framework.async.AsyncCallFuture;
|
||||
import org.apache.cloudstack.framework.config.ConfigKey;
|
||||
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
|
||||
import org.apache.cloudstack.framework.jobs.AsyncJob;
|
||||
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
|
||||
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
|
||||
import org.apache.cloudstack.framework.jobs.Outcome;
|
||||
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
|
||||
import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
|
||||
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
|
||||
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
|
||||
import org.apache.cloudstack.jobs.JobInfo;
|
||||
import org.apache.cloudstack.storage.command.AttachAnswer;
|
||||
import org.apache.cloudstack.storage.command.AttachCommand;
|
||||
import org.apache.cloudstack.storage.command.DettachCommand;
|
||||
|
|
@ -130,18 +137,22 @@ import com.cloud.template.TemplateManager;
|
|||
import com.cloud.user.Account;
|
||||
import com.cloud.user.AccountManager;
|
||||
import com.cloud.user.ResourceLimitService;
|
||||
import com.cloud.user.User;
|
||||
import com.cloud.user.VmDiskStatisticsVO;
|
||||
import com.cloud.user.dao.AccountDao;
|
||||
import com.cloud.user.dao.UserDao;
|
||||
import com.cloud.user.dao.VmDiskStatisticsDao;
|
||||
import com.cloud.utils.EnumUtils;
|
||||
import com.cloud.utils.NumbersUtil;
|
||||
import com.cloud.utils.Pair;
|
||||
import com.cloud.utils.Predicate;
|
||||
import com.cloud.utils.UriUtils;
|
||||
import com.cloud.utils.component.ManagerBase;
|
||||
import com.cloud.utils.db.DB;
|
||||
import com.cloud.utils.db.EntityManager;
|
||||
import com.cloud.utils.db.Transaction;
|
||||
import com.cloud.utils.db.TransactionCallback;
|
||||
import com.cloud.utils.db.TransactionCallbackNoReturn;
|
||||
import com.cloud.utils.db.TransactionStatus;
|
||||
import com.cloud.utils.exception.CloudRuntimeException;
|
||||
import com.cloud.utils.fsm.NoTransitionException;
|
||||
|
|
@ -152,6 +163,10 @@ import com.cloud.vm.VMInstanceVO;
|
|||
import com.cloud.vm.VirtualMachine;
|
||||
import com.cloud.vm.VirtualMachine.State;
|
||||
import com.cloud.vm.VirtualMachineManager;
|
||||
import com.cloud.vm.VmWork;
|
||||
import com.cloud.vm.VmWorkConstants;
|
||||
import com.cloud.vm.VmWorkJobHandler;
|
||||
import com.cloud.vm.VmWorkSerializer;
|
||||
import com.cloud.vm.dao.ConsoleProxyDao;
|
||||
import com.cloud.vm.dao.DomainRouterDao;
|
||||
import com.cloud.vm.dao.SecondaryStorageVmDao;
|
||||
|
|
@ -160,8 +175,11 @@ import com.cloud.vm.dao.VMInstanceDao;
|
|||
import com.cloud.vm.snapshot.VMSnapshotVO;
|
||||
import com.cloud.vm.snapshot.dao.VMSnapshotDao;
|
||||
|
||||
public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiService {
|
||||
public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiService, VmWorkJobHandler {
|
||||
private final static Logger s_logger = Logger.getLogger(VolumeApiServiceImpl.class);
|
||||
|
||||
public static final String VM_WORK_JOB_HANDLER = VolumeApiServiceImpl.class.getSimpleName();
|
||||
|
||||
@Inject
|
||||
VolumeOrchestrationService _volumeMgr;
|
||||
|
||||
|
|
@ -307,6 +325,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
|
|||
protected HypervisorCapabilitiesDao _hypervisorCapabilitiesDao;
|
||||
@Inject
|
||||
StorageManager storageMgr;
|
||||
|
||||
@Inject
|
||||
protected AsyncJobManager _jobMgr;
|
||||
|
||||
// TODO
|
||||
static final ConfigKey<Boolean> VmJobEnabled = new ConfigKey<Boolean>("Advanced",
|
||||
Boolean.class, "vm.job.enabled", "false",
|
||||
"True to enable new VM sync model. false to use the old way", false);
|
||||
static final ConfigKey<Long> VmJobCheckInterval = new ConfigKey<Long>("Advanced",
|
||||
Long.class, "vm.job.check.interval", "3000",
|
||||
"Interval in milliseconds to check if the job is complete", false);
|
||||
|
||||
private int _customDiskOfferingMinSize = 1;
|
||||
private final int _customDiskOfferingMaxSize = 1024;
|
||||
private long _maxVolumeSizeInGb;
|
||||
|
|
@ -379,8 +409,8 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
|
|||
userSpecifiedName = getRandomVolumeName();
|
||||
}
|
||||
if ((!url.toLowerCase().endsWith("vhd")) && (!url.toLowerCase().endsWith("vhd.zip")) && (!url.toLowerCase().endsWith("vhd.bz2")) &&
|
||||
(!url.toLowerCase().endsWith("vhdx")) && (!url.toLowerCase().endsWith("vhdx.zip")) &&
|
||||
(!url.toLowerCase().endsWith("vhdx.gz")) && (!url.toLowerCase().endsWith("vhdx.bz2")) &&
|
||||
(!url.toLowerCase().endsWith("vhdx")) && (!url.toLowerCase().endsWith("vhdx.zip")) &&
|
||||
(!url.toLowerCase().endsWith("vhdx.gz")) && (!url.toLowerCase().endsWith("vhdx.bz2")) &&
|
||||
(!url.toLowerCase().endsWith("vhd.gz")) && (!url.toLowerCase().endsWith("qcow2")) && (!url.toLowerCase().endsWith("qcow2.zip")) &&
|
||||
(!url.toLowerCase().endsWith("qcow2.bz2")) && (!url.toLowerCase().endsWith("qcow2.gz")) && (!url.toLowerCase().endsWith("ova")) &&
|
||||
(!url.toLowerCase().endsWith("ova.zip")) && (!url.toLowerCase().endsWith("ova.bz2")) && (!url.toLowerCase().endsWith("ova.gz")) &&
|
||||
|
|
@ -1026,12 +1056,37 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
|
|||
|
||||
@Override
|
||||
public Volume attachVolumeToVM(AttachVolumeCmd command) {
|
||||
Long vmId = command.getVirtualMachineId();
|
||||
Long volumeId = command.getId();
|
||||
Long deviceId = command.getDeviceId();
|
||||
return attachVolumeToVM(vmId, volumeId, deviceId);
|
||||
|
||||
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
|
||||
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
|
||||
// avoid re-entrance
|
||||
return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
|
||||
} else {
|
||||
Outcome<Volume> outcome = attachVolumeToVmThroughJobQueue(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
|
||||
|
||||
Volume vol = null;
|
||||
try {
|
||||
vol = outcome.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Operation is interrupted", e);
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
throw new RuntimeException("Execution excetion", e);
|
||||
}
|
||||
|
||||
Throwable jobException = retrieveExecutionException(outcome.getJob());
|
||||
if (jobException != null) {
|
||||
if (jobException instanceof ConcurrentOperationException)
|
||||
throw (ConcurrentOperationException)jobException;
|
||||
else
|
||||
throw new RuntimeException("Unexpected exception", jobException);
|
||||
}
|
||||
return vol;
|
||||
}
|
||||
}
|
||||
|
||||
private Volume orchestrateAttachVolumeToVM(Long vmId, Long volumeId, Long deviceId) {
|
||||
return attachVolumeToVM(vmId, volumeId, deviceId);
|
||||
}
|
||||
|
||||
@ActionEvent(eventType = EventTypes.EVENT_VOLUME_ATTACH, eventDescription = "attaching volume", async = true)
|
||||
public Volume attachVolumeToVM(Long vmId, Long volumeId, Long deviceId) {
|
||||
|
|
@ -1214,7 +1269,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
|
|||
@ActionEvent(eventType = EventTypes.EVENT_VOLUME_UPDATE, eventDescription = "updating volume", async = true)
|
||||
public Volume updateVolume(long volumeId, String path, String state, Long storageId, Boolean displayVolume) {
|
||||
VolumeVO volume = _volumeDao.findById(volumeId);
|
||||
|
||||
|
||||
if (path != null) {
|
||||
volume.setPath(path);
|
||||
}
|
||||
|
|
@ -1222,7 +1277,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
|
|||
if (displayVolume != null) {
|
||||
volume.setDisplayVolume(displayVolume);
|
||||
}
|
||||
|
||||
|
||||
if (state != null) {
|
||||
try {
|
||||
Volume.State volumeState = Volume.State.valueOf(state);
|
||||
|
|
@ -1232,7 +1287,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
|
|||
throw new InvalidParameterValueException("Invalid volume state specified");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (storageId != null) {
|
||||
StoragePool pool = _storagePoolDao.findById(storageId);
|
||||
if (pool.getDataCenterId() != volume.getDataCenterId()) {
|
||||
|
|
@ -1240,7 +1295,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
|
|||
}
|
||||
volume.setPoolId(pool.getId());
|
||||
}
|
||||
|
||||
|
||||
_volumeDao.update(volumeId, volume);
|
||||
|
||||
return volume;
|
||||
|
|
@ -1315,6 +1370,38 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
|
|||
_asyncMgr.updateAsyncJobStatus(job.getId(), BaseCmd.PROGRESS_INSTANCE_CREATED, volumeId.toString());
|
||||
}
|
||||
|
||||
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
|
||||
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
|
||||
// avoid re-entrance
|
||||
return orchestrateDetachVolumeFromVM(vmId, volumeId);
|
||||
} else {
|
||||
Outcome<Volume> outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId);
|
||||
|
||||
Volume vol = null;
|
||||
try {
|
||||
vol = outcome.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Operation is interrupted", e);
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
throw new RuntimeException("Execution excetion", e);
|
||||
}
|
||||
|
||||
Throwable jobException = retrieveExecutionException(outcome.getJob());
|
||||
if (jobException != null) {
|
||||
if (jobException instanceof ConcurrentOperationException)
|
||||
throw (ConcurrentOperationException)jobException;
|
||||
else
|
||||
throw new RuntimeException("Unexpected exception", jobException);
|
||||
}
|
||||
return vol;
|
||||
}
|
||||
}
|
||||
|
||||
private Volume orchestrateDetachVolumeFromVM(long vmId, long volumeId) {
|
||||
|
||||
Volume volume = _volumeDao.findById(volumeId);
|
||||
VMInstanceVO vm = _vmInstanceDao.findById(vmId);
|
||||
|
||||
String errorMsg = "Failed to detach volume: " + volume.getName() + " from VM: " + vm.getHostName();
|
||||
boolean sendCommand = (vm.getState() == State.Running);
|
||||
|
||||
|
|
@ -1861,4 +1948,176 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
|
|||
_storagePoolAllocators = storagePoolAllocators;
|
||||
}
|
||||
|
||||
public Throwable retrieveExecutionException(AsyncJob job) {
|
||||
assert (job != null);
|
||||
assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER));
|
||||
|
||||
AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
|
||||
if (jobVo != null && jobVo.getResult() != null) {
|
||||
Object obj = JobSerializerHelper.fromSerializedString(job.getResult());
|
||||
|
||||
if (obj != null && obj instanceof Throwable)
|
||||
return (Throwable)obj;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public class VmJobSyncOutcome extends OutcomeImpl<Volume> {
|
||||
private long _volumeId;
|
||||
|
||||
public VmJobSyncOutcome(final AsyncJob job, final long volumeId) {
|
||||
super(Volume.class, job, VmJobCheckInterval.value(), new Predicate() {
|
||||
@Override
|
||||
public boolean checkCondition() {
|
||||
AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
|
||||
assert (jobVo != null);
|
||||
if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
}, AsyncJob.Topics.JOB_STATE);
|
||||
_volumeId = volumeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Volume retrieve() {
|
||||
return _volumeDao.findById(_volumeId);
|
||||
}
|
||||
}
|
||||
|
||||
public Outcome<Volume> attachVolumeToVmThroughJobQueue(final Long vmId, final Long volumeId, final Long deviceId) {
|
||||
|
||||
final CallContext context = CallContext.current();
|
||||
final User callingUser = context.getCallingUser();
|
||||
final Account callingAccount = context.getCallingAccount();
|
||||
|
||||
final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
|
||||
|
||||
Transaction.execute(new TransactionCallbackNoReturn() {
|
||||
@Override
|
||||
public void doInTransactionWithoutResult(TransactionStatus status) {
|
||||
VmWorkJobVO workJob = null;
|
||||
|
||||
_vmInstanceDao.lockRow(vm.getId(), true);
|
||||
workJob = new VmWorkJobVO(context.getContextId());
|
||||
|
||||
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
|
||||
workJob.setCmd(VmWorkAttachVolume.class.getName());
|
||||
|
||||
workJob.setAccountId(callingAccount.getId());
|
||||
workJob.setUserId(callingUser.getId());
|
||||
workJob.setStep(VmWorkJobVO.Step.Starting);
|
||||
workJob.setVmType(vm.getType());
|
||||
workJob.setVmInstanceId(vm.getId());
|
||||
|
||||
// save work context info (there are some duplications)
|
||||
VmWorkAttachVolume workInfo = new VmWorkAttachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(),
|
||||
VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, deviceId);
|
||||
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
|
||||
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
|
||||
// Transaction syntax sugar has a cost here
|
||||
context.putContextParameter("workJob", workJob);
|
||||
context.putContextParameter("jobId", new Long(workJob.getId()));
|
||||
}
|
||||
});
|
||||
|
||||
final long jobId = (Long)context.getContextParameter("jobId");
|
||||
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
|
||||
|
||||
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
|
||||
volumeId);
|
||||
}
|
||||
|
||||
public Outcome<Volume> detachVolumeFromVmThroughJobQueue(final Long vmId, final Long volumeId) {
|
||||
|
||||
final CallContext context = CallContext.current();
|
||||
final User callingUser = context.getCallingUser();
|
||||
final Account callingAccount = context.getCallingAccount();
|
||||
|
||||
final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
|
||||
|
||||
Transaction.execute(new TransactionCallbackNoReturn() {
|
||||
@Override
|
||||
public void doInTransactionWithoutResult(TransactionStatus status) {
|
||||
VmWorkJobVO workJob = null;
|
||||
|
||||
_vmInstanceDao.lockRow(vm.getId(), true);
|
||||
workJob = new VmWorkJobVO(context.getContextId());
|
||||
|
||||
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
|
||||
workJob.setCmd(VmWorkDetachVolume.class.getName());
|
||||
|
||||
workJob.setAccountId(callingAccount.getId());
|
||||
workJob.setUserId(callingUser.getId());
|
||||
workJob.setStep(VmWorkJobVO.Step.Starting);
|
||||
workJob.setVmType(vm.getType());
|
||||
workJob.setVmInstanceId(vm.getId());
|
||||
|
||||
// save work context info (there are some duplications)
|
||||
VmWorkDetachVolume workInfo = new VmWorkDetachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(),
|
||||
VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId);
|
||||
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
|
||||
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
|
||||
// Transaction syntax sugar has a cost here
|
||||
context.putContextParameter("workJob", workJob);
|
||||
context.putContextParameter("jobId", new Long(workJob.getId()));
|
||||
}
|
||||
});
|
||||
|
||||
final long jobId = (Long)context.getContextParameter("jobId");
|
||||
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
|
||||
|
||||
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
|
||||
volumeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception {
|
||||
VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
|
||||
if (vm == null) {
|
||||
s_logger.info("Unable to find vm " + work.getVmId());
|
||||
}
|
||||
assert (vm != null);
|
||||
|
||||
if (work instanceof VmWorkAttachVolume) {
|
||||
|
||||
VmWorkAttachVolume attachWork = (VmWorkAttachVolume)work;
|
||||
|
||||
if (s_logger.isDebugEnabled())
|
||||
s_logger.debug("Execute Attach-Volume within VM work job context. vmId: " + attachWork.getVmId()
|
||||
+ ", volId: " + attachWork.getVolumeId() + ", deviceId: " + attachWork.getDeviceId());
|
||||
|
||||
orchestrateAttachVolumeToVM(attachWork.getVmId(), attachWork.getVolumeId(), attachWork.getDeviceId());
|
||||
|
||||
if (s_logger.isDebugEnabled())
|
||||
s_logger.debug("Done executing Attach-Volume within VM work job context. vmId: " + attachWork.getVmId()
|
||||
+ ", volId: " + attachWork.getVolumeId() + ", deviceId: " + attachWork.getDeviceId());
|
||||
|
||||
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
|
||||
} else if (work instanceof VmWorkDetachVolume) {
|
||||
VmWorkDetachVolume detachWork = (VmWorkDetachVolume)work;
|
||||
|
||||
if (s_logger.isDebugEnabled())
|
||||
s_logger.debug("Execute Detach-Volume within VM work job context. vmId: " + detachWork.getVmId()
|
||||
+ ", volId: " + detachWork.getVolumeId());
|
||||
|
||||
orchestrateDetachVolumeFromVM(detachWork.getVmId(), detachWork.getVolumeId());
|
||||
|
||||
if (s_logger.isDebugEnabled())
|
||||
s_logger.debug("Done executing Detach-Volume within VM work job context. vmId: " + detachWork.getVmId()
|
||||
+ ", volId: " + detachWork.getVolumeId());
|
||||
|
||||
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
|
||||
} else {
|
||||
RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
|
||||
String exceptionJson = JobSerializerHelper.toSerializedString(e);
|
||||
s_logger.error("Serialize exception object into json: " + exceptionJson);
|
||||
return new Pair<JobInfo.Status, String>(JobInfo.Status.FAILED, exceptionJson);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue