From 26c1ba296cfba98de7a292406924caa664f37985 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Wed, 11 Dec 2013 16:19:18 -0800 Subject: [PATCH] CLOUDSTACK-669: put user vm work under new vmsync model --- ...ring-engine-orchestration-core-context.xml | 5 + .../cloud/vm/VirtualMachineManagerImpl.java | 91 ++++++----- .../cloud/vm/VmWorkJobWakeupDispatcher.java | 141 ++++++++++++++++++ .../jobs/impl/AsyncJobManagerImpl.java | 2 +- 4 files changed, 198 insertions(+), 41 deletions(-) create mode 100644 engine/orchestration/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml index 0c76f008c32..fc3bae3129f 100644 --- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml +++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml @@ -78,5 +78,10 @@ + + + + + diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 1cac898f927..a200aea2024 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -40,6 +40,8 @@ import javax.ejb.Local; import javax.inject.Inject; import javax.naming.ConfigurationException; +import org.apache.log4j.Logger; + import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao; import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService; @@ -67,7 +69,6 @@ import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.cloudstack.storage.to.VolumeObjectTO; import org.apache.cloudstack.utils.identity.ManagementServerNode; -import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.Listener; @@ -698,7 +699,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public void advanceStart(String vmUuid, Map params) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { - advanceStart(vmUuid, params, null); } @@ -1515,7 +1515,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return true; } - + @Override public void storageMigration(String vmUuid, StoragePool destPool) { AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); @@ -2970,7 +2970,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac this.name = name; this.state = state; this.vm = vm; - this.hostUuid = host; + hostUuid = host; this.platform = platform; } @@ -4141,8 +4141,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // TODO build a common pattern to reduce code duplication in following methods // no time for this at current iteration // - public Outcome startVmThroughJobQueue(final String vmUuid, - final Map params, + public Outcome startVmThroughJobQueue(final String vmUuid, + final Map params, final DeploymentPlan planToDeploy) { final CallContext context = CallContext.current(); @@ -4152,11 +4152,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { VmWorkJobVO workJob = null; _vmDao.lockRow(vm.getId(), true); - List pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance, + List pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance, vm.getId(), VmWorkStart.class.getName()); if (pendingWorkJobs.size() > 0) { @@ -4192,7 +4193,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), VirtualMachine.PowerState.PowerOn, vm.getId(), null); } @@ -4204,11 +4205,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkStop.class.getName()); VmWorkJobVO workJob = null; @@ -4242,7 +4244,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), VirtualMachine.PowerState.PowerOff, vm.getId(), null); } @@ -4256,11 +4258,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkReboot.class.getName()); VmWorkJobVO workJob = null; @@ -4294,7 +4297,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); } @@ -4306,12 +4309,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkMigrate.class.getName()); VmWorkJobVO workJob = null; @@ -4344,7 +4348,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId()); } @@ -4359,12 +4363,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkMigrateWithStorage.class.getName()); VmWorkJobVO workJob = null; @@ -4384,7 +4389,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setVmInstanceId(vm.getId()); // save work context info (there are some duplications) - VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), + VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), srcHostId, destHostId, volumeToPool); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); @@ -4398,7 +4403,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final long jobId = (Long)context.getContextParameter("jobId"); AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId); } @@ -4412,12 +4417,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkMigrateForScale.class.getName()); VmWorkJobVO workJob = null; @@ -4437,7 +4443,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setVmInstanceId(vm.getId()); // save work context info (there are some duplications) - VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), + VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), srcHostId, dest, newSvcOfferingId); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); @@ -4464,12 +4470,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkStorageMigration.class.getName()); VmWorkJobVO workJob = null; @@ -4489,7 +4496,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setVmInstanceId(vm.getId()); // save work context info (there are some duplications) - VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), + VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), destPool); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); @@ -4514,12 +4521,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final Account account = context.getCallingAccount(); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkAddVmToNetwork.class.getName()); VmWorkJobVO workJob = null; @@ -4539,7 +4547,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setVmInstanceId(vm.getId()); // save work context info (there are some duplications) - VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), + VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), network, requested); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); @@ -4564,12 +4572,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final Account account = context.getCallingAccount(); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkRemoveNicFromVm.class.getName()); VmWorkJobVO workJob = null; @@ -4589,7 +4598,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setVmInstanceId(vm.getId()); // save work context info (there are some duplications) - VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), + VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), nic); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); @@ -4614,12 +4623,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final Account account = context.getCallingAccount(); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkRemoveVmFromNetwork.class.getName()); VmWorkJobVO workJob = null; @@ -4639,7 +4649,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setVmInstanceId(vm.getId()); // save work context info (there are some duplications) - VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), + VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), network, broadcastUri); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); @@ -4666,12 +4676,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); Transaction.execute(new TransactionCallbackNoReturn () { - public void doInTransactionWithoutResult(TransactionStatus status) { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { _vmDao.lockRow(vm.getId(), true); List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), + VirtualMachine.Type.Instance, vm.getId(), VmWorkReconfigure.class.getName()); VmWorkJobVO workJob = null; @@ -4691,7 +4702,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setVmInstanceId(vm.getId()); // save work context info (there are some duplications) - VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), + VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), oldServiceOffering, reconfiguringOnExistingHost); workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java new file mode 100644 index 00000000000..5704f978720 --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloud.vm; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.inject.Inject; + +import org.apache.log4j.Logger; + +import org.apache.cloudstack.context.CallContext; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao; +import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao; +import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; +import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; + +import com.cloud.user.AccountVO; +import com.cloud.user.dao.AccountDao; +import com.cloud.utils.component.AdapterBase; +import com.cloud.vm.dao.VMInstanceDao; + +public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDispatcher { + private static final Logger s_logger = Logger.getLogger(VmWorkJobWakeupDispatcher.class); + + @Inject + private VmWorkJobDao _workjobDao; + @Inject + private AsyncJobJoinMapDao _joinMapDao; + @Inject + private AccountDao _accountDao; + @Inject + private VMInstanceDao _instanceDao; + @Inject + private VirtualMachineManager _vmMgr; + @Inject + private AsyncJobManager _asyncJobMgr; + + private final Map _handlerMap = new HashMap(); + + @Override + public void runJob(AsyncJob job) { + try { + List joinRecords = _joinMapDao.listJoinRecords(job.getId()); + if (joinRecords.size() != 1) { + s_logger.warn("AsyncJob-" + job.getId() + + " received wakeup call with un-supported joining job number: " + joinRecords.size()); + + // if we fail wakeup-execution for any reason, avoid release sync-source if there is any + job.setSyncSource(null); + return; + } + + AsyncJobJoinMapVO joinRecord = joinRecords.get(0); + VmWorkJobVO joinedJob = _workjobDao.findById(joinRecord.getJoinJobId()); + + Class workClz = null; + try { + workClz = Class.forName(job.getCmd()); + } catch (ClassNotFoundException e) { + s_logger.error("VM work class " + job.getCmd() + " is not found", e); + return; + } + + // get original work context information from joined job + VmWork work = VmWorkSerializer.deserialize(workClz, joinedJob.getCmdInfo()); + assert (work != null); + + AccountVO account = _accountDao.findById(work.getAccountId()); + assert (account != null); + + VMInstanceVO vm = _instanceDao.findById(work.getVmId()); + assert (vm != null); + + CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated()); + try { + Method handler = getHandler(joinRecord.getWakeupHandler()); + if (handler != null) { + handler.invoke(_vmMgr); + } else { + assert (false); + s_logger.error("Unable to find wakeup handler " + joinRecord.getWakeupHandler() + + " when waking up job-" + job.getId()); + } + } finally { + CallContext.unregister(); + } + } catch (Throwable e) { + s_logger.warn("Unexpected exception in waking up job-" + job.getId()); + + // if we fail wakeup-execution for any reason, avoid release sync-source if there is any + job.setSyncSource(null); + } + } + + private Method getHandler(String wakeupHandler) { + + synchronized (_handlerMap) { + Class clz = _vmMgr.getClass(); + Method method = _handlerMap.get(wakeupHandler); + if (method != null) + return method; + + try { + method = clz.getMethod(wakeupHandler); + method.setAccessible(true); + } catch (SecurityException e) { + assert (false); + s_logger.error("Unexpected exception", e); + return null; + } catch (NoSuchMethodException e) { + assert (false); + s_logger.error("Unexpected exception", e); + return null; + } + + _handlerMap.put(wakeupHandler, method); + return method; + } + } +} \ No newline at end of file diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 39f48188fc0..d98d8329cc2 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -593,7 +593,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, msgDetector.open(_messageBus, topics); try { long startTick = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTick < timeoutInMiliseconds) { + while (timeoutInMiliseconds < 0 || System.currentTimeMillis() - startTick < timeoutInMiliseconds) { msgDetector.waitAny(checkIntervalInMilliSeconds); job = _jobDao.findById(job.getId()); if (job.getStatus().done()) {