Start and stop VM now work for CPVM and SSVM with new VMSync

This commit is contained in:
Kelven Yang 2013-05-07 17:03:04 -07:00
parent 006b2f8ed8
commit 3c780a500d
10 changed files with 264 additions and 304 deletions

View File

@ -794,6 +794,7 @@
<bean id="asyncJobDaoImpl" class="com.cloud.async.dao.AsyncJobDaoImpl" />
<bean id="asyncJobJoinDaoImpl" class="com.cloud.api.query.dao.AsyncJobJoinDaoImpl" />
<bean id="asyncJobJournalDaoImpl" class="com.cloud.async.dao.AsyncJobJournalDaoImpl" />
<bean id="asyncJobJoinMapDaoImpl" class="com.cloud.async.dao.AsyncJobJoinMapDaoImpl" />
<bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl">
<property name="defaultDispatcher" value="ApiAsyncJobDispatcher" />
</bean>

View File

@ -18,10 +18,17 @@ package com.cloud.async;
import javax.inject.Inject;
import com.cloud.async.dao.AsyncJobJoinMapDao;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.ResourceUnavailableException;
import com.cloud.serializer.SerializerHelper;
public class AsyncJobExecutionContext {
private AsyncJob _job;
@Inject private AsyncJobManager _jobMgr;
@Inject private AsyncJobManager _jobMgr;
@Inject private AsyncJobJoinMapDao _joinMapDao;
private static ThreadLocal<AsyncJobExecutionContext> s_currentExectionContext = new ThreadLocal<AsyncJobExecutionContext>();
@ -65,6 +72,48 @@ public class AsyncJobExecutionContext {
_jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson);
}
// Registers the currently executing job as a joiner of joinJobId, so this
// job can later wait on (and collect the result of) that other job.
// Requires an active job context; enforced only via assert.
public void joinJob(long joinJobId) {
assert(_job != null);
_jobMgr.joinJob(_job.getId(), joinJobId);
}
//
// Check whether the joined job recorded a failure before we disjoin from it;
// a serialized exception stored in the join record is deserialized and
// re-thrown here so the caller observes the joined job's failure locally.
// TODO : it is ugly and this will become unnecessary after we switch to full-async mode
//
public void disjoinJob(long joinedJobId) throws InsufficientCapacityException,
    ConcurrentOperationException, ResourceUnavailableException {
    assert(_job != null);

    AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
    // Guard against a missing join record (e.g. already cleaned up elsewhere)
    // to avoid an NPE; in that case fall through and just disjoin.
    if(record != null && record.getJoinStatus() == AsyncJobResult.STATUS_FAILED && record.getJoinResult() != null) {
        Object exception = SerializerHelper.fromObjectSerializedString(record.getJoinResult());
        // instanceof is null-safe, so no separate null check is needed.
        if(exception instanceof Exception) {
            // Re-throw the checked types declared on this method; anything
            // else is wrapped so the cause is preserved.
            if(exception instanceof InsufficientCapacityException)
                throw (InsufficientCapacityException)exception;
            else if(exception instanceof ConcurrentOperationException)
                throw (ConcurrentOperationException)exception;
            else if(exception instanceof ResourceUnavailableException)
                throw (ResourceUnavailableException)exception;
            else
                throw new RuntimeException((Exception)exception);
        }
    }

    _jobMgr.disjoinJob(_job.getId(), joinedJobId);
}
// Records this job's completion status/result on its join records so that
// jobs joined to it can pick up the outcome.
public void completeJoin(int joinStatus, String joinResult) {
assert(_job != null);
_jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
}
// Completes the join records AND the async job itself in one call.
// NOTE(review): joinStatus is also passed to completeAsyncJob as the job
// status (with resultCode 0 and a null result object) — presumably the join
// status constants and job status constants share values; confirm against
// AsyncJobResult before relying on this.
public void completeJobAndJoin(int joinStatus, String joinResult) {
assert(_job != null);
_jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
_jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null);
}
/**
 * Returns the job execution context bound to the calling thread, or null if
 * the thread is not currently executing under an async job.
 */
public static AsyncJobExecutionContext getCurrentExecutionContext() {
    final AsyncJobExecutionContext context = s_currentExectionContext.get();
    return context;
}

View File

@ -25,25 +25,29 @@ import com.cloud.utils.component.Manager;
public interface AsyncJobManager extends Manager {
public AsyncJobVO getAsyncJob(long jobId);
AsyncJobVO getAsyncJob(long jobId);
public List<? extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
List<? extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
public long submitAsyncJob(AsyncJob job);
public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext);
public long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
public AsyncJobResult queryAsyncJobResult(long jobId);
long submitAsyncJob(AsyncJob job);
long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext);
long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
AsyncJobResult queryAsyncJobResult(long jobId);
public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject);
public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject);
public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
public void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject);
void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject);
void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
journalText, String journalObjJson);
public void releaseSyncSource();
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);
public boolean waitAndCheck(String[] wakupSubjects, long checkIntervalInMilliSeconds,
void joinJob(long jobId, long joinJobId);
void disjoinJob(long jobId, long joinedJobId);
void completeJoin(long joinJobId, int joinStatus, String joinResult);
void releaseSyncSource();
void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);
boolean waitAndCheck(String[] wakupSubjects, long checkIntervalInMilliSeconds,
long timeoutInMiliseconds, Predicate predicate);
/**
@ -51,5 +55,5 @@ public interface AsyncJobManager extends Manager {
* @param cmd the command that specifies the job id
* @return an async-call result object
*/
public AsyncJob queryAsyncJobResult(QueryAsyncJobResultCmd cmd);
AsyncJob queryAsyncJobResult(QueryAsyncJobResultCmd cmd);
}

View File

@ -38,11 +38,15 @@ import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDetector;
import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.cloudstack.messagebus.TopicConstants;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import com.cloud.api.ApiSerializerHelper;
import com.cloud.api.query.dao.AsyncJobJoinDao;
import com.cloud.async.dao.AsyncJobDao;
import com.cloud.async.dao.AsyncJobJoinMapDao;
import com.cloud.async.dao.AsyncJobJournalDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
@ -86,6 +90,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Inject private AsyncJobDao _jobDao;
@Inject private AsyncJobJournalDao _journalDao;
@Inject private ConfigurationDao _configDao;
@Inject private AsyncJobJoinMapDao _joinMapDao;
@Inject private List<AsyncJobDispatcher> _jobDispatchers;
@Inject private MessageBus _messageBus;
@ -216,6 +221,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
txn.commit();
_messageBus.publish(null, TopicConstants.JOB_STATE, PublishScope.GLOBAL, (Long)jobId);
} catch(Exception e) {
s_logger.error("Unexpected exception while completing async job-" + jobId, e);
txn.rollback();
@ -291,6 +298,21 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
_journalDao.persist(journal);
}
// Persist a join record linking jobId to joinJobId, tagged with this
// management server's id.
@DB
@Override
public void joinJob(long jobId, long joinJobId) {
    _joinMapDao.joinJob(jobId, joinJobId, this.getMsid());
}

// Remove the join record once jobId no longer waits on joinedJobId.
@DB
@Override
public void disjoinJob(long jobId, long joinedJobId) {
    _joinMapDao.disjoinJob(jobId, joinedJobId);
}

// Record joinJobId's completion status/result on its join records,
// tagged with this management server's id.
@DB
@Override
public void completeJoin(long joinJobId, int joinStatus, String joinResult) {
    _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, this.getMsid());
}
@Override
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
if(s_logger.isDebugEnabled()) {

View File

@ -159,6 +159,7 @@ import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.VirtualMachineName;
import com.cloud.vm.VirtualMachineProfile;
import com.cloud.vm.VmWorkStart;
import com.cloud.vm.VmWorkStop;
import com.cloud.vm.dao.ConsoleProxyDao;
import com.cloud.vm.dao.UserVmDetailsDao;
import com.cloud.vm.dao.VMInstanceDao;
@ -2052,15 +2053,32 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy
try {
_itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
user, account, ((VmWorkStart)work).getPlan());
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null);
} catch(Exception e) {
s_logger.error("Exception in process VM-start work", e);
String result = SerializerHelper.toObjectSerializedString(e);
AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobResult.STATUS_FAILED, 0, result);
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result);
}
}
@Override
public void vmWorkStop(VmWork work) {
// Dispatched stop handler for the console proxy VM: performs the stop and
// completes both the join records and the async job with the outcome.
assert(work instanceof VmWorkStop);
ConsoleProxyVO vm = findById(work.getVmId());
UserVO user = _entityMgr.findById(UserVO.class, work.getUserId());
AccountVO account = _entityMgr.findById(AccountVO.class, work.getAccountId());
try {
// NOTE(review): processVmStopWork returns a boolean that is ignored here;
// a false return would still be reported as STATUS_SUCCEEDED — confirm
// whether failure is always signalled by exception.
_itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null);
} catch(Exception e) {
s_logger.error("Exception in process VM-stop work", e);
// Serialize the exception so the joined (waiting) job can re-throw it.
String result = SerializerHelper.toObjectSerializedString(e);
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result);
}
}
}

View File

@ -144,6 +144,7 @@ import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.VirtualMachineName;
import com.cloud.vm.VirtualMachineProfile;
import com.cloud.vm.VmWorkStart;
import com.cloud.vm.VmWorkStop;
import com.cloud.vm.dao.SecondaryStorageVmDao;
import com.cloud.vm.dao.UserVmDetailsDao;
import com.cloud.vm.dao.VMInstanceDao;
@ -1495,16 +1496,33 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
try {
_itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
user, account, ((VmWorkStart)work).getPlan());
user, account, ((VmWorkStart)work).getPlan());
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null);
} catch(Exception e) {
s_logger.error("Exception in process VM-start work", e);
String result = SerializerHelper.toObjectSerializedString(e);
AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobResult.STATUS_FAILED, 0, result);
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result);
}
}
@Override
public void vmWorkStop(VmWork work) {
// Dispatched stop handler for the secondary storage VM: performs the stop
// and completes both the join records and the async job with the outcome.
assert(work instanceof VmWorkStop);
SecondaryStorageVmVO vm = findById(work.getVmId());
UserVO user = _entityMgr.findById(UserVO.class, work.getUserId());
AccountVO account = _entityMgr.findById(AccountVO.class, work.getAccountId());
try {
// NOTE(review): processVmStopWork returns a boolean that is ignored here;
// a false return would still be reported as STATUS_SUCCEEDED — confirm
// whether failure is always signalled by exception.
_itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null);
} catch(Exception e) {
s_logger.error("Exception in process VM-stop work", e);
// Serialize the exception so the joined (waiting) job can re-throw it.
String result = SerializerHelper.toObjectSerializedString(e);
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result);
}
}
}

View File

@ -208,4 +208,7 @@ public interface VirtualMachineManager extends Manager {
//
<T extends VMInstanceVO> T processVmStartWork(T vm, Map<VirtualMachineProfile.Param, Object> params, User caller, Account account, DeploymentPlan planToDeploy)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException;
<T extends VMInstanceVO> boolean processVmStopWork(T vm, boolean forced, User user, Account account)
throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException;
}

View File

@ -694,7 +694,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = workJob.getId();
// TODO : will refactor to fully-asynchronized way in the future
//
// TODO : this will be replaced with fully-asynchronized way later so that we don't need
// to wait here. The reason we do it synchronized here is that callers of advanceStart is expecting
// synchronized semantics
//
//
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
_jobMgr.waitAndCheck(
new String[] { TopicConstants.VM_POWER_STATE, TopicConstants.JOB_STATE },
3000L, 600000L, new Predicate() {
@ -712,6 +718,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return false;
}
});
AsyncJobExecutionContext.getCurrentExecutionContext().disjoinJob(jobId);
return vm;
}
@ -719,9 +726,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public <T extends VMInstanceVO> T processVmStartWork(T vm, Map<VirtualMachineProfile.Param, Object> params, User caller, Account account, DeploymentPlan planToDeploy)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
long vmId = vm.getId();
VirtualMachineGuru<T> vmGuru = getVmGuru(vm);
vm = vmGuru.findById(vm.getId());
Ternary<T, ReservationContext, VmWorkJobVO> start = changeToStartState(vmGuru, vm, caller, account);
@ -984,283 +989,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return startedVm;
}
/*
@Override
public <T extends VMInstanceVO> T advanceStart(T vm, Map<VirtualMachineProfile.Param, Object> params, User caller, Account account, DeploymentPlan planToDeploy)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
long vmId = vm.getId();
VirtualMachineGuru<T> vmGuru = getVmGuru(vm);
vm = vmGuru.findById(vm.getId());
Ternary<T, ReservationContext, ItWorkVO> start = changeToStartState(vmGuru, vm, caller, account);
if (start == null) {
return vmGuru.findById(vmId);
}
vm = start.first();
ReservationContext ctx = start.second();
ItWorkVO work = start.third();
T startedVm = null;
ServiceOfferingVO offering = _offeringDao.findById(vm.getServiceOfferingId());
VMTemplateVO template = _templateDao.findById(vm.getTemplateId());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Trying to deploy VM, vm has dcId: " + vm.getDataCenterId() + " and podId: " + vm.getPodIdToDeployIn());
}
DataCenterDeployment plan = new DataCenterDeployment(vm.getDataCenterId(), vm.getPodIdToDeployIn(), null, null, null, null, ctx);
if(planToDeploy != null && planToDeploy.getDataCenterId() != 0){
if (s_logger.isDebugEnabled()) {
s_logger.debug("advanceStart: DeploymentPlan is provided, using dcId:" + planToDeploy.getDataCenterId() + ", podId: " + planToDeploy.getPodId() + ", clusterId: "
+ planToDeploy.getClusterId() + ", hostId: " + planToDeploy.getHostId() + ", poolId: " + planToDeploy.getPoolId());
}
plan = new DataCenterDeployment(planToDeploy.getDataCenterId(), planToDeploy.getPodId(), planToDeploy.getClusterId(), planToDeploy.getHostId(), planToDeploy.getPoolId(), planToDeploy.getPhysicalNetworkId(), ctx);
}
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType());
boolean canRetry = true;
try {
Journal journal = start.second().getJournal();
ExcludeList avoids = null;
if (planToDeploy != null) {
avoids = planToDeploy.getAvoids();
}
if (avoids == null) {
avoids = new ExcludeList();
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Deploy avoids pods: " + avoids.getPodsToAvoid() + ", clusters: " + avoids.getClustersToAvoid() + ", hosts: " + avoids.getHostsToAvoid());
}
boolean planChangedByVolume = false;
boolean reuseVolume = true;
DataCenterDeployment originalPlan = plan;
int retry = _retry;
while (retry-- != 0) { // It's != so that it can match -1.
if(reuseVolume){
// edit plan if this vm's ROOT volume is in READY state already
List<VolumeVO> vols = _volsDao.findReadyRootVolumesByInstance(vm.getId());
for (VolumeVO vol : vols) {
// make sure if the templateId is unchanged. If it is changed,
// let planner
// reassign pool for the volume even if it ready.
Long volTemplateId = vol.getTemplateId();
if (volTemplateId != null && volTemplateId.longValue() != template.getId()) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(vol + " of " + vm + " is READY, but template ids don't match, let the planner reassign a new pool");
}
continue;
}
StoragePool pool = (StoragePool)dataStoreMgr.getPrimaryDataStore(vol.getPoolId());
if (!pool.isInMaintenance()) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Root volume is ready, need to place VM in volume's cluster");
}
long rootVolDcId = pool.getDataCenterId();
Long rootVolPodId = pool.getPodId();
Long rootVolClusterId = pool.getClusterId();
if (planToDeploy != null && planToDeploy.getDataCenterId() != 0) {
Long clusterIdSpecified = planToDeploy.getClusterId();
if (clusterIdSpecified != null && rootVolClusterId != null) {
if (rootVolClusterId.longValue() != clusterIdSpecified.longValue()) {
// cannot satisfy the plan passed in to the
// planner
if (s_logger.isDebugEnabled()) {
s_logger.debug("Cannot satisfy the deployment plan passed in since the ready Root volume is in different cluster. volume's cluster: " + rootVolClusterId
+ ", cluster specified: " + clusterIdSpecified);
}
throw new ResourceUnavailableException("Root volume is ready in different cluster, Deployment plan provided cannot be satisfied, unable to create a deployment for "
+ vm, Cluster.class, clusterIdSpecified);
}
}
plan = new DataCenterDeployment(planToDeploy.getDataCenterId(), planToDeploy.getPodId(), planToDeploy.getClusterId(), planToDeploy.getHostId(), vol.getPoolId(), null, ctx);
}else{
plan = new DataCenterDeployment(rootVolDcId, rootVolPodId, rootVolClusterId, null, vol.getPoolId(), null, ctx);
if (s_logger.isDebugEnabled()) {
s_logger.debug(vol + " is READY, changing deployment plan to use this pool's dcId: " + rootVolDcId + " , podId: " + rootVolPodId + " , and clusterId: " + rootVolClusterId);
}
planChangedByVolume = true;
}
}
}
}
VirtualMachineProfileImpl<T> vmProfile = new VirtualMachineProfileImpl<T>(vm, template, offering, account, params);
DeployDestination dest = null;
for (DeploymentPlanner planner : _planners) {
if (planner.canHandle(vmProfile, plan, avoids)) {
dest = planner.plan(vmProfile, plan, avoids);
} else {
continue;
}
if (dest != null) {
avoids.addHost(dest.getHost().getId());
journal.record("Deployment found ", vmProfile, dest);
break;
}
}
if (dest == null) {
if (planChangedByVolume) {
plan = originalPlan;
planChangedByVolume = false;
//do not enter volume reuse for next retry, since we want to look for resorces outside the volume's cluster
reuseVolume = false;
continue;
}
throw new InsufficientServerCapacityException("Unable to create a deployment for " + vmProfile, DataCenter.class, plan.getDataCenterId());
}
long destHostId = dest.getHost().getId();
vm.setPodId(dest.getPod().getId());
Long cluster_id = dest.getCluster().getId();
ClusterDetailsVO cluster_detail_cpu = _clusterDetailsDao.findDetail(cluster_id,"cpuOvercommitRatio");
ClusterDetailsVO cluster_detail_ram = _clusterDetailsDao.findDetail(cluster_id,"memoryOvercommitRatio");
vmProfile.setcpuOvercommitRatio(Float.parseFloat(cluster_detail_cpu.getValue()));
vmProfile.setramOvercommitRatio(Float.parseFloat(cluster_detail_ram.getValue()));
try {
if (!changeState(vm, Event.OperationRetry, destHostId, work, Step.Prepare)) {
throw new ConcurrentOperationException("Unable to update the state of the Virtual Machine");
}
} catch (NoTransitionException e1) {
throw new ConcurrentOperationException(e1.getMessage());
}
try {
if (s_logger.isDebugEnabled()) {
s_logger.debug("VM is being created in podId: " + vm.getPodIdToDeployIn());
}
_networkMgr.prepare(vmProfile, dest, ctx);
if (vm.getHypervisorType() != HypervisorType.BareMetal) {
this.volumeMgr.prepare(vmProfile, dest);
}
//since StorageMgr succeeded in volume creation, reuse Volume for further tries until current cluster has capacity
if(!reuseVolume){
reuseVolume = true;
}
Commands cmds = null;
vmGuru.finalizeVirtualMachineProfile(vmProfile, dest, ctx);
VirtualMachineTO vmTO = hvGuru.implement(vmProfile);
cmds = new Commands(OnError.Stop);
cmds.addCommand(new StartCommand(vmTO, dest.getHost()));
vmGuru.finalizeDeployment(cmds, vmProfile, dest, ctx);
work = _workDao.findById(work.getId());
if (work == null || work.getStep() != Step.Prepare) {
throw new ConcurrentOperationException("Work steps have been changed: " + work);
}
_workDao.updateStep(work, Step.Starting);
_agentMgr.send(destHostId, cmds);
_workDao.updateStep(work, Step.Started);
StartAnswer startAnswer = cmds.getAnswer(StartAnswer.class);
if (startAnswer != null && startAnswer.getResult()) {
String host_guid = startAnswer.getHost_guid();
if( host_guid != null ) {
HostVO finalHost = _resourceMgr.findHostByGuid(host_guid);
if (finalHost == null ) {
throw new CloudRuntimeException("Host Guid " + host_guid + " doesn't exist in DB, something wrong here");
}
destHostId = finalHost.getId();
}
if (vmGuru.finalizeStart(vmProfile, destHostId, cmds, ctx)) {
if (!changeState(vm, Event.OperationSucceeded, destHostId, work, Step.Done)) {
throw new ConcurrentOperationException("Unable to transition to a new state.");
}
startedVm = vm;
if (s_logger.isDebugEnabled()) {
s_logger.debug("Start completed for VM " + vm);
}
return startedVm;
} else {
if (s_logger.isDebugEnabled()) {
s_logger.info("The guru did not like the answers so stopping " + vm);
}
StopCommand cmd = new StopCommand(vm);
StopAnswer answer = (StopAnswer) _agentMgr.easySend(destHostId, cmd);
if (answer == null || !answer.getResult()) {
s_logger.warn("Unable to stop " + vm + " due to " + (answer != null ? answer.getDetails() : "no answers"));
_haMgr.scheduleStop(vm, destHostId, WorkType.ForceStop);
throw new ExecutionException("Unable to stop " + vm + " so we are unable to retry the start operation");
}
throw new ExecutionException("Unable to start " + vm + " due to error in finalizeStart, not retrying");
}
}
s_logger.info("Unable to start VM on " + dest.getHost() + " due to " + (startAnswer == null ? " no start answer" : startAnswer.getDetails()));
} catch (OperationTimedoutException e) {
s_logger.debug("Unable to send the start command to host " + dest.getHost());
if (e.isActive()) {
_haMgr.scheduleStop(vm, destHostId, WorkType.CheckStop);
}
canRetry = false;
throw new AgentUnavailableException("Unable to start " + vm.getHostName(), destHostId, e);
} catch (ResourceUnavailableException e) {
s_logger.info("Unable to contact resource.", e);
if (!avoids.add(e)) {
if (e.getScope() == Volume.class || e.getScope() == Nic.class) {
throw e;
} else {
s_logger.warn("unexpected ResourceUnavailableException : " + e.getScope().getName(), e);
throw e;
}
}
} catch (InsufficientCapacityException e) {
s_logger.info("Insufficient capacity ", e);
if (!avoids.add(e)) {
if (e.getScope() == Volume.class || e.getScope() == Nic.class) {
throw e;
} else {
s_logger.warn("unexpected InsufficientCapacityException : " + e.getScope().getName(), e);
}
}
} catch (Exception e) {
s_logger.error("Failed to start instance " + vm, e);
throw new AgentUnavailableException("Unable to start instance due to " + e.getMessage(), destHostId, e);
} finally {
if (startedVm == null && canRetry) {
Step prevStep = work.getStep();
_workDao.updateStep(work, Step.Release);
if (prevStep == Step.Started || prevStep == Step.Starting) {
cleanup(vmGuru, vmProfile, work, Event.OperationFailed, false, caller, account);
} else {
//if step is not starting/started, send cleanup command with force=true
cleanup(vmGuru, vmProfile, work, Event.OperationFailed, true, caller, account);
}
}
}
}
} finally {
if (startedVm == null) {
if (canRetry) {
try {
changeState(vm, Event.OperationFailed, null, work, Step.Done);
} catch (NoTransitionException e) {
throw new ConcurrentOperationException(e.getMessage());
}
}
}
}
return startedVm;
}
*/
@Override
public <T extends VMInstanceVO> boolean stop(T vm, User user, Account account) throws ResourceUnavailableException {
try {
@ -1360,11 +1088,87 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
public <T extends VMInstanceVO> boolean advanceStop(T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
// ???
// Stops a VM by submitting (or reusing) a VM_WORK_STOP work job and then
// waiting synchronously for power-off or job completion, to preserve the
// synchronous semantics callers of advanceStop expect.
public <T extends VMInstanceVO> boolean advanceStop(final T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
VmWorkJobVO workJob = null;
Transaction txn = Transaction.currentTxn();
try {
txn.start();
// Row lock serializes concurrent work-job submission for the same VM.
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(), VmWorkConstants.VM_WORK_STOP);
if(pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
// A stop job is already pending for this VM: join it instead of
// submitting a duplicate.
assert(pendingWorkJobs.size() == 1);
workJob = pendingWorkJobs.get(0);
} else {
workJob = new VmWorkJobVO();
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
workJob.setCmd(VmWorkConstants.VM_WORK_STOP);
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setStep(VmWorkJobVO.Step.Prepare);
workJob.setVmType(vm.getType());
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkStop workInfo = new VmWorkStop();
workInfo.setAccountId(account.getId());
workInfo.setUserId(user.getId());
workInfo.setVmId(vm.getId());
workInfo.setForceStop(forced);
workJob.setCmdInfo(ApiSerializerHelper.toSerializedString(workInfo));
// Queue on the per-VM work queue so stop is ordered against other VM work.
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
}
txn.commit();
} catch(Throwable e) {
s_logger.error("Unexpected exception", e);
txn.rollback();
throw new ConcurrentOperationException("Unhandled exception, converted to ConcurrentOperationException");
}
final long jobId = workJob.getId();
//
// TODO : this will be replaced with fully-asynchronized way later so that we don't need
// to wait here. The reason we do it synchronized here is that callers of advanceStop are expecting
// synchronized semantics
//
//
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
// Poll every 3s (up to 10 min) on power-state and job-state topics until
// the VM reports PowerOff or the work job leaves the in-progress state.
_jobMgr.waitAndCheck(
new String[] { TopicConstants.VM_POWER_STATE, TopicConstants.JOB_STATE },
3000L, 600000L, new Predicate() {
@Override
public boolean checkCondition() {
VMInstanceVO instance = _vmDao.findById(vm.getId());
if(instance.getPowerState() == VirtualMachine.PowerState.PowerOff)
return true;
// Note: this local workJob shadows the outer variable on purpose —
// it re-reads the job row to get fresh status.
VmWorkJobVO workJob = _workJobDao.findById(jobId);
if(workJob.getStatus() != AsyncJobResult.STATUS_IN_PROGRESS)
return true;
return false;
}
});
try {
// disjoinJob re-throws a failure recorded by the stop work job.
AsyncJobExecutionContext.getCurrentExecutionContext().disjoinJob(jobId);
} catch(Exception e) {
s_logger.error("Unexpected exception", e);
return false;
}
return true;
}
@Override
public <T extends VMInstanceVO> boolean processVmStopWork(T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
VmWorkJobVO work = _workJobDao.findById(AsyncJobExecutionContext.getCurrentExecutionContext().getJob().getId());

View File

@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.vm;
/**
 * Work-queue payload for a VM stop request: the common VmWork context
 * (vm/user/account ids) plus the force-stop flag.
 */
public class VmWorkStop extends VmWork {
    // Whether the stop should be forced; defaults to false.
    private boolean forceStop = false;

    public VmWorkStop() {
    }

    public void setForceStop(boolean value) {
        this.forceStop = value;
    }

    public boolean isForceStop() {
        return this.forceStop;
    }
}

View File

@ -396,4 +396,11 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage
return vm;
}
@Override
public <T extends VMInstanceVO> boolean processVmStopWork(T vm, boolean forced, User user, Account account)
throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
// Mock implementation for tests: report the stop as always succeeding.
return true;
}
}