From 3c780a500d6c4f9e3bf64fbbb024b3eecd801185 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Tue, 7 May 2013 17:03:04 -0700 Subject: [PATCH] Start and stop VM now works for CPVM and SSVM with new VMSync --- client/tomcatconf/applicationContext.xml.in | 1 + .../cloud/async/AsyncJobExecutionContext.java | 51 ++- .../src/com/cloud/async/AsyncJobManager.java | 34 +- .../com/cloud/async/AsyncJobManagerImpl.java | 22 ++ .../consoleproxy/ConsoleProxyManagerImpl.java | 22 +- .../SecondaryStorageManagerImpl.java | 24 +- .../com/cloud/vm/VirtualMachineManager.java | 3 + .../cloud/vm/VirtualMachineManagerImpl.java | 370 ++++-------------- server/src/com/cloud/vm/VmWorkStop.java | 34 ++ .../VmWorkMockVirtualMachineManagerImpl.java | 7 + 10 files changed, 264 insertions(+), 304 deletions(-) create mode 100644 server/src/com/cloud/vm/VmWorkStop.java diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in index 2c2031f8d09..fa8ca6f717a 100644 --- a/client/tomcatconf/applicationContext.xml.in +++ b/client/tomcatconf/applicationContext.xml.in @@ -794,6 +794,7 @@ + diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java index c02797512cc..9697bed1232 100644 --- a/server/src/com/cloud/async/AsyncJobExecutionContext.java +++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java @@ -18,10 +18,17 @@ package com.cloud.async; import javax.inject.Inject; +import com.cloud.async.dao.AsyncJobJoinMapDao; +import com.cloud.exception.ConcurrentOperationException; +import com.cloud.exception.InsufficientCapacityException; +import com.cloud.exception.ResourceUnavailableException; +import com.cloud.serializer.SerializerHelper; + public class AsyncJobExecutionContext { private AsyncJob _job; - @Inject private AsyncJobManager _jobMgr; + @Inject private AsyncJobManager _jobMgr; + @Inject private AsyncJobJoinMapDao _joinMapDao; private static ThreadLocal s_currentExectionContext = new ThreadLocal(); @@ -65,6 +72,48 @@ public class AsyncJobExecutionContext { _jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson); } + public void joinJob(long joinJobId) { + assert(_job != null); + _jobMgr.joinJob(_job.getId(), joinJobId); + } + + // + // check failure exception before we disjoin the worker job + // TODO : it is ugly and this will become unnecessary after we switch to full-async mode + // + public void disjoinJob(long joinedJobId) throws InsufficientCapacityException, + ConcurrentOperationException, ResourceUnavailableException { + assert(_job != null); + + AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId); + if(record.getJoinStatus() == AsyncJobResult.STATUS_FAILED && record.getJoinResult() != null) { + Object exception = SerializerHelper.fromObjectSerializedString(record.getJoinResult()); + if(exception != null && exception instanceof Exception) { + if(exception instanceof InsufficientCapacityException) + throw (InsufficientCapacityException)exception; + else if(exception instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)exception; + else if(exception instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)exception; + else + throw new RuntimeException((Exception)exception); + } + } + + _jobMgr.disjoinJob(_job.getId(), joinedJobId); + } + + public void completeJoin(int joinStatus, String joinResult) { + assert(_job != null); + _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); + } + + public void completeJobAndJoin(int joinStatus, String joinResult) { + assert(_job != null); + _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); + _jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null); + } + public static AsyncJobExecutionContext getCurrentExecutionContext() { return s_currentExectionContext.get(); } diff --git a/server/src/com/cloud/async/AsyncJobManager.java b/server/src/com/cloud/async/AsyncJobManager.java index 8cafa5fb060..8ca544a1f75 100644 --- a/server/src/com/cloud/async/AsyncJobManager.java +++ b/server/src/com/cloud/async/AsyncJobManager.java @@ -25,25 +25,29 @@ import com.cloud.utils.component.Manager; public interface AsyncJobManager extends Manager { - public AsyncJobVO getAsyncJob(long jobId); + AsyncJobVO getAsyncJob(long jobId); - public List findInstancePendingAsyncJobs(String instanceType, Long accountId); + List findInstancePendingAsyncJobs(String instanceType, Long accountId); - public long submitAsyncJob(AsyncJob job); - public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext); - public long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId); - public AsyncJobResult queryAsyncJobResult(long jobId); + long submitAsyncJob(AsyncJob job); + long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext); + long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId); + AsyncJobResult queryAsyncJobResult(long jobId); - public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject); - public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject); - public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId); - public void logJobJournal(long jobId, AsyncJob.JournalType journalType, String + void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject); + void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject); + void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId); + void logJobJournal(long jobId, AsyncJob.JournalType journalType, String journalText, String journalObjJson); - - public void releaseSyncSource(); - public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit); - public boolean waitAndCheck(String[] wakupSubjects, long checkIntervalInMilliSeconds, + void joinJob(long jobId, long joinJobId); + void disjoinJob(long jobId, long joinedJobId); + void completeJoin(long joinJobId, int joinStatus, String joinResult); + + void releaseSyncSource(); + void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit); + + boolean waitAndCheck(String[] wakupSubjects, long checkIntervalInMilliSeconds, long timeoutInMiliseconds, Predicate predicate); /** @@ -51,5 +55,5 @@ public interface AsyncJobManager extends Manager { * @param cmd the command that specifies the job id * @return an async-call result object */ - public AsyncJob queryAsyncJobResult(QueryAsyncJobResultCmd cmd); + AsyncJob queryAsyncJobResult(QueryAsyncJobResultCmd cmd); } diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 468782ef2f9..57cca06d0e4 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -38,11 +38,15 @@ import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd; import org.apache.cloudstack.api.response.ExceptionResponse; import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.MessageDetector; +import org.apache.cloudstack.framework.messagebus.PublishScope; +import org.apache.cloudstack.messagebus.TopicConstants; import org.apache.log4j.Logger; import org.apache.log4j.NDC; import com.cloud.api.ApiSerializerHelper; +import com.cloud.api.query.dao.AsyncJobJoinDao; import com.cloud.async.dao.AsyncJobDao; +import com.cloud.async.dao.AsyncJobJoinMapDao; import com.cloud.async.dao.AsyncJobJournalDao; import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManagerListener; @@ -86,6 +90,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private AsyncJobDao _jobDao; @Inject private AsyncJobJournalDao _journalDao; @Inject private ConfigurationDao _configDao; + @Inject private AsyncJobJoinMapDao _joinMapDao; @Inject private List _jobDispatchers; @Inject private MessageBus _messageBus; @@ -216,6 +221,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setLastUpdated(DateUtil.currentGMTTime()); _jobDao.update(jobId, job); txn.commit(); + + _messageBus.publish(null, TopicConstants.JOB_STATE, PublishScope.GLOBAL, (Long)jobId); } catch(Exception e) { s_logger.error("Unexpected exception while completing async job-" + jobId, e); txn.rollback(); @@ -291,6 +298,21 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, _journalDao.persist(journal); } + @DB + public void joinJob(long jobId, long joinJobId) { + _joinMapDao.joinJob(jobId, joinJobId, this.getMsid()); + } + + @DB + public void disjoinJob(long jobId, long joinedJobId) { + _joinMapDao.disjoinJob(jobId, joinedJobId); + } + + @DB + public void completeJoin(long joinJobId, int joinStatus, String joinResult) { + _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, this.getMsid()); + } + @Override public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) { if(s_logger.isDebugEnabled()) { diff --git a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java index a9b62e24a40..3015d40be59 100755 --- a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java +++ b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java @@ -159,6 +159,7 @@ import com.cloud.vm.VirtualMachineManager; import com.cloud.vm.VirtualMachineName; import com.cloud.vm.VirtualMachineProfile; import com.cloud.vm.VmWorkStart; +import com.cloud.vm.VmWorkStop; import com.cloud.vm.dao.ConsoleProxyDao; import com.cloud.vm.dao.UserVmDetailsDao; import com.cloud.vm.dao.VMInstanceDao; @@ -2052,15 +2053,32 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy try { _itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(), user, account, ((VmWorkStart)work).getPlan()); + + AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null); } catch(Exception e) { s_logger.error("Exception in process VM-start work", e); String result = SerializerHelper.toObjectSerializedString(e); - AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobResult.STATUS_FAILED, 0, result); + AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result); } } @Override public void vmWorkStop(VmWork work) { - // TODO + assert(work instanceof VmWorkStop); + + ConsoleProxyVO vm = findById(work.getVmId()); + + UserVO user = _entityMgr.findById(UserVO.class, work.getUserId()); + AccountVO account = _entityMgr.findById(AccountVO.class, work.getAccountId()); + + try { + _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account); + + AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null); + } catch(Exception e) { + s_logger.error("Exception in process VM-stop work", e); + String result = SerializerHelper.toObjectSerializedString(e); + AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result); + } } } diff --git a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java index 49d29fc3519..4471781e797 100755 --- a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java +++ b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java @@ -144,6 +144,7 @@ import com.cloud.vm.VirtualMachineManager; import com.cloud.vm.VirtualMachineName; import com.cloud.vm.VirtualMachineProfile; import com.cloud.vm.VmWorkStart; +import com.cloud.vm.VmWorkStop; import com.cloud.vm.dao.SecondaryStorageVmDao; import com.cloud.vm.dao.UserVmDetailsDao; import com.cloud.vm.dao.VMInstanceDao; @@ -1495,16 +1496,33 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar try { _itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(), - user, account, ((VmWorkStart)work).getPlan()); + user, account, ((VmWorkStart)work).getPlan()); + + AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null); } catch(Exception e) { s_logger.error("Exception in process VM-start work", e); String result = SerializerHelper.toObjectSerializedString(e); - AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobResult.STATUS_FAILED, 0, result); + AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result); } } @Override public void vmWorkStop(VmWork work) { - // TODO + assert(work instanceof VmWorkStop); + + SecondaryStorageVmVO vm = findById(work.getVmId()); + + UserVO user = _entityMgr.findById(UserVO.class, work.getUserId()); + AccountVO account = _entityMgr.findById(AccountVO.class, work.getAccountId()); + + try { + _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account); + + AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null); + } catch(Exception e) { + s_logger.error("Exception in process VM-stop work", e); + String result = SerializerHelper.toObjectSerializedString(e); + AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result); + } } } diff --git a/server/src/com/cloud/vm/VirtualMachineManager.java b/server/src/com/cloud/vm/VirtualMachineManager.java index 1032abc58b0..ba8f6cd5409 100644 --- a/server/src/com/cloud/vm/VirtualMachineManager.java +++ b/server/src/com/cloud/vm/VirtualMachineManager.java @@ -208,4 +208,7 @@ public interface VirtualMachineManager extends Manager { // T processVmStartWork(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException; + + boolean processVmStopWork(T vm, boolean forced, User user, Account account) + throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException; } diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java index 555fc90bbd0..058c6c28373 100755 --- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -694,7 +694,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final long jobId = workJob.getId(); - // TODO : will refactor to fully-asynchronized way in the future + // + // TODO : this will be replaced with fully-asynchronized way later so that we don't need + // to wait here. The reason we do it synchronized here is that callers of advanceStart is expecting + // synchronized semantics + // + // + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); _jobMgr.waitAndCheck( new String[] { TopicConstants.VM_POWER_STATE, TopicConstants.JOB_STATE }, 3000L, 600000L, new Predicate() { @@ -712,6 +718,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return false; } }); + AsyncJobExecutionContext.getCurrentExecutionContext().disjoinJob(jobId); return vm; } @@ -719,9 +726,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public T processVmStartWork(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { - long vmId = vm.getId(); VirtualMachineGuru vmGuru = getVmGuru(vm); - vm = vmGuru.findById(vm.getId()); Ternary start = changeToStartState(vmGuru, vm, caller, account); @@ -984,283 +989,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return startedVm; } - /* - @Override - public T advanceStart(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) - throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { - long vmId = vm.getId(); - VirtualMachineGuru vmGuru = getVmGuru(vm); - - vm = vmGuru.findById(vm.getId()); - Ternary start = changeToStartState(vmGuru, vm, caller, account); - if (start == null) { - return vmGuru.findById(vmId); - } - - vm = start.first(); - ReservationContext ctx = start.second(); - ItWorkVO work = start.third(); - - T startedVm = null; - ServiceOfferingVO offering = _offeringDao.findById(vm.getServiceOfferingId()); - VMTemplateVO template = _templateDao.findById(vm.getTemplateId()); - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Trying to deploy VM, vm has dcId: " + vm.getDataCenterId() + " and podId: " + vm.getPodIdToDeployIn()); - } - DataCenterDeployment plan = new DataCenterDeployment(vm.getDataCenterId(), vm.getPodIdToDeployIn(), null, null, null, null, ctx); - if(planToDeploy != null && planToDeploy.getDataCenterId() != 0){ - if (s_logger.isDebugEnabled()) { - s_logger.debug("advanceStart: DeploymentPlan is provided, using dcId:" + planToDeploy.getDataCenterId() + ", podId: " + planToDeploy.getPodId() + ", clusterId: " - + planToDeploy.getClusterId() + ", hostId: " + planToDeploy.getHostId() + ", poolId: " + planToDeploy.getPoolId()); - } - plan = new DataCenterDeployment(planToDeploy.getDataCenterId(), planToDeploy.getPodId(), planToDeploy.getClusterId(), planToDeploy.getHostId(), planToDeploy.getPoolId(), planToDeploy.getPhysicalNetworkId(), ctx); - } - - HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType()); - - boolean canRetry = true; - try { - Journal journal = start.second().getJournal(); - - ExcludeList avoids = null; - if (planToDeploy != null) { - avoids = planToDeploy.getAvoids(); - } - if (avoids == null) { - avoids = new ExcludeList(); - } - if (s_logger.isDebugEnabled()) { - s_logger.debug("Deploy avoids pods: " + avoids.getPodsToAvoid() + ", clusters: " + avoids.getClustersToAvoid() + ", hosts: " + avoids.getHostsToAvoid()); - } - - - boolean planChangedByVolume = false; - boolean reuseVolume = true; - DataCenterDeployment originalPlan = plan; - - int retry = _retry; - while (retry-- != 0) { // It's != so that it can match -1. - - if(reuseVolume){ - // edit plan if this vm's ROOT volume is in READY state already - List vols = _volsDao.findReadyRootVolumesByInstance(vm.getId()); - for (VolumeVO vol : vols) { - // make sure if the templateId is unchanged. If it is changed, - // let planner - // reassign pool for the volume even if it ready. - Long volTemplateId = vol.getTemplateId(); - if (volTemplateId != null && volTemplateId.longValue() != template.getId()) { - if (s_logger.isDebugEnabled()) { - s_logger.debug(vol + " of " + vm + " is READY, but template ids don't match, let the planner reassign a new pool"); - } - continue; - } - - StoragePool pool = (StoragePool)dataStoreMgr.getPrimaryDataStore(vol.getPoolId()); - if (!pool.isInMaintenance()) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Root volume is ready, need to place VM in volume's cluster"); - } - long rootVolDcId = pool.getDataCenterId(); - Long rootVolPodId = pool.getPodId(); - Long rootVolClusterId = pool.getClusterId(); - if (planToDeploy != null && planToDeploy.getDataCenterId() != 0) { - Long clusterIdSpecified = planToDeploy.getClusterId(); - if (clusterIdSpecified != null && rootVolClusterId != null) { - if (rootVolClusterId.longValue() != clusterIdSpecified.longValue()) { - // cannot satisfy the plan passed in to the - // planner - if (s_logger.isDebugEnabled()) { - s_logger.debug("Cannot satisfy the deployment plan passed in since the ready Root volume is in different cluster. volume's cluster: " + rootVolClusterId - + ", cluster specified: " + clusterIdSpecified); - } - throw new ResourceUnavailableException("Root volume is ready in different cluster, Deployment plan provided cannot be satisfied, unable to create a deployment for " - + vm, Cluster.class, clusterIdSpecified); - } - } - plan = new DataCenterDeployment(planToDeploy.getDataCenterId(), planToDeploy.getPodId(), planToDeploy.getClusterId(), planToDeploy.getHostId(), vol.getPoolId(), null, ctx); - }else{ - plan = new DataCenterDeployment(rootVolDcId, rootVolPodId, rootVolClusterId, null, vol.getPoolId(), null, ctx); - if (s_logger.isDebugEnabled()) { - s_logger.debug(vol + " is READY, changing deployment plan to use this pool's dcId: " + rootVolDcId + " , podId: " + rootVolPodId + " , and clusterId: " + rootVolClusterId); - } - planChangedByVolume = true; - } - } - } - } - - VirtualMachineProfileImpl vmProfile = new VirtualMachineProfileImpl(vm, template, offering, account, params); - DeployDestination dest = null; - for (DeploymentPlanner planner : _planners) { - if (planner.canHandle(vmProfile, plan, avoids)) { - dest = planner.plan(vmProfile, plan, avoids); - } else { - continue; - } - if (dest != null) { - avoids.addHost(dest.getHost().getId()); - journal.record("Deployment found ", vmProfile, dest); - break; - } - } - - if (dest == null) { - if (planChangedByVolume) { - plan = originalPlan; - planChangedByVolume = false; - //do not enter volume reuse for next retry, since we want to look for resorces outside the volume's cluster - reuseVolume = false; - continue; - } - throw new InsufficientServerCapacityException("Unable to create a deployment for " + vmProfile, DataCenter.class, plan.getDataCenterId()); - } - - long destHostId = dest.getHost().getId(); - vm.setPodId(dest.getPod().getId()); - Long cluster_id = dest.getCluster().getId(); - ClusterDetailsVO cluster_detail_cpu = _clusterDetailsDao.findDetail(cluster_id,"cpuOvercommitRatio"); - ClusterDetailsVO cluster_detail_ram = _clusterDetailsDao.findDetail(cluster_id,"memoryOvercommitRatio"); - vmProfile.setcpuOvercommitRatio(Float.parseFloat(cluster_detail_cpu.getValue())); - vmProfile.setramOvercommitRatio(Float.parseFloat(cluster_detail_ram.getValue())); - - try { - if (!changeState(vm, Event.OperationRetry, destHostId, work, Step.Prepare)) { - throw new ConcurrentOperationException("Unable to update the state of the Virtual Machine"); - } - } catch (NoTransitionException e1) { - throw new ConcurrentOperationException(e1.getMessage()); - } - - try { - if (s_logger.isDebugEnabled()) { - s_logger.debug("VM is being created in podId: " + vm.getPodIdToDeployIn()); - } - _networkMgr.prepare(vmProfile, dest, ctx); - if (vm.getHypervisorType() != HypervisorType.BareMetal) { - this.volumeMgr.prepare(vmProfile, dest); - } - //since StorageMgr succeeded in volume creation, reuse Volume for further tries until current cluster has capacity - if(!reuseVolume){ - reuseVolume = true; - } - - Commands cmds = null; - vmGuru.finalizeVirtualMachineProfile(vmProfile, dest, ctx); - - VirtualMachineTO vmTO = hvGuru.implement(vmProfile); - - cmds = new Commands(OnError.Stop); - cmds.addCommand(new StartCommand(vmTO, dest.getHost())); - - vmGuru.finalizeDeployment(cmds, vmProfile, dest, ctx); - - - work = _workDao.findById(work.getId()); - if (work == null || work.getStep() != Step.Prepare) { - throw new ConcurrentOperationException("Work steps have been changed: " + work); - } - _workDao.updateStep(work, Step.Starting); - - _agentMgr.send(destHostId, cmds); - - _workDao.updateStep(work, Step.Started); - - - StartAnswer startAnswer = cmds.getAnswer(StartAnswer.class); - if (startAnswer != null && startAnswer.getResult()) { - String host_guid = startAnswer.getHost_guid(); - if( host_guid != null ) { - HostVO finalHost = _resourceMgr.findHostByGuid(host_guid); - if (finalHost == null ) { - throw new CloudRuntimeException("Host Guid " + host_guid + " doesn't exist in DB, something wrong here"); - } - destHostId = finalHost.getId(); - } - if (vmGuru.finalizeStart(vmProfile, destHostId, cmds, ctx)) { - if (!changeState(vm, Event.OperationSucceeded, destHostId, work, Step.Done)) { - throw new ConcurrentOperationException("Unable to transition to a new state."); - } - startedVm = vm; - if (s_logger.isDebugEnabled()) { - s_logger.debug("Start completed for VM " + vm); - } - return startedVm; - } else { - if (s_logger.isDebugEnabled()) { - s_logger.info("The guru did not like the answers so stopping " + vm); - } - - StopCommand cmd = new StopCommand(vm); - StopAnswer answer = (StopAnswer) _agentMgr.easySend(destHostId, cmd); - if (answer == null || !answer.getResult()) { - s_logger.warn("Unable to stop " + vm + " due to " + (answer != null ? answer.getDetails() : "no answers")); - _haMgr.scheduleStop(vm, destHostId, WorkType.ForceStop); - throw new ExecutionException("Unable to stop " + vm + " so we are unable to retry the start operation"); - } - throw new ExecutionException("Unable to start " + vm + " due to error in finalizeStart, not retrying"); - } - } - s_logger.info("Unable to start VM on " + dest.getHost() + " due to " + (startAnswer == null ? " no start answer" : startAnswer.getDetails())); - - } catch (OperationTimedoutException e) { - s_logger.debug("Unable to send the start command to host " + dest.getHost()); - if (e.isActive()) { - _haMgr.scheduleStop(vm, destHostId, WorkType.CheckStop); - } - canRetry = false; - throw new AgentUnavailableException("Unable to start " + vm.getHostName(), destHostId, e); - } catch (ResourceUnavailableException e) { - s_logger.info("Unable to contact resource.", e); - if (!avoids.add(e)) { - if (e.getScope() == Volume.class || e.getScope() == Nic.class) { - throw e; - } else { - s_logger.warn("unexpected ResourceUnavailableException : " + e.getScope().getName(), e); - throw e; - } - } - } catch (InsufficientCapacityException e) { - s_logger.info("Insufficient capacity ", e); - if (!avoids.add(e)) { - if (e.getScope() == Volume.class || e.getScope() == Nic.class) { - throw e; - } else { - s_logger.warn("unexpected InsufficientCapacityException : " + e.getScope().getName(), e); - } - } - } catch (Exception e) { - s_logger.error("Failed to start instance " + vm, e); - throw new AgentUnavailableException("Unable to start instance due to " + e.getMessage(), destHostId, e); - } finally { - if (startedVm == null && canRetry) { - Step prevStep = work.getStep(); - _workDao.updateStep(work, Step.Release); - if (prevStep == Step.Started || prevStep == Step.Starting) { - cleanup(vmGuru, vmProfile, work, Event.OperationFailed, false, caller, account); - } else { - //if step is not starting/started, send cleanup command with force=true - cleanup(vmGuru, vmProfile, work, Event.OperationFailed, true, caller, account); - } - } - } - } - } finally { - if (startedVm == null) { - if (canRetry) { - try { - changeState(vm, Event.OperationFailed, null, work, Step.Done); - } catch (NoTransitionException e) { - throw new ConcurrentOperationException(e.getMessage()); - } - } - } - } - - return startedVm; - } -*/ @Override public boolean stop(T vm, User user, Account account) throws ResourceUnavailableException { try { @@ -1360,11 +1088,87 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } @Override - public boolean advanceStop(T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { - // ??? + public boolean advanceStop(final T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { + VmWorkJobVO workJob = null; + Transaction txn = Transaction.currentTxn(); + try { + txn.start(); + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), VmWorkConstants.VM_WORK_STOP); + + if(pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert(pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkConstants.VM_WORK_STOP); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setStep(VmWorkJobVO.Step.Prepare); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkStop workInfo = new VmWorkStop(); + workInfo.setAccountId(account.getId()); + workInfo.setUserId(user.getId()); + workInfo.setVmId(vm.getId()); + workInfo.setForceStop(forced); + workJob.setCmdInfo(ApiSerializerHelper.toSerializedString(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } + + txn.commit(); + } catch(Throwable e) { + s_logger.error("Unexpected exception", e); + txn.rollback(); + throw new ConcurrentOperationException("Unhandled exception, converted to ConcurrentOperationException"); + } + + final long jobId = workJob.getId(); + + // + // TODO : this will be replaced with fully-asynchronized way later so that we don't need + // to wait here. The reason we do it synchronized here is that callers of advanceStart is expecting + // synchronized semantics + // + // + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + _jobMgr.waitAndCheck( + new String[] { TopicConstants.VM_POWER_STATE, TopicConstants.JOB_STATE }, + 3000L, 600000L, new Predicate() { + + @Override + public boolean checkCondition() { + VMInstanceVO instance = _vmDao.findById(vm.getId()); + if(instance.getPowerState() == VirtualMachine.PowerState.PowerOff) + return true; + + VmWorkJobVO workJob = _workJobDao.findById(jobId); + if(workJob.getStatus() != AsyncJobResult.STATUS_IN_PROGRESS) + return true; + + return false; + } + }); + + try { + AsyncJobExecutionContext.getCurrentExecutionContext().disjoinJob(jobId); + } catch(Exception e) { + s_logger.error("Unexpected exception", e); + return false; + } return true; } - + + @Override public boolean processVmStopWork(T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { VmWorkJobVO work = _workJobDao.findById(AsyncJobExecutionContext.getCurrentExecutionContext().getJob().getId()); diff --git a/server/src/com/cloud/vm/VmWorkStop.java b/server/src/com/cloud/vm/VmWorkStop.java new file mode 100644 index 00000000000..d6d226fb81f --- /dev/null +++ b/server/src/com/cloud/vm/VmWorkStop.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +public class VmWorkStop extends VmWork { + + private boolean forceStop; + + public VmWorkStop() { + forceStop = false; + } + + public void setForceStop(boolean value) { + forceStop = value; + } + + public boolean isForceStop() { + return forceStop; + } +} diff --git a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java index 5cef9c27de1..d77ebd148ee 100644 --- a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java +++ b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java @@ -396,4 +396,11 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage return vm; } + + @Override + public boolean processVmStopWork(T vm, boolean forced, User user, Account account) + throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { + return true; + } + }