diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in
index 2c2031f8d09..fa8ca6f717a 100644
--- a/client/tomcatconf/applicationContext.xml.in
+++ b/client/tomcatconf/applicationContext.xml.in
@@ -794,6 +794,7 @@
+
diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java
index c02797512cc..9697bed1232 100644
--- a/server/src/com/cloud/async/AsyncJobExecutionContext.java
+++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java
@@ -18,10 +18,17 @@ package com.cloud.async;
import javax.inject.Inject;
+import com.cloud.async.dao.AsyncJobJoinMapDao;
+import com.cloud.exception.ConcurrentOperationException;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.ResourceUnavailableException;
+import com.cloud.serializer.SerializerHelper;
+
public class AsyncJobExecutionContext {
private AsyncJob _job;
- @Inject private AsyncJobManager _jobMgr;
+ @Inject private AsyncJobManager _jobMgr;
+ @Inject private AsyncJobJoinMapDao _joinMapDao;
private static ThreadLocal s_currentExectionContext = new ThreadLocal();
@@ -65,6 +72,48 @@ public class AsyncJobExecutionContext {
_jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson);
}
+ public void joinJob(long joinJobId) {
+ assert(_job != null);
+ _jobMgr.joinJob(_job.getId(), joinJobId);
+ }
+
+ //
+ // check failure exception before we disjoin the worker job
+ // TODO : it is ugly and this will become unnecessary after we switch to full-async mode
+ //
+ public void disjoinJob(long joinedJobId) throws InsufficientCapacityException,
+ ConcurrentOperationException, ResourceUnavailableException {
+ assert(_job != null);
+
+ AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
+ if(record.getJoinStatus() == AsyncJobResult.STATUS_FAILED && record.getJoinResult() != null) {
+ Object exception = SerializerHelper.fromObjectSerializedString(record.getJoinResult());
+ if(exception != null && exception instanceof Exception) {
+ if(exception instanceof InsufficientCapacityException)
+ throw (InsufficientCapacityException)exception;
+ else if(exception instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)exception;
+ else if(exception instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)exception;
+ else
+ throw new RuntimeException((Exception)exception);
+ }
+ }
+
+ _jobMgr.disjoinJob(_job.getId(), joinedJobId);
+ }
+
+ public void completeJoin(int joinStatus, String joinResult) {
+ assert(_job != null);
+ _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
+ }
+
+ public void completeJobAndJoin(int joinStatus, String joinResult) {
+ assert(_job != null);
+ _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
+ _jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null);
+ }
+
public static AsyncJobExecutionContext getCurrentExecutionContext() {
return s_currentExectionContext.get();
}
diff --git a/server/src/com/cloud/async/AsyncJobManager.java b/server/src/com/cloud/async/AsyncJobManager.java
index 8cafa5fb060..8ca544a1f75 100644
--- a/server/src/com/cloud/async/AsyncJobManager.java
+++ b/server/src/com/cloud/async/AsyncJobManager.java
@@ -25,25 +25,29 @@ import com.cloud.utils.component.Manager;
public interface AsyncJobManager extends Manager {
- public AsyncJobVO getAsyncJob(long jobId);
+ AsyncJobVO getAsyncJob(long jobId);
- public List extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
+ List extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
- public long submitAsyncJob(AsyncJob job);
- public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext);
- public long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
- public AsyncJobResult queryAsyncJobResult(long jobId);
+ long submitAsyncJob(AsyncJob job);
+ long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext);
+ long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
+ AsyncJobResult queryAsyncJobResult(long jobId);
- public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject);
- public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject);
- public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
- public void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
+ void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject);
+ void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject);
+ void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
+ void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
journalText, String journalObjJson);
-
- public void releaseSyncSource();
- public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);
- public boolean waitAndCheck(String[] wakupSubjects, long checkIntervalInMilliSeconds,
+ void joinJob(long jobId, long joinJobId);
+ void disjoinJob(long jobId, long joinedJobId);
+ void completeJoin(long joinJobId, int joinStatus, String joinResult);
+
+ void releaseSyncSource();
+ void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);
+
+ boolean waitAndCheck(String[] wakupSubjects, long checkIntervalInMilliSeconds,
long timeoutInMiliseconds, Predicate predicate);
/**
@@ -51,5 +55,5 @@ public interface AsyncJobManager extends Manager {
* @param cmd the command that specifies the job id
* @return an async-call result object
*/
- public AsyncJob queryAsyncJobResult(QueryAsyncJobResultCmd cmd);
+ AsyncJob queryAsyncJobResult(QueryAsyncJobResultCmd cmd);
}
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 468782ef2f9..57cca06d0e4 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -38,11 +38,15 @@ import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDetector;
+import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.apache.cloudstack.messagebus.TopicConstants;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import com.cloud.api.ApiSerializerHelper;
+import com.cloud.api.query.dao.AsyncJobJoinDao;
import com.cloud.async.dao.AsyncJobDao;
+import com.cloud.async.dao.AsyncJobJoinMapDao;
import com.cloud.async.dao.AsyncJobJournalDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
@@ -86,6 +90,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Inject private AsyncJobDao _jobDao;
@Inject private AsyncJobJournalDao _journalDao;
@Inject private ConfigurationDao _configDao;
+ @Inject private AsyncJobJoinMapDao _joinMapDao;
@Inject private List _jobDispatchers;
@Inject private MessageBus _messageBus;
@@ -216,6 +221,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
txn.commit();
+
+ _messageBus.publish(null, TopicConstants.JOB_STATE, PublishScope.GLOBAL, (Long)jobId);
} catch(Exception e) {
s_logger.error("Unexpected exception while completing async job-" + jobId, e);
txn.rollback();
@@ -291,6 +298,21 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
_journalDao.persist(journal);
}
+ @DB
+ public void joinJob(long jobId, long joinJobId) {
+ _joinMapDao.joinJob(jobId, joinJobId, this.getMsid());
+ }
+
+ @DB
+ public void disjoinJob(long jobId, long joinedJobId) {
+ _joinMapDao.disjoinJob(jobId, joinedJobId);
+ }
+
+ @DB
+ public void completeJoin(long joinJobId, int joinStatus, String joinResult) {
+ _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, this.getMsid());
+ }
+
@Override
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
if(s_logger.isDebugEnabled()) {
diff --git a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
index a9b62e24a40..3015d40be59 100755
--- a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
+++ b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
@@ -159,6 +159,7 @@ import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.VirtualMachineName;
import com.cloud.vm.VirtualMachineProfile;
import com.cloud.vm.VmWorkStart;
+import com.cloud.vm.VmWorkStop;
import com.cloud.vm.dao.ConsoleProxyDao;
import com.cloud.vm.dao.UserVmDetailsDao;
import com.cloud.vm.dao.VMInstanceDao;
@@ -2052,15 +2053,32 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy
try {
_itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
user, account, ((VmWorkStart)work).getPlan());
+
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null);
} catch(Exception e) {
s_logger.error("Exception in process VM-start work", e);
String result = SerializerHelper.toObjectSerializedString(e);
- AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobResult.STATUS_FAILED, 0, result);
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result);
}
}
@Override
public void vmWorkStop(VmWork work) {
- // TODO
+ assert(work instanceof VmWorkStop);
+
+ ConsoleProxyVO vm = findById(work.getVmId());
+
+ UserVO user = _entityMgr.findById(UserVO.class, work.getUserId());
+ AccountVO account = _entityMgr.findById(AccountVO.class, work.getAccountId());
+
+ try {
+ _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
+
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null);
+ } catch(Exception e) {
+ s_logger.error("Exception in process VM-stop work", e);
+ String result = SerializerHelper.toObjectSerializedString(e);
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result);
+ }
}
}
diff --git a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
index 49d29fc3519..4471781e797 100755
--- a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
+++ b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
@@ -144,6 +144,7 @@ import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.VirtualMachineName;
import com.cloud.vm.VirtualMachineProfile;
import com.cloud.vm.VmWorkStart;
+import com.cloud.vm.VmWorkStop;
import com.cloud.vm.dao.SecondaryStorageVmDao;
import com.cloud.vm.dao.UserVmDetailsDao;
import com.cloud.vm.dao.VMInstanceDao;
@@ -1495,16 +1496,33 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
try {
_itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
- user, account, ((VmWorkStart)work).getPlan());
+ user, account, ((VmWorkStart)work).getPlan());
+
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null);
} catch(Exception e) {
s_logger.error("Exception in process VM-start work", e);
String result = SerializerHelper.toObjectSerializedString(e);
- AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobResult.STATUS_FAILED, 0, result);
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result);
}
}
@Override
public void vmWorkStop(VmWork work) {
- // TODO
+ assert(work instanceof VmWorkStop);
+
+ SecondaryStorageVmVO vm = findById(work.getVmId());
+
+ UserVO user = _entityMgr.findById(UserVO.class, work.getUserId());
+ AccountVO account = _entityMgr.findById(AccountVO.class, work.getAccountId());
+
+ try {
+ _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
+
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_SUCCEEDED, null);
+ } catch(Exception e) {
+ s_logger.error("Exception in process VM-stop work", e);
+ String result = SerializerHelper.toObjectSerializedString(e);
+ AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobResult.STATUS_FAILED, result);
+ }
}
}
diff --git a/server/src/com/cloud/vm/VirtualMachineManager.java b/server/src/com/cloud/vm/VirtualMachineManager.java
index 1032abc58b0..ba8f6cd5409 100644
--- a/server/src/com/cloud/vm/VirtualMachineManager.java
+++ b/server/src/com/cloud/vm/VirtualMachineManager.java
@@ -208,4 +208,7 @@ public interface VirtualMachineManager extends Manager {
//
T processVmStartWork(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException;
+
+ boolean processVmStopWork(T vm, boolean forced, User user, Account account)
+ throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException;
}
diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 555fc90bbd0..058c6c28373 100755
--- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -694,7 +694,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = workJob.getId();
- // TODO : will refactor to fully-asynchronized way in the future
+ //
+ // TODO : this will be replaced with fully-asynchronized way later so that we don't need
+ // to wait here. The reason we do it synchronized here is that callers of advanceStart is expecting
+ // synchronized semantics
+ //
+ //
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
_jobMgr.waitAndCheck(
new String[] { TopicConstants.VM_POWER_STATE, TopicConstants.JOB_STATE },
3000L, 600000L, new Predicate() {
@@ -712,6 +718,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return false;
}
});
+ AsyncJobExecutionContext.getCurrentExecutionContext().disjoinJob(jobId);
return vm;
}
@@ -719,9 +726,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public T processVmStartWork(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
- long vmId = vm.getId();
VirtualMachineGuru vmGuru = getVmGuru(vm);
-
vm = vmGuru.findById(vm.getId());
Ternary start = changeToStartState(vmGuru, vm, caller, account);
@@ -984,283 +989,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return startedVm;
}
- /*
- @Override
- public T advanceStart(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy)
- throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
- long vmId = vm.getId();
- VirtualMachineGuru vmGuru = getVmGuru(vm);
-
- vm = vmGuru.findById(vm.getId());
- Ternary start = changeToStartState(vmGuru, vm, caller, account);
- if (start == null) {
- return vmGuru.findById(vmId);
- }
-
- vm = start.first();
- ReservationContext ctx = start.second();
- ItWorkVO work = start.third();
-
- T startedVm = null;
- ServiceOfferingVO offering = _offeringDao.findById(vm.getServiceOfferingId());
- VMTemplateVO template = _templateDao.findById(vm.getTemplateId());
-
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Trying to deploy VM, vm has dcId: " + vm.getDataCenterId() + " and podId: " + vm.getPodIdToDeployIn());
- }
- DataCenterDeployment plan = new DataCenterDeployment(vm.getDataCenterId(), vm.getPodIdToDeployIn(), null, null, null, null, ctx);
- if(planToDeploy != null && planToDeploy.getDataCenterId() != 0){
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("advanceStart: DeploymentPlan is provided, using dcId:" + planToDeploy.getDataCenterId() + ", podId: " + planToDeploy.getPodId() + ", clusterId: "
- + planToDeploy.getClusterId() + ", hostId: " + planToDeploy.getHostId() + ", poolId: " + planToDeploy.getPoolId());
- }
- plan = new DataCenterDeployment(planToDeploy.getDataCenterId(), planToDeploy.getPodId(), planToDeploy.getClusterId(), planToDeploy.getHostId(), planToDeploy.getPoolId(), planToDeploy.getPhysicalNetworkId(), ctx);
- }
-
- HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType());
-
- boolean canRetry = true;
- try {
- Journal journal = start.second().getJournal();
-
- ExcludeList avoids = null;
- if (planToDeploy != null) {
- avoids = planToDeploy.getAvoids();
- }
- if (avoids == null) {
- avoids = new ExcludeList();
- }
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Deploy avoids pods: " + avoids.getPodsToAvoid() + ", clusters: " + avoids.getClustersToAvoid() + ", hosts: " + avoids.getHostsToAvoid());
- }
-
-
- boolean planChangedByVolume = false;
- boolean reuseVolume = true;
- DataCenterDeployment originalPlan = plan;
-
- int retry = _retry;
- while (retry-- != 0) { // It's != so that it can match -1.
-
- if(reuseVolume){
- // edit plan if this vm's ROOT volume is in READY state already
- List vols = _volsDao.findReadyRootVolumesByInstance(vm.getId());
- for (VolumeVO vol : vols) {
- // make sure if the templateId is unchanged. If it is changed,
- // let planner
- // reassign pool for the volume even if it ready.
- Long volTemplateId = vol.getTemplateId();
- if (volTemplateId != null && volTemplateId.longValue() != template.getId()) {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug(vol + " of " + vm + " is READY, but template ids don't match, let the planner reassign a new pool");
- }
- continue;
- }
-
- StoragePool pool = (StoragePool)dataStoreMgr.getPrimaryDataStore(vol.getPoolId());
- if (!pool.isInMaintenance()) {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Root volume is ready, need to place VM in volume's cluster");
- }
- long rootVolDcId = pool.getDataCenterId();
- Long rootVolPodId = pool.getPodId();
- Long rootVolClusterId = pool.getClusterId();
- if (planToDeploy != null && planToDeploy.getDataCenterId() != 0) {
- Long clusterIdSpecified = planToDeploy.getClusterId();
- if (clusterIdSpecified != null && rootVolClusterId != null) {
- if (rootVolClusterId.longValue() != clusterIdSpecified.longValue()) {
- // cannot satisfy the plan passed in to the
- // planner
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Cannot satisfy the deployment plan passed in since the ready Root volume is in different cluster. volume's cluster: " + rootVolClusterId
- + ", cluster specified: " + clusterIdSpecified);
- }
- throw new ResourceUnavailableException("Root volume is ready in different cluster, Deployment plan provided cannot be satisfied, unable to create a deployment for "
- + vm, Cluster.class, clusterIdSpecified);
- }
- }
- plan = new DataCenterDeployment(planToDeploy.getDataCenterId(), planToDeploy.getPodId(), planToDeploy.getClusterId(), planToDeploy.getHostId(), vol.getPoolId(), null, ctx);
- }else{
- plan = new DataCenterDeployment(rootVolDcId, rootVolPodId, rootVolClusterId, null, vol.getPoolId(), null, ctx);
- if (s_logger.isDebugEnabled()) {
- s_logger.debug(vol + " is READY, changing deployment plan to use this pool's dcId: " + rootVolDcId + " , podId: " + rootVolPodId + " , and clusterId: " + rootVolClusterId);
- }
- planChangedByVolume = true;
- }
- }
- }
- }
-
- VirtualMachineProfileImpl vmProfile = new VirtualMachineProfileImpl(vm, template, offering, account, params);
- DeployDestination dest = null;
- for (DeploymentPlanner planner : _planners) {
- if (planner.canHandle(vmProfile, plan, avoids)) {
- dest = planner.plan(vmProfile, plan, avoids);
- } else {
- continue;
- }
- if (dest != null) {
- avoids.addHost(dest.getHost().getId());
- journal.record("Deployment found ", vmProfile, dest);
- break;
- }
- }
-
- if (dest == null) {
- if (planChangedByVolume) {
- plan = originalPlan;
- planChangedByVolume = false;
- //do not enter volume reuse for next retry, since we want to look for resorces outside the volume's cluster
- reuseVolume = false;
- continue;
- }
- throw new InsufficientServerCapacityException("Unable to create a deployment for " + vmProfile, DataCenter.class, plan.getDataCenterId());
- }
-
- long destHostId = dest.getHost().getId();
- vm.setPodId(dest.getPod().getId());
- Long cluster_id = dest.getCluster().getId();
- ClusterDetailsVO cluster_detail_cpu = _clusterDetailsDao.findDetail(cluster_id,"cpuOvercommitRatio");
- ClusterDetailsVO cluster_detail_ram = _clusterDetailsDao.findDetail(cluster_id,"memoryOvercommitRatio");
- vmProfile.setcpuOvercommitRatio(Float.parseFloat(cluster_detail_cpu.getValue()));
- vmProfile.setramOvercommitRatio(Float.parseFloat(cluster_detail_ram.getValue()));
-
- try {
- if (!changeState(vm, Event.OperationRetry, destHostId, work, Step.Prepare)) {
- throw new ConcurrentOperationException("Unable to update the state of the Virtual Machine");
- }
- } catch (NoTransitionException e1) {
- throw new ConcurrentOperationException(e1.getMessage());
- }
-
- try {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("VM is being created in podId: " + vm.getPodIdToDeployIn());
- }
- _networkMgr.prepare(vmProfile, dest, ctx);
- if (vm.getHypervisorType() != HypervisorType.BareMetal) {
- this.volumeMgr.prepare(vmProfile, dest);
- }
- //since StorageMgr succeeded in volume creation, reuse Volume for further tries until current cluster has capacity
- if(!reuseVolume){
- reuseVolume = true;
- }
-
- Commands cmds = null;
- vmGuru.finalizeVirtualMachineProfile(vmProfile, dest, ctx);
-
- VirtualMachineTO vmTO = hvGuru.implement(vmProfile);
-
- cmds = new Commands(OnError.Stop);
- cmds.addCommand(new StartCommand(vmTO, dest.getHost()));
-
- vmGuru.finalizeDeployment(cmds, vmProfile, dest, ctx);
-
-
- work = _workDao.findById(work.getId());
- if (work == null || work.getStep() != Step.Prepare) {
- throw new ConcurrentOperationException("Work steps have been changed: " + work);
- }
- _workDao.updateStep(work, Step.Starting);
-
- _agentMgr.send(destHostId, cmds);
-
- _workDao.updateStep(work, Step.Started);
-
-
- StartAnswer startAnswer = cmds.getAnswer(StartAnswer.class);
- if (startAnswer != null && startAnswer.getResult()) {
- String host_guid = startAnswer.getHost_guid();
- if( host_guid != null ) {
- HostVO finalHost = _resourceMgr.findHostByGuid(host_guid);
- if (finalHost == null ) {
- throw new CloudRuntimeException("Host Guid " + host_guid + " doesn't exist in DB, something wrong here");
- }
- destHostId = finalHost.getId();
- }
- if (vmGuru.finalizeStart(vmProfile, destHostId, cmds, ctx)) {
- if (!changeState(vm, Event.OperationSucceeded, destHostId, work, Step.Done)) {
- throw new ConcurrentOperationException("Unable to transition to a new state.");
- }
- startedVm = vm;
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Start completed for VM " + vm);
- }
- return startedVm;
- } else {
- if (s_logger.isDebugEnabled()) {
- s_logger.info("The guru did not like the answers so stopping " + vm);
- }
-
- StopCommand cmd = new StopCommand(vm);
- StopAnswer answer = (StopAnswer) _agentMgr.easySend(destHostId, cmd);
- if (answer == null || !answer.getResult()) {
- s_logger.warn("Unable to stop " + vm + " due to " + (answer != null ? answer.getDetails() : "no answers"));
- _haMgr.scheduleStop(vm, destHostId, WorkType.ForceStop);
- throw new ExecutionException("Unable to stop " + vm + " so we are unable to retry the start operation");
- }
- throw new ExecutionException("Unable to start " + vm + " due to error in finalizeStart, not retrying");
- }
- }
- s_logger.info("Unable to start VM on " + dest.getHost() + " due to " + (startAnswer == null ? " no start answer" : startAnswer.getDetails()));
-
- } catch (OperationTimedoutException e) {
- s_logger.debug("Unable to send the start command to host " + dest.getHost());
- if (e.isActive()) {
- _haMgr.scheduleStop(vm, destHostId, WorkType.CheckStop);
- }
- canRetry = false;
- throw new AgentUnavailableException("Unable to start " + vm.getHostName(), destHostId, e);
- } catch (ResourceUnavailableException e) {
- s_logger.info("Unable to contact resource.", e);
- if (!avoids.add(e)) {
- if (e.getScope() == Volume.class || e.getScope() == Nic.class) {
- throw e;
- } else {
- s_logger.warn("unexpected ResourceUnavailableException : " + e.getScope().getName(), e);
- throw e;
- }
- }
- } catch (InsufficientCapacityException e) {
- s_logger.info("Insufficient capacity ", e);
- if (!avoids.add(e)) {
- if (e.getScope() == Volume.class || e.getScope() == Nic.class) {
- throw e;
- } else {
- s_logger.warn("unexpected InsufficientCapacityException : " + e.getScope().getName(), e);
- }
- }
- } catch (Exception e) {
- s_logger.error("Failed to start instance " + vm, e);
- throw new AgentUnavailableException("Unable to start instance due to " + e.getMessage(), destHostId, e);
- } finally {
- if (startedVm == null && canRetry) {
- Step prevStep = work.getStep();
- _workDao.updateStep(work, Step.Release);
- if (prevStep == Step.Started || prevStep == Step.Starting) {
- cleanup(vmGuru, vmProfile, work, Event.OperationFailed, false, caller, account);
- } else {
- //if step is not starting/started, send cleanup command with force=true
- cleanup(vmGuru, vmProfile, work, Event.OperationFailed, true, caller, account);
- }
- }
- }
- }
- } finally {
- if (startedVm == null) {
- if (canRetry) {
- try {
- changeState(vm, Event.OperationFailed, null, work, Step.Done);
- } catch (NoTransitionException e) {
- throw new ConcurrentOperationException(e.getMessage());
- }
- }
- }
- }
-
- return startedVm;
- }
-*/
@Override
public boolean stop(T vm, User user, Account account) throws ResourceUnavailableException {
try {
@@ -1360,11 +1088,87 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
@Override
- public boolean advanceStop(T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
- // ???
+ public boolean advanceStop(final T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
+ VmWorkJobVO workJob = null;
+ Transaction txn = Transaction.currentTxn();
+ try {
+ txn.start();
+
+ _vmDao.lockRow(vm.getId(), true);
+
+ List pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(), VmWorkConstants.VM_WORK_STOP);
+
+ if(pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert(pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+ workJob = new VmWorkJobVO();
+
+ workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkConstants.VM_WORK_STOP);
+
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setStep(VmWorkJobVO.Step.Prepare);
+ workJob.setVmType(vm.getType());
+ workJob.setVmInstanceId(vm.getId());
+
+ // save work context info (there are some duplications)
+ VmWorkStop workInfo = new VmWorkStop();
+ workInfo.setAccountId(account.getId());
+ workInfo.setUserId(user.getId());
+ workInfo.setVmId(vm.getId());
+ workInfo.setForceStop(forced);
+ workJob.setCmdInfo(ApiSerializerHelper.toSerializedString(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
+ }
+
+ txn.commit();
+ } catch(Throwable e) {
+ s_logger.error("Unexpected exception", e);
+ txn.rollback();
+ throw new ConcurrentOperationException("Unhandled exception, converted to ConcurrentOperationException");
+ }
+
+ final long jobId = workJob.getId();
+
+ //
+ // TODO : this will be replaced with fully-asynchronized way later so that we don't need
+ // to wait here. The reason we do it synchronized here is that callers of advanceStart is expecting
+ // synchronized semantics
+ //
+ //
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ _jobMgr.waitAndCheck(
+ new String[] { TopicConstants.VM_POWER_STATE, TopicConstants.JOB_STATE },
+ 3000L, 600000L, new Predicate() {
+
+ @Override
+ public boolean checkCondition() {
+ VMInstanceVO instance = _vmDao.findById(vm.getId());
+ if(instance.getPowerState() == VirtualMachine.PowerState.PowerOff)
+ return true;
+
+ VmWorkJobVO workJob = _workJobDao.findById(jobId);
+ if(workJob.getStatus() != AsyncJobResult.STATUS_IN_PROGRESS)
+ return true;
+
+ return false;
+ }
+ });
+
+ try {
+ AsyncJobExecutionContext.getCurrentExecutionContext().disjoinJob(jobId);
+ } catch(Exception e) {
+ s_logger.error("Unexpected exception", e);
+ return false;
+ }
return true;
}
-
+
+ @Override
public boolean processVmStopWork(T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
VmWorkJobVO work = _workJobDao.findById(AsyncJobExecutionContext.getCurrentExecutionContext().getJob().getId());
diff --git a/server/src/com/cloud/vm/VmWorkStop.java b/server/src/com/cloud/vm/VmWorkStop.java
new file mode 100644
index 00000000000..d6d226fb81f
--- /dev/null
+++ b/server/src/com/cloud/vm/VmWorkStop.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm;
+
+public class VmWorkStop extends VmWork {
+
+ private boolean forceStop;
+
+ public VmWorkStop() {
+ forceStop = false;
+ }
+
+ public void setForceStop(boolean value) {
+ forceStop = value;
+ }
+
+ public boolean isForceStop() {
+ return forceStop;
+ }
+}
diff --git a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
index 5cef9c27de1..d77ebd148ee 100644
--- a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
+++ b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
@@ -396,4 +396,11 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage
return vm;
}
+
+ @Override
+ public boolean processVmStopWork(T vm, boolean forced, User user, Account account)
+ throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
+ return true;
+ }
+
}