CLOUDSTACK-669: put user vm work under new vmsync model

This commit is contained in:
Kelven Yang 2013-12-11 16:19:18 -08:00
parent 9ed2c2e224
commit 26c1ba296c
4 changed files with 198 additions and 41 deletions

View File

@ -78,5 +78,10 @@
<util:constant static-field="com.cloud.vm.VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER"/>
</property>
</bean>
<bean id= "vmWorkJobWakeupDispatcher" class="com.cloud.vm.VmWorkJobWakeupDispatcher">
<property name="name">
<util:constant static-field="com.cloud.vm.VmWorkJobDispatcher.VM_WORK_JOB_WAKEUP_DISPATCHER"/>
</property>
</bean>
</beans>

View File

@ -40,6 +40,8 @@ import javax.ejb.Local;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
@ -67,7 +69,6 @@ import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
@ -698,7 +699,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
advanceStart(vmUuid, params, null);
}
@ -1515,7 +1515,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return true;
}
@Override
public void storageMigration(String vmUuid, StoragePool destPool) {
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
@ -2970,7 +2970,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
this.name = name;
this.state = state;
this.vm = vm;
this.hostUuid = host;
hostUuid = host;
this.platform = platform;
}
@ -4141,8 +4141,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// TODO build a common pattern to reduce code duplication in following methods
// no time for this at current iteration
//
public Outcome<VirtualMachine> startVmThroughJobQueue(final String vmUuid,
final Map<VirtualMachineProfile.Param, Object> params,
public Outcome<VirtualMachine> startVmThroughJobQueue(final String vmUuid,
final Map<VirtualMachineProfile.Param, Object> params,
final DeploymentPlan planToDeploy) {
final CallContext context = CallContext.current();
@ -4152,11 +4152,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
VmWorkJobVO workJob = null;
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance,
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance,
vm.getId(), VmWorkStart.class.getName());
if (pendingWorkJobs.size() > 0) {
@ -4192,7 +4193,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
VirtualMachine.PowerState.PowerOn, vm.getId(), null);
}
@ -4204,11 +4205,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkStop.class.getName());
VmWorkJobVO workJob = null;
@ -4242,7 +4244,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
VirtualMachine.PowerState.PowerOff, vm.getId(), null);
}
@ -4256,11 +4258,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkReboot.class.getName());
VmWorkJobVO workJob = null;
@ -4294,7 +4297,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
vm.getId());
}
@ -4306,12 +4309,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkMigrate.class.getName());
VmWorkJobVO workJob = null;
@ -4344,7 +4348,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId());
}
@ -4359,12 +4363,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkMigrateWithStorage.class.getName());
VmWorkJobVO workJob = null;
@ -4384,7 +4389,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(),
VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(),
srcHostId, destHostId, volumeToPool);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@ -4398,7 +4403,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final long jobId = (Long)context.getContextParameter("jobId");
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId);
}
@ -4412,12 +4417,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkMigrateForScale.class.getName());
VmWorkJobVO workJob = null;
@ -4437,7 +4443,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(),
VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(),
srcHostId, dest, newSvcOfferingId);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@ -4464,12 +4470,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkStorageMigration.class.getName());
VmWorkJobVO workJob = null;
@ -4489,7 +4496,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
destPool);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@ -4514,12 +4521,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final Account account = context.getCallingAccount();
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkAddVmToNetwork.class.getName());
VmWorkJobVO workJob = null;
@ -4539,7 +4547,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
network, requested);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@ -4564,12 +4572,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final Account account = context.getCallingAccount();
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkRemoveNicFromVm.class.getName());
VmWorkJobVO workJob = null;
@ -4589,7 +4598,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(),
VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(),
nic);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@ -4614,12 +4623,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final Account account = context.getCallingAccount();
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkRemoveVmFromNetwork.class.getName());
VmWorkJobVO workJob = null;
@ -4639,7 +4649,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(),
VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(),
network, broadcastUri);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
@ -4666,12 +4676,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Transaction.execute(new TransactionCallbackNoReturn () {
public void doInTransactionWithoutResult(TransactionStatus status) {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_vmDao.lockRow(vm.getId(), true);
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
VirtualMachine.Type.Instance, vm.getId(),
VmWorkReconfigure.class.getName());
VmWorkJobVO workJob = null;
@ -4691,7 +4702,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setVmInstanceId(vm.getId());
// save work context info (there are some duplications)
VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(),
VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(),
oldServiceOffering, reconfiguringOnExistingHost);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));

View File

@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.vm;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import org.apache.log4j.Logger;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import com.cloud.user.AccountVO;
import com.cloud.user.dao.AccountDao;
import com.cloud.utils.component.AdapterBase;
import com.cloud.vm.dao.VMInstanceDao;
public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDispatcher {
private static final Logger s_logger = Logger.getLogger(VmWorkJobWakeupDispatcher.class);
@Inject
private VmWorkJobDao _workjobDao;
@Inject
private AsyncJobJoinMapDao _joinMapDao;
@Inject
private AccountDao _accountDao;
@Inject
private VMInstanceDao _instanceDao;
@Inject
private VirtualMachineManager _vmMgr;
@Inject
private AsyncJobManager _asyncJobMgr;
private final Map<String, Method> _handlerMap = new HashMap<String, Method>();
@Override
public void runJob(AsyncJob job) {
try {
List<AsyncJobJoinMapVO> joinRecords = _joinMapDao.listJoinRecords(job.getId());
if (joinRecords.size() != 1) {
s_logger.warn("AsyncJob-" + job.getId()
+ " received wakeup call with un-supported joining job number: " + joinRecords.size());
// if we fail wakeup-execution for any reason, avoid release sync-source if there is any
job.setSyncSource(null);
return;
}
AsyncJobJoinMapVO joinRecord = joinRecords.get(0);
VmWorkJobVO joinedJob = _workjobDao.findById(joinRecord.getJoinJobId());
Class<?> workClz = null;
try {
workClz = Class.forName(job.getCmd());
} catch (ClassNotFoundException e) {
s_logger.error("VM work class " + job.getCmd() + " is not found", e);
return;
}
// get original work context information from joined job
VmWork work = VmWorkSerializer.deserialize(workClz, joinedJob.getCmdInfo());
assert (work != null);
AccountVO account = _accountDao.findById(work.getAccountId());
assert (account != null);
VMInstanceVO vm = _instanceDao.findById(work.getVmId());
assert (vm != null);
CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
try {
Method handler = getHandler(joinRecord.getWakeupHandler());
if (handler != null) {
handler.invoke(_vmMgr);
} else {
assert (false);
s_logger.error("Unable to find wakeup handler " + joinRecord.getWakeupHandler() +
" when waking up job-" + job.getId());
}
} finally {
CallContext.unregister();
}
} catch (Throwable e) {
s_logger.warn("Unexpected exception in waking up job-" + job.getId());
// if we fail wakeup-execution for any reason, avoid release sync-source if there is any
job.setSyncSource(null);
}
}
private Method getHandler(String wakeupHandler) {
synchronized (_handlerMap) {
Class<?> clz = _vmMgr.getClass();
Method method = _handlerMap.get(wakeupHandler);
if (method != null)
return method;
try {
method = clz.getMethod(wakeupHandler);
method.setAccessible(true);
} catch (SecurityException e) {
assert (false);
s_logger.error("Unexpected exception", e);
return null;
} catch (NoSuchMethodException e) {
assert (false);
s_logger.error("Unexpected exception", e);
return null;
}
_handlerMap.put(wakeupHandler, method);
return method;
}
}
}

View File

@ -593,7 +593,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
msgDetector.open(_messageBus, topics);
try {
long startTick = System.currentTimeMillis();
while (System.currentTimeMillis() - startTick < timeoutInMiliseconds) {
while (timeoutInMiliseconds < 0 || System.currentTimeMillis() - startTick < timeoutInMiliseconds) {
msgDetector.waitAny(checkIntervalInMilliSeconds);
job = _jobDao.findById(job.getId());
if (job.getStatus().done()) {