diff --git a/api/src/com/cloud/vm/VirtualMachine.java b/api/src/com/cloud/vm/VirtualMachine.java index 9a8d88330c7..34008989d2b 100755 --- a/api/src/com/cloud/vm/VirtualMachine.java +++ b/api/src/com/cloud/vm/VirtualMachine.java @@ -213,7 +213,12 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I * UserBareMetal is only used for selecting VirtualMachineGuru, there is no * VM with this type. UserBareMetal should treat exactly as User. */ - UserBareMetal(false); + UserBareMetal(false), + + /* + * General VM type for queuing VM orchestration work + */ + Instance(false); boolean _isUsedBySystem; diff --git a/api/src/org/apache/cloudstack/api/InternalIdentity.java b/api/src/org/apache/cloudstack/api/InternalIdentity.java index 1dfeb8c9091..4149dd1c846 100644 --- a/api/src/org/apache/cloudstack/api/InternalIdentity.java +++ b/api/src/org/apache/cloudstack/api/InternalIdentity.java @@ -16,11 +16,13 @@ // under the License. package org.apache.cloudstack.api; +import java.io.Serializable; + // This interface is a contract that getId() will give the internal // ID of an entity which extends this interface // Any class having an internal ID in db table/schema should extend this // For example, all ControlledEntity, OwnedBy would have an internal ID -public interface InternalIdentity { +public interface InternalIdentity extends Serializable { long getId(); } diff --git a/api/src/org/apache/cloudstack/context/CallContext.java b/api/src/org/apache/cloudstack/context/CallContext.java index 5439aee7062..3cdccc526fe 100644 --- a/api/src/org/apache/cloudstack/context/CallContext.java +++ b/api/src/org/apache/cloudstack/context/CallContext.java @@ -197,6 +197,18 @@ public class CallContext { } return register(user, account); } + + public static CallContext register(long callingUserId, long callingAccountId, String contextId) throws CloudAuthenticationException { + Account account = s_entityMgr.findById(Account.class, callingAccountId); + if (account == null) { + throw new CloudAuthenticationException("The account is no longer current.").add(Account.class, Long.toString(callingAccountId)); + } + User user = s_entityMgr.findById(User.class, callingUserId); + if (user == null) { + throw new CloudAuthenticationException("The user is no longer current.").add(User.class, Long.toString(callingUserId)); + } + return register(user, account, contextId); + } public static void unregisterAll() { while ( unregister() != null ) { diff --git a/engine/api/src/com/cloud/vm/VirtualMachineManager.java b/engine/api/src/com/cloud/vm/VirtualMachineManager.java index 35804af13d8..9d19cf5f36b 100644 --- a/engine/api/src/com/cloud/vm/VirtualMachineManager.java +++ b/engine/api/src/com/cloud/vm/VirtualMachineManager.java @@ -108,8 +108,13 @@ public interface VirtualMachineManager extends Manager { void advanceStart(String vmUuid, Map params, DeploymentPlan planToDeploy) throws InsufficientCapacityException, ResourceUnavailableException, ConcurrentOperationException, OperationTimedoutException; + void orchestrateStart(String vmUuid, Map params, DeploymentPlan planToDeploy) throws InsufficientCapacityException, + ResourceUnavailableException, ConcurrentOperationException, OperationTimedoutException; + void advanceStop(String vmUuid, boolean cleanupEvenIfUnableToStop) throws ResourceUnavailableException, OperationTimedoutException, ConcurrentOperationException; + void orchestrateStop(String vmUuid, boolean cleanupEvenIfUnableToStop) throws ResourceUnavailableException, OperationTimedoutException, ConcurrentOperationException; + void advanceExpunge(String vmUuid) throws ResourceUnavailableException, OperationTimedoutException, ConcurrentOperationException; void destroy(String vmUuid) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException; @@ -117,11 +122,17 @@ public interface VirtualMachineManager extends Manager { void migrateAway(String vmUuid, long hostId) throws InsufficientServerCapacityException; void migrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException; + + void orchestrateMigrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException; void migrateWithStorage(String vmUuid, long srcId, long destId, Map volumeToPool) throws ResourceUnavailableException, ConcurrentOperationException; - + + void orchestrateMigrateWithStorage(String vmUuid, long srcId, long destId, Map volumeToPool) throws ResourceUnavailableException, ConcurrentOperationException; + void reboot(String vmUuid, Map params) throws InsufficientCapacityException, ResourceUnavailableException; + void orchestrateReboot(String vmUuid, Map params) throws InsufficientCapacityException, ResourceUnavailableException; + void advanceReboot(String vmUuid, Map params) throws InsufficientCapacityException, ResourceUnavailableException, ConcurrentOperationException, OperationTimedoutException; @@ -137,6 +148,8 @@ public interface VirtualMachineManager extends Manager { VirtualMachine findById(long vmId); void storageMigration(String vmUuid, StoragePool storagePoolId); + + void orchestrateStorageMigration(String vmUuid, StoragePool storagePoolId); /** * @param vmInstance @@ -161,7 +174,11 @@ public interface VirtualMachineManager extends Manager { * @throws InsufficientCapacityException */ NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, - ResourceUnavailableException, InsufficientCapacityException; + ResourceUnavailableException, InsufficientCapacityException; + + NicProfile orchestrateAddVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, + ResourceUnavailableException, InsufficientCapacityException; + /** * @param vm @@ -172,6 +189,8 @@ public interface VirtualMachineManager extends Manager { */ boolean removeNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException; + boolean orchestrateRemoveNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException; + /** * @param vm * @param network @@ -181,6 +200,8 @@ public interface VirtualMachineManager extends Manager { * @throws ConcurrentOperationException */ boolean removeVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException; + + boolean orchestrateRemoveVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException; /** * @param nic @@ -196,12 +217,15 @@ public interface VirtualMachineManager extends Manager { */ VirtualMachineTO toVmTO(VirtualMachineProfile profile); - VirtualMachine reConfigureVm(String vmUuid, ServiceOffering newServiceOffering, boolean sameHost) throws ResourceUnavailableException, ConcurrentOperationException; + + VirtualMachine orchestrateReConfigureVm(String vmUuid, ServiceOffering newServiceOffering, boolean sameHost) throws ResourceUnavailableException, ConcurrentOperationException; void findHostAndMigrate(String vmUuid, Long newSvcOfferingId, DeploymentPlanner.ExcludeList excludeHostList) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException; void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long newSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException; + + void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long newSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException; } diff --git a/engine/components-api/src/com/cloud/alert/AlertManager.java b/engine/components-api/src/com/cloud/alert/AlertManager.java index 1ae6b1b7216..eb5ac0c2a13 100755 --- a/engine/components-api/src/com/cloud/alert/AlertManager.java +++ b/engine/components-api/src/com/cloud/alert/AlertManager.java @@ -50,6 +50,8 @@ public interface AlertManager extends Manager { public static final short ALERT_TYPE_DIRECT_ATTACHED_PUBLIC_IP = 24; public static final short ALERT_TYPE_LOCAL_STORAGE = 25; public static final short ALERT_TYPE_RESOURCE_LIMIT_EXCEEDED = 26; // Generated when the resource limit exceeds the limit. Currently used for recurring snapshots only + + public static final short ALERT_TYPE_SYNC = 27; static final ConfigKey StorageCapacityThreshold = new ConfigKey(Double.class, "cluster.storage.capacity.notificationthreshold", "Alert", "0.75", "Percentage (as a value between 0 and 1) of storage utilization above which alerts will be sent about low storage available.", true, ConfigKey.Scope.Cluster, null); diff --git a/engine/components-api/src/com/cloud/vm/VmWork.java b/engine/components-api/src/com/cloud/vm/VmWork.java new file mode 100644 index 00000000000..3f9e71dd8fc --- /dev/null +++ b/engine/components-api/src/com/cloud/vm/VmWork.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import java.io.Serializable; + +public class VmWork implements Serializable { + private static final long serialVersionUID = -6946320465729853589L; + + long userId; + long accountId; + long vmId; + + public VmWork(long userId, long accountId, long vmId) { + this.userId = userId; + this.accountId = accountId; + this.vmId = vmId; + } + + public long getUserId() { + return userId; + } + + public long getAccountId() { + return accountId; + } + + public long getVmId() { + return vmId; + } +} diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml index b5c4254abaa..880002cd538 100644 --- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml +++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml @@ -67,5 +67,6 @@ + diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 75a95c5ed5b..189c2ba11a3 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -18,6 +18,10 @@ package com.cloud.vm; import java.net.URI; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -26,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -47,6 +52,18 @@ import org.apache.cloudstack.framework.config.ConfigDepot; import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.Configurable; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.Outcome; +import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao; +import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; +import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; +import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; +import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; +import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageHandler; +import org.apache.cloudstack.jobs.JobInfo; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; @@ -153,8 +170,10 @@ import com.cloud.storage.dao.VolumeDao; import com.cloud.template.VirtualMachineTemplate; import com.cloud.user.Account; import com.cloud.user.User; +import com.cloud.utils.DateUtil; import com.cloud.utils.Journal; import com.cloud.utils.Pair; +import com.cloud.utils.Predicate; import com.cloud.utils.StringUtils; import com.cloud.utils.Ternary; import com.cloud.utils.component.ManagerBase; @@ -163,8 +182,10 @@ import com.cloud.utils.db.DB; import com.cloud.utils.db.EntityManager; import com.cloud.utils.db.GlobalLock; import com.cloud.utils.db.Transaction; +import com.cloud.utils.db.TransactionCallbackNoReturn; import com.cloud.utils.db.TransactionCallbackWithException; import com.cloud.utils.db.TransactionCallbackWithExceptionNoReturn; +import com.cloud.utils.db.TransactionLegacy; import com.cloud.utils.db.TransactionStatus; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.ExecutionException; @@ -172,6 +193,7 @@ import com.cloud.utils.fsm.NoTransitionException; import com.cloud.utils.fsm.StateMachine2; import com.cloud.vm.ItWorkVO.Step; import com.cloud.vm.VirtualMachine.Event; +import com.cloud.vm.VirtualMachine.PowerState; import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.dao.NicDao; import com.cloud.vm.dao.UserVmDao; @@ -186,6 +208,8 @@ import com.cloud.vm.snapshot.dao.VMSnapshotDao; public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMachineManager, Listener, Configurable { private static final Logger s_logger = Logger.getLogger(VirtualMachineManagerImpl.class); + private static final String VM_SYNC_ALERT_SUBJECT = "VM state sync alert"; + @Inject DataStoreManager dataStoreMgr; @Inject @@ -279,6 +303,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Inject DeploymentPlanningManager _dpMgr; + @Inject protected MessageBus _messageBus; + @Inject protected VirtualMachinePowerStateSync _syncMgr; + @Inject protected VmWorkJobDao _workJobDao; + @Inject protected AsyncJobManager _jobMgr; + Map _vmGurus = new HashMap(); protected StateMachine2 _stateMachine; @@ -298,6 +327,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac "On destroy, force-stop takes this value ", true); static final ConfigKey ClusterDeltaSyncInterval = new ConfigKey("Advanced", Integer.class, "sync.interval", "60", "Cluster Delta sync interval in seconds", false); + + static final ConfigKey VmJobEnabled = new ConfigKey("Advanced", + Boolean.class, "vm.job.enabled", "false", + "True to enable new VM sync model. false to use the old way", false); + static final ConfigKey VmJobCheckInterval = new ConfigKey("Advanced", + Long.class, "vm.job.check.interval", "3000", + "Interval in milliseconds to check if the job is complete", false); + static final ConfigKey VmJobTimeout = new ConfigKey("Advanced", + Long.class, "vm.job.timeout", "600000", + "Time in milliseconds to wait before attempting to cancel a job", false); + static final ConfigKey VmJobStateReportInterval = new ConfigKey("Advanced", + Integer.class, "vm.job.report.interval", "60", + "Interval to send application level pings to make sure the connection is still working", false); @@ -651,15 +693,46 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } @Override - public void advanceStart(String vmUuid, Map params) throws InsufficientCapacityException, ConcurrentOperationException, - ResourceUnavailableException { + public void advanceStart(String vmUuid, Map params) + throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { + advanceStart(vmUuid, params, null); } @Override public void advanceStart(String vmUuid, Map params, DeploymentPlan planToDeploy) throws InsufficientCapacityException, - ConcurrentOperationException, ResourceUnavailableException { - CallContext cctxt = CallContext.current(); + ConcurrentOperationException, ResourceUnavailableException { + + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + orchestrateStart(vmUuid, params, planToDeploy); + } else { + Outcome outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy); + + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else if(jobException instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobException; + } + } + } + + @Override + public void orchestrateStart(String vmUuid, Map params, DeploymentPlan planToDeploy) throws InsufficientCapacityException, + ConcurrentOperationException, ResourceUnavailableException { + + CallContext cctxt = CallContext.current(); Account account = cctxt.getCallingAccount(); User caller = cctxt.getCallingUser(); @@ -1145,7 +1218,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } @Override - public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { + public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) + throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { + + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop); + } else { + Outcome outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop); + + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof AgentUnavailableException) + throw (AgentUnavailableException)jobException; + else if(jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else if(jobException instanceof OperationTimedoutException) + throw (OperationTimedoutException)jobException; + } + } + } + + @Override + public void orchestrateStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { VMInstanceVO vm = _vmDao.findByUuid(vmUuid); advanceStop(vm, cleanUpEvenIfUnableToStop); @@ -1415,9 +1519,33 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return true; } + + public void storageMigration(String vmUuid, StoragePool destPool) { + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + orchestrateStorageMigration(vmUuid, destPool); + } else { + Outcome outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool); + + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof RuntimeException) + throw (RuntimeException)jobException; + } + } + } @Override - public void storageMigration(String vmUuid, StoragePool destPool) { + public void orchestrateStorageMigration(String vmUuid, StoragePool destPool) { VMInstanceVO vm = _vmDao.findByUuid(vmUuid); try { @@ -1473,7 +1601,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } @Override - public void migrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException { + public void migrate(String vmUuid, long srcHostId, DeployDestination dest) + throws ResourceUnavailableException, ConcurrentOperationException { + + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + orchestrateMigrate(vmUuid, srcHostId, dest); + } else { + Outcome outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest); + + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobException; + else if(jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else if(jobException instanceof RuntimeException) + throw (RuntimeException)jobException; + } + } + } + + @Override + public void orchestrateMigrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException { VMInstanceVO vm = _vmDao.findByUuid(vmUuid); if (vm == null) { if (s_logger.isDebugEnabled()) { @@ -1713,9 +1872,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } + public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map volumeToPool) + throws ResourceUnavailableException, ConcurrentOperationException { + + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool); + } else { + Outcome outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool); + + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobException; + else if(jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + } + } + } + @Override - public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map volumeToPool) throws ResourceUnavailableException, - ConcurrentOperationException { + public void orchestrateMigrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map volumeToPool) throws ResourceUnavailableException, + ConcurrentOperationException { + VMInstanceVO vm = _vmDao.findByUuid(vmUuid); HostVO srcHost = _hostDao.findById(srcHostId); @@ -1954,9 +2142,40 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new CloudRuntimeException("Unable to reboot a VM due to concurrent operation", e); } } - + @Override - public void advanceReboot(String vmUuid, Map params) throws InsufficientCapacityException, ConcurrentOperationException, + public void advanceReboot(String vmUuid, Map params) + throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { + + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + orchestrateReboot(vmUuid, params); + } else { + Outcome outcome = rebootVmThroughJobQueue(vmUuid, params); + + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobException; + else if(jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else if(jobException instanceof InsufficientCapacityException) + throw (InsufficientCapacityException)jobException; + } + } + } + + @Override + public void orchestrateReboot(String vmUuid, Map params) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { VMInstanceVO vm = _vmDao.findByUuid(vmUuid); @@ -2081,8 +2300,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return commands; } - - public void deltaSync(Map> newStates) { Map states = convertToInfos(newStates); @@ -2615,6 +2832,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } } + + if(VmJobEnabled.value()) { + if(ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) { + _syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport()); + } + } + + // take the chance to scan VMs that are stuck in transitional states + // and are missing from the report + scanStalledVMInTransitionStateOnUpHost(agentId); processed = true; } } @@ -2636,7 +2863,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (!(cmd instanceof StartupRoutingCommand)) { return; } + + if(s_logger.isDebugEnabled()) + s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId()); + if(VmJobEnabled.value()) { + _syncMgr.resetHostSyncState(agent.getId()); + } + if (forRebalance) { s_logger.debug("Not processing listener " + this + " as connect happens on rebalance process"); return; @@ -2842,9 +3076,50 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac vmForUpdate.setServiceOfferingId(newSvcOff.getId()); return _vmDao.update(vmId, vmForUpdate); } - + @Override - public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException, + public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) + throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { + + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateAddVmToNetwork(vm, network,requested); + } else { + Outcome outcome = addVmToNetworkThroughJobQueue(vm, network, requested); + + try { + outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId()); + if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) { + + NicProfile nic = (NicProfile)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult()); + return nic; + } else { + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobException; + else if(jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else if(jobException instanceof InsufficientCapacityException) + throw (InsufficientCapacityException)jobException; + else if(jobException instanceof RuntimeException) + throw (RuntimeException)jobException; + } + throw new RuntimeException("Job failed with unhandled exception"); + } + } + } + + @Override + public NicProfile orchestrateAddVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { CallContext cctx = CallContext.current(); @@ -2909,9 +3184,48 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac NicTO nicTO = hvGuru.toNicTO(nic); return nicTO; } + + public boolean removeNicFromVm(VirtualMachine vm, Nic nic) + throws ConcurrentOperationException, ResourceUnavailableException { + + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateRemoveNicFromVm(vm, nic); + } else { + Outcome outcome = removeNicFromVmThroughJobQueue(vm, nic); + + try { + outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId()); + + if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) { + Boolean result = (Boolean)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult()); + return result; + } else { + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobException; + else if(jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + else if(jobException instanceof RuntimeException) + throw (RuntimeException)jobException; + } + + throw new RuntimeException("Job failed with un-handled exception"); + } + } + } @Override - public boolean removeNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException { + public boolean orchestrateRemoveNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException { CallContext cctx = CallContext.current(); VMInstanceVO vmVO = _vmDao.findById(vm.getId()); NetworkVO network = _networkDao.findById(nic.getNetworkId()); @@ -2971,6 +3285,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override @DB public boolean removeVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException { + // TODO will serialize on the VM object later to resolve operation conflicts + return orchestrateRemoveVmFromNetwork(vm, network, broadcastUri); + } + + @Override + @DB + public boolean orchestrateRemoveVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException { CallContext cctx = CallContext.current(); VMInstanceVO vmVO = _vmDao.findById(vm.getId()); ReservationContext context = new ReservationContextImpl(null, null, cctx.getCallingUser(), cctx.getCallingAccount()); @@ -3106,10 +3427,40 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw e; } } + + @Override + public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) + throws ResourceUnavailableException, ConcurrentOperationException { + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId); + } else { + Outcome outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId); + + try { + VirtualMachine vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobException; + else if(jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + } + } + } @Override - public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException { - VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + public void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) + throws ResourceUnavailableException, ConcurrentOperationException { + + VMInstanceVO vm = _vmDao.findByUuid(vmUuid); s_logger.info("Migrating " + vm + " to " + dest); vm.getServiceOfferingId(); @@ -3293,7 +3644,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm, ReservationContext context, DeployDestination dest) throws ConcurrentOperationException, - ResourceUnavailableException { + ResourceUnavailableException { boolean result = true; VMInstanceVO router = _vmDao.findById(vm.getId()); @@ -3324,9 +3675,46 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return result; } + + public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, + boolean reconfiguringOnExistingHost) + throws ResourceUnavailableException, ConcurrentOperationException { + AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) { + // avoid re-entrance + return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); + } else { + Outcome outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); + + VirtualMachine vm = null; + try { + vm = outcome.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Operation is interrupted", e); + } catch (java.util.concurrent.ExecutionException e) { + throw new RuntimeException("Execution excetion", e); + } + + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId()); + if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) { + return _entityMgr.findById(VMInstanceVO.class, vm.getId()); + } else { + Throwable jobException = retriveExecutionException(outcome.getJob()); + if(jobException != null) { + if(jobException instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)jobException; + else if(jobException instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)jobException; + } + + throw new RuntimeException("Failed with un-handled exception"); + } + } + } + @Override - public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException, + public VMInstanceVO orchestrateReConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException, ConcurrentOperationException { VMInstanceVO vm = _vmDao.findByUuid(vmUuid); @@ -3388,7 +3776,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public ConfigKey[] getConfigKeys() { return new ConfigKey[] {ClusterDeltaSyncInterval, StartRetry, VmDestroyForcestop, VmOpCancelInterval, VmOpCleanupInterval, VmOpCleanupWait, VmOpLockStateRetry, - VmOpWaitInterval, ExecuteInSequence}; + VmOpWaitInterval, ExecuteInSequence, VmJobCheckInterval, VmJobTimeout, VmJobStateReportInterval}; } public List getStoragePoolAllocators() { @@ -3400,4 +3788,920 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _storagePoolAllocators = storagePoolAllocators; } + + // + // PowerState report handling for out-of-band changes and handling of left-over transitional VM states + // + + @MessageHandler(topic = Topics.VM_POWER_STATE) + private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) { + assert(args != null); + Long vmId = (Long)args; + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vmId); + if(pendingWorkJobs.size() == 0) { + // there is no pending operation job + VMInstanceVO vm = _vmDao.findById(vmId); + if(vm != null) { + switch(vm.getPowerState()) { + case PowerOn : + handlePowerOnReportWithNoPendingJobsOnVM(vm); + break; + + case PowerOff : + handlePowerOffReportWithNoPendingJobsOnVM(vm); + break; + + // PowerUnknown shouldn't be reported, it is a derived + // VM power state from host state (host un-reachable + case PowerUnknown : + default : + assert(false); + break; + } + } else { + s_logger.warn("VM " + vmId + " no longer exists when processing VM state report"); + } + } else { + // TODO, do job wake-up signalling, since currently async job wake-up is not in use + // we will skip it for nows + } + } + + private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) { + // + // 1) handle left-over transitional VM states + // 2) handle out of band VM live migration + // 3) handle out of sync stationary states, marking VM from Stopped to Running with + // alert messages + // + switch(vm.getState()) { + case Starting : + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch(NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + + // we need to alert admin or user about this risky state transition + _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset"); + break; + + case Running : + try { + if(vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue()) + s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId()); + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch(NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + break; + + case Stopping : + case Stopped : + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch(NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset"); + break; + + case Destroyed : + case Expunging : + s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: " + + vm.getId() + ", state: " + vm.getState()); + break; + + case Migrating : + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId()); + } catch(NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + break; + + case Error : + default : + s_logger.info("Receive power on report when VM is in error or unexpected state. vm: " + + vm.getId() + ", state: " + vm.getState()); + break; + } + } + + private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) { + + // 1) handle left-over transitional VM states + // 2) handle out of sync stationary states, schedule force-stop to release resources + // + switch(vm.getState()) { + case Starting : + case Stopping : + case Stopped : + case Migrating : + try { + stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId()); + } catch(NoTransitionException e) { + s_logger.warn("Unexpected VM state transition exception, race-condition?", e); + } + _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition."); + // TODO: we need to forcely release all resource allocation + break; + + case Running : + case Destroyed : + case Expunging : + break; + + case Error : + default : + break; + } + } + + private void scanStalledVMInTransitionStateOnUpHost(long hostId) { + // + // Check VM that is stuck in Starting, Stopping, Migrating states, we won't check + // VMs in expunging state (this need to be handled specially) + // + // checking condition + // 1) no pending VmWork job + // 2) on hostId host and host is UP + // + // When host is UP, soon or later we will get a report from the host about the VM, + // however, if VM is missing from the host report (it may happen in out of band changes + // or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic + // + // Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP + // and a VM stalls for status update, we will consider them to be powered off + // (which is relatively safe to do so) + + long stallThresholdInMs = VmJobStateReportInterval.value() + (VmJobStateReportInterval.value() >> 1); + Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs); + List mostlikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime); + for(Long vmId : mostlikelyStoppedVMs) { + VMInstanceVO vm = _vmDao.findById(vmId); + assert(vm != null); + handlePowerOffReportWithNoPendingJobsOnVM(vm); + } + + List vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime); + for(Long vmId : vmsWithRecentReport) { + VMInstanceVO vm = _vmDao.findById(vmId); + assert(vm != null); + if(vm.getPowerState() == PowerState.PowerOn) + handlePowerOnReportWithNoPendingJobsOnVM(vm); + else + handlePowerOffReportWithNoPendingJobsOnVM(vm); + } + } + + private void scanStalledVMInTransitionStateOnDisconnectedHosts() { + Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - VmOpWaitInterval.value()*1000); + List stuckAndUncontrollableVMs = listStalledVMInTransitionStateOnDisconnectedHosts(cutTime); + for(Long vmId : stuckAndUncontrollableVMs) { + VMInstanceVO vm = _vmDao.findById(vmId); + + // We now only alert administrator about this situation + _alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(), + VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + " state and its host is unreachable for too long"); + } + } + + + // VMs that in transitional state without recent power state report + private List listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) { + String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + + "AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " + + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; + + List l = new ArrayList(); + TransactionLegacy txn = null; + try { + txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); + + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + + pstmt.setLong(1, hostId); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); + pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); + ResultSet rs = pstmt.executeQuery(); + while(rs.next()) { + l.add(rs.getLong(1)); + } + } catch (SQLException e) { + } catch (Throwable e) { + } + + } finally { + if(txn != null) + txn.close(); + } + return l; + } + + // VMs that in transitional state and recently have power state update + private List listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) { + String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " + + "AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " + + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; + + List l = new ArrayList(); + TransactionLegacy txn = null; + try { + txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + + pstmt.setLong(1, hostId); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); + pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal()); + ResultSet rs = pstmt.executeQuery(); + while(rs.next()) { + l.add(rs.getLong(1)); + } + } catch (SQLException e) { + } catch (Throwable e) { + } + return l; + } finally { + if(txn != null) + txn.close(); + } + } + + private List listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) { + String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " + + "AND i.power_state_update_time < ? AND i.host_id = h.id " + + "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " + + "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)"; + + List l = new ArrayList(); + TransactionLegacy txn = null; + try { + txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime)); + pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal()); + ResultSet rs = pstmt.executeQuery(); + while(rs.next()) { + l.add(rs.getLong(1)); + } + } catch (SQLException e) { + } catch (Throwable e) { + } + return l; + } finally { + if(txn != null) + txn.close(); + } + } + + // + // VM operation based on new sync model + // + + public class VmStateSyncOutcome extends OutcomeImpl { + private long _vmId; + + public VmStateSyncOutcome(final AsyncJob job, final PowerState desiredPowerState, final long vmId, final Long srcHostIdForMigration) { + super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() { + @Override + public boolean checkCondition() { + VMInstanceVO instance = _vmDao.findById(vmId); + if (instance.getPowerState() == desiredPowerState && (srcHostIdForMigration != null && instance.getPowerHostId() != srcHostIdForMigration)) + return true; + return false; + } + }, Topics.VM_POWER_STATE, AsyncJob.Topics.JOB_STATE); + _vmId = vmId; + } + + @Override + protected VirtualMachine retrieve() { + return _vmDao.findById(_vmId); + } + } + + public class VmJobSyncOutcome extends OutcomeImpl { + private long _vmId; + + public VmJobSyncOutcome(final AsyncJob job, final long vmId) { + super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() { + @Override + public boolean checkCondition() { + AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId()); + assert(jobVo != null); + if(jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) + return true; + + return false; + } + }, AsyncJob.Topics.JOB_STATE); + _vmId = vmId; + } + + @Override + protected VirtualMachine retrieve() { + return _vmDao.findById(_vmId); + } + } + + public Throwable retriveExecutionException(AsyncJob job) { + assert(job != null); + assert(job.getDispatcher().equals(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)); + + AsyncJobVO jobVo = this._entityMgr.findById(AsyncJobVO.class, job.getId()); + if(jobVo != null && jobVo.getResult() != null) { + Object obj = JobSerializerHelper.fromSerializedString(job.getResult()); + + if(obj != null && obj instanceof Throwable) + return (Throwable)obj; + } + return null; + } + + public Outcome startVmThroughJobQueue(final String vmUuid, + final Map params, + final DeploymentPlan planToDeploy) { + + final CallContext context = CallContext.current(); + final User callingUser = context.getCallingUser(); + final Account callingAccount = context.getCallingAccount(); + + final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + VmWorkJobVO workJob = null; + + _vmDao.lockRow(vm.getId(), true); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance, + vm.getId(), VmWorkStart.class.getName()); + + if (pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkStart.class.getName()); + + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkStart workInfo = new VmWorkStart(callingUser.getId(), callingAccount.getId(), vm.getId()); + workInfo.setPlan(planToDeploy); + workInfo.setParams(params); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + + // Transaction syntax sugar has a cost here + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + VirtualMachine.PowerState.PowerOn, vm.getId(), null); + } + + public Outcome stopVmThroughJobQueue(final String vmUuid, final boolean cleanup) { + final CallContext context = CallContext.current(); + final Account account = context.getCallingAccount(); + final User user = context.getCallingUser(); + + final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkStop.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkStop.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setStep(VmWorkJobVO.Step.Prepare); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), cleanup); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + VirtualMachine.PowerState.PowerOff, vm.getId(), null); + } + + public Outcome rebootVmThroughJobQueue(final String vmUuid, + final Map params) { + + final CallContext context = CallContext.current(); + final Account account = context.getCallingAccount(); + final User user = context.getCallingUser(); + + final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkReboot.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkReboot.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setStep(VmWorkJobVO.Step.Prepare); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), params); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + vm.getId()); + } + + public Outcome migrateVmThroughJobQueue(final String vmUuid, final long srcHostId, final DeployDestination dest) { + final CallContext context = CallContext.current(); + final User user = context.getCallingUser(); + final Account account = context.getCallingAccount(); + + final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrate.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkMigrate.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), srcHostId, dest); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId()); + } + + public Outcome migrateVmWithStorageThroughJobQueue( + final String vmUuid, final long srcHostId, final long destHostId, + final Map volumeToPool) { + + final CallContext context = CallContext.current(); + final User user = context.getCallingUser(); + final Account account = context.getCallingAccount(); + + final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrateWithStorage.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkMigrate.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), + srcHostId, destHostId, volumeToPool); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), + VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId); + } + + // + // TODO build a common pattern to reduce code duplication in following methods + // no time for this at current iteration + // + public Outcome migrateVmForScaleThroughJobQueue( + final String vmUuid, final long srcHostId, final DeployDestination dest, final Long newSvcOfferingId) { + + final CallContext context = CallContext.current(); + final User user = context.getCallingUser(); + final Account account = context.getCallingAccount(); + + final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrateForScale.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkMigrate.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), + srcHostId, dest, newSvcOfferingId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); + } + + public Outcome migrateVmStorageThroughJobQueue( + final String vmUuid, final StoragePool destPool) { + + final CallContext context = CallContext.current(); + final User user = context.getCallingUser(); + final Account account = context.getCallingAccount(); + + final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkStorageMigration.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkStorageMigration.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), + destPool); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); + } + + public Outcome addVmToNetworkThroughJobQueue( + final VirtualMachine vm, final Network network, final NicProfile requested) { + + final CallContext context = CallContext.current(); + final User user = context.getCallingUser(); + final Account account = context.getCallingAccount(); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkAddVmToNetwork.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), + network, requested); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); + } + + public Outcome removeNicFromVmThroughJobQueue( + final VirtualMachine vm, final Nic nic) { + + final CallContext context = CallContext.current(); + final User user = context.getCallingUser(); + final Account account = context.getCallingAccount(); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkRemoveNicFromVm.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkRemoveNicFromVm.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), + nic); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); + } + + public Outcome removeVmFromNetworkThroughJobQueue( + final VirtualMachine vm, final Network network, final URI broadcastUri) { + + final CallContext context = CallContext.current(); + final User user = context.getCallingUser(); + final Account account = context.getCallingAccount(); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkRemoveVmFromNetwork.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkRemoveVmFromNetwork.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), + network, broadcastUri); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); + } + + public Outcome reconfigureVmThroughJobQueue( + final String vmUuid, final ServiceOffering oldServiceOffering, final boolean reconfiguringOnExistingHost) { + + final CallContext context = CallContext.current(); + final User user = context.getCallingUser(); + final Account account = context.getCallingAccount(); + + final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); + + Transaction.execute(new TransactionCallbackNoReturn () { + public void doInTransactionWithoutResult(TransactionStatus status) { + + _vmDao.lockRow(vm.getId(), true); + + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkReconfigure.class.getName()); + + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkReconfigure.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(vm.getType()); + workJob.setVmInstanceId(vm.getId()); + + // save work context info (there are some duplications) + VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), + oldServiceOffering, reconfiguringOnExistingHost); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId()); + } + context.putContextParameter("workJob", workJob); + context.putContextParameter("jobId", new Long(vm.getId())); + } + }); + + final long jobId = (Long)context.getContextParameter("jobId"); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + + return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId()); + } + } diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java index 7a23ddd81e9..dacc8d09067 100644 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java @@ -19,7 +19,6 @@ package com.cloud.vm; import java.util.Map; import com.cloud.agent.api.HostVmStateReportEntry; -import com.cloud.vm.VirtualMachine.PowerState; public interface VirtualMachinePowerStateSync { @@ -28,5 +27,5 @@ public interface VirtualMachinePowerStateSync { void processHostVmStateReport(long hostId, Map report); // to adapt legacy ping report - void processHostVmStatePingReport(long hostId, Map report); + void processHostVmStatePingReport(long hostId, Map report); } diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java index 9c47727ed51..9aa95013b08 100644 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java @@ -27,7 +27,6 @@ import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.PublishScope; import com.cloud.agent.api.HostVmStateReportEntry; -import com.cloud.vm.VirtualMachine.PowerState; import com.cloud.vm.dao.VMInstanceDao; public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStateSync { @@ -56,11 +55,11 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat } @Override - public void processHostVmStatePingReport(long hostId, Map report) { + public void processHostVmStatePingReport(long hostId, Map report) { if(s_logger.isDebugEnabled()) s_logger.debug("Process host VM state report from ping process. host: " + hostId); - Map translatedInfo = convertHostPingInfos(report); + Map translatedInfo = convertToInfos(report); processReport(hostId, translatedInfo); } @@ -80,25 +79,6 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat } } } - - private Map convertHostPingInfos(Map states) { - final HashMap map = new HashMap(); - if (states == null) { - return map; - } - - for (Map.Entry entry : states.entrySet()) { - VMInstanceVO vm = findVM(entry.getKey()); - if(vm != null) { - map.put(vm.getId(), entry.getValue()); - break; - } else { - s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey()); - } - } - - return map; - } private Map convertToInfos(Map states) { final HashMap map = new HashMap(); diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkAddVmToNetwork.java b/engine/orchestration/src/com/cloud/vm/VmWorkAddVmToNetwork.java new file mode 100644 index 00000000000..3590c0d21f3 --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkAddVmToNetwork.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import com.cloud.network.Network; + +public class VmWorkAddVmToNetwork extends VmWork { + private static final long serialVersionUID = 8861516006586736813L; + + Network network; + NicProfile requstedNicProfile; + + public VmWorkAddVmToNetwork(long userId, long accountId, long vmId, + Network network, NicProfile requested) { + super(userId, accountId, vmId); + + this.network = network; + this.requstedNicProfile = requested; + } + + public Network getNetwork() { + return this.network; + } + + public NicProfile getRequestedNicProfile() { + return this.requstedNicProfile; + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java new file mode 100644 index 00000000000..7c36d8c28d5 --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import javax.inject.Inject; + +import org.apache.log4j.Logger; +import org.apache.cloudstack.context.CallContext; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; +import org.apache.cloudstack.jobs.JobInfo; + +import com.cloud.utils.component.AdapterBase; +import com.cloud.vm.dao.VMInstanceDao; + +public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatcher { + private static final Logger s_logger = Logger.getLogger(VmWorkJobDispatcher.class); + + public static final String VM_WORK_QUEUE = "VmWorkJobQueue"; + public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher"; + public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher"; + + @Inject private VirtualMachineManagerImpl _vmMgr; + @Inject private AsyncJobManager _asyncJobMgr; + @Inject private VMInstanceDao _instanceDao; + + @Override + public void runJob(AsyncJob job) { + VmWork work = null; + try { + String cmd = job.getCmd(); + assert(cmd != null); + + if(s_logger.isDebugEnabled()) + s_logger.debug("Run VM work job: " + cmd); + + Class workClz = null; + try { + workClz = Class.forName(job.getCmd()); + } catch(ClassNotFoundException e) { + s_logger.error("VM work class " + cmd + " is not found", e); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, e.getMessage()); + return; + } + + work = VmWorkSerializer.deserialize(workClz, job.getCmdInfo()); + assert(work != null); + if(work == null) { + s_logger.error("Unable to deserialize VM work " + job.getCmd() + ", job info: " + job.getCmdInfo()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to deserialize VM work"); + return; + } + + CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated()); + + VMInstanceVO vm = _instanceDao.findById(work.getVmId()); + if (vm == null) { + s_logger.info("Unable to find vm " + work.getVmId()); + } + assert(vm != null); + if(work instanceof VmWorkStart) { + VmWorkStart workStart = (VmWorkStart)work; + _vmMgr.orchestrateStart(vm.getUuid(), workStart.getParams(), workStart.getPlan()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); + } else if(work instanceof VmWorkStop) { + VmWorkStop workStop = (VmWorkStop)work; + _vmMgr.orchestrateStop(vm.getUuid(), workStop.isCleanup()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); + } else if(work instanceof VmWorkMigrate) { + VmWorkMigrate workMigrate = (VmWorkMigrate)work; + _vmMgr.orchestrateMigrate(vm.getUuid(), workMigrate.getSrcHostId(), workMigrate.getDeployDestination()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); + } else if(work instanceof VmWorkMigrateWithStorage) { + VmWorkMigrateWithStorage workMigrateWithStorage = (VmWorkMigrateWithStorage)work; + _vmMgr.orchestrateMigrateWithStorage(vm.getUuid(), + workMigrateWithStorage.getSrcHostId(), + workMigrateWithStorage.getDestHostId(), + workMigrateWithStorage.getVolumeToPool()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); + } else if(work instanceof VmWorkMigrateForScale) { + VmWorkMigrateForScale workMigrateForScale = (VmWorkMigrateForScale)work; + _vmMgr.orchestrateMigrateForScale(vm.getUuid(), + workMigrateForScale.getSrcHostId(), + workMigrateForScale.getDeployDestination(), + workMigrateForScale.getNewServiceOfferringId()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); + } else if(work instanceof VmWorkReboot) { + VmWorkReboot workReboot = (VmWorkReboot)work; + _vmMgr.orchestrateReboot(vm.getUuid(), workReboot.getParams()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); + } else if(work instanceof VmWorkAddVmToNetwork) { + VmWorkAddVmToNetwork workAddVmToNetwork = (VmWorkAddVmToNetwork)work; + NicProfile nic = _vmMgr.orchestrateAddVmToNetwork(vm, workAddVmToNetwork.getNetwork(), + workAddVmToNetwork.getRequestedNicProfile()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, + JobSerializerHelper.toObjectSerializedString(nic)); + } else if(work instanceof VmWorkRemoveNicFromVm) { + VmWorkRemoveNicFromVm workRemoveNicFromVm = (VmWorkRemoveNicFromVm)work; + boolean result = _vmMgr.orchestrateRemoveNicFromVm(vm, workRemoveNicFromVm.getNic()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, + JobSerializerHelper.toObjectSerializedString(new Boolean(result))); + } else if(work instanceof VmWorkRemoveVmFromNetwork) { + VmWorkRemoveVmFromNetwork workRemoveVmFromNetwork = (VmWorkRemoveVmFromNetwork)work; + boolean result = _vmMgr.orchestrateRemoveVmFromNetwork(vm, + workRemoveVmFromNetwork.getNetwork(), workRemoveVmFromNetwork.getBroadcastUri()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, + JobSerializerHelper.toObjectSerializedString(new Boolean(result))); + } else if(work instanceof VmWorkReconfigure) { + VmWorkReconfigure workReconfigure = (VmWorkReconfigure)work; + _vmMgr.reConfigureVm(vm.getUuid(), workReconfigure.getNewServiceOffering(), + workReconfigure.isSameHost()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); + } else if(work instanceof VmWorkStorageMigration) { + VmWorkStorageMigration workStorageMigration = (VmWorkStorageMigration)work; + _vmMgr.orchestrateStorageMigration(vm.getUuid(), workStorageMigration.getDestStoragePool()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); + } else { + assert(false); + s_logger.error("Unhandled VM work command: " + job.getCmd()); + + RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd()); + String exceptionJson = JobSerializerHelper.toSerializedString(e); + s_logger.error("Serialize exception object into json: " + exceptionJson); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, exceptionJson); + } + } catch(Throwable e) { + s_logger.error("Unable to complete " + job, e); + + String exceptionJson = JobSerializerHelper.toSerializedString(e); + s_logger.info("Serialize exception object into json: " + exceptionJson); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, exceptionJson); + } finally { + CallContext.unregister(); + } + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkMigrate.java b/engine/orchestration/src/com/cloud/vm/VmWorkMigrate.java new file mode 100644 index 00000000000..c313876b433 --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkMigrate.java @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import java.util.HashMap; +import java.util.Map; + +import com.cloud.dc.DataCenter; +import com.cloud.dc.Pod; +import com.cloud.deploy.DeployDestination; +import com.cloud.host.Host; +import com.cloud.org.Cluster; +import com.cloud.storage.StoragePool; +import com.cloud.storage.Volume; +import com.cloud.utils.db.EntityManager; + +public class VmWorkMigrate extends VmWork { + private static final long serialVersionUID = 1689203333114836522L; + + Long zoneId; + Long podId; + Long clusterId; + Long hostId; + private Map storage; + long srcHostId; + + public VmWorkMigrate(long userId, long accountId, long vmId, long srcHostId, DeployDestination dst) { + super(userId, accountId, vmId); + this.srcHostId = srcHostId; + zoneId = dst.getDataCenter() != null ? dst.getDataCenter().getId() : null; + podId = dst.getPod() != null ? dst.getPod().getId() : null; + clusterId = dst.getCluster() != null ? dst.getCluster().getId() : null; + hostId = dst.getHost() != null ? dst.getHost().getId() : null; + if (dst.getStorageForDisks() != null) { + storage = new HashMap(dst.getStorageForDisks().size()); + for (Map.Entry entry : dst.getStorageForDisks().entrySet()) { + storage.put(entry.getKey().getUuid(), entry.getValue().getUuid()); + } + } else { + storage = null; + } + } + + public DeployDestination getDeployDestination() { + DataCenter zone = zoneId != null ? s_entityMgr.findById(DataCenter.class, zoneId) : null; + Pod pod = podId != null ? s_entityMgr.findById(Pod.class, podId) : null; + Cluster cluster = clusterId != null ? s_entityMgr.findById(Cluster.class, clusterId) : null; + Host host = hostId != null ? s_entityMgr.findById(Host.class, hostId) : null; + + Map vols = null; + + if (storage != null) { + vols = new HashMap(storage.size()); + for (Map.Entry entry : storage.entrySet()) { + vols.put(s_entityMgr.findByUuid(Volume.class, entry.getKey()), s_entityMgr.findByUuid(StoragePool.class, entry.getValue())); + } + } + + DeployDestination dest = new DeployDestination(zone, pod, cluster, host, vols); + return dest; + } + + public long getSrcHostId() { + return srcHostId; + } + + static private EntityManager s_entityMgr; + + static public void init(EntityManager entityMgr) { + s_entityMgr = entityMgr; + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkMigrateForScale.java b/engine/orchestration/src/com/cloud/vm/VmWorkMigrateForScale.java new file mode 100644 index 00000000000..8e71aa84050 --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkMigrateForScale.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import com.cloud.deploy.DeployDestination; + +public class VmWorkMigrateForScale extends VmWork { + private static final long serialVersionUID = 6854870395568389613L; + + long srcHostId; + DeployDestination deployDestination; + Long newSvcOfferingId; + + public VmWorkMigrateForScale(long userId, long accountId, long vmId, long srcHostId, + DeployDestination dest, Long newSvcOfferingId) { + + super(userId, accountId, vmId); + this.srcHostId = srcHostId; + this.deployDestination = dest; + this.newSvcOfferingId = newSvcOfferingId; + } + + public long getSrcHostId() { + return srcHostId; + } + + public DeployDestination getDeployDestination() { + return this.deployDestination; + } + + public Long getNewServiceOfferringId() { + return this.newSvcOfferingId; + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkMigrateWithStorage.java b/engine/orchestration/src/com/cloud/vm/VmWorkMigrateWithStorage.java new file mode 100644 index 00000000000..ae91231a35f --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkMigrateWithStorage.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import java.util.Map; + +import com.cloud.storage.StoragePool; +import com.cloud.storage.Volume; + +public class VmWorkMigrateWithStorage extends VmWork { + private static final long serialVersionUID = -5626053872453569165L; + + long srcHostId; + long destHostId; + Map volumeToPool; + + public VmWorkMigrateWithStorage(long userId, long accountId, long vmId, long srcHostId, + long destHostId, Map volumeToPool) { + + super(userId, accountId, vmId); + + this.srcHostId = srcHostId; + this.destHostId = destHostId; + this.volumeToPool = volumeToPool; + } + + public long getSrcHostId() { + return this.srcHostId; + } + + public long getDestHostId() { + return this.destHostId; + } + + public Map getVolumeToPool() { + return this.volumeToPool; + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkReboot.java b/engine/orchestration/src/com/cloud/vm/VmWorkReboot.java new file mode 100644 index 00000000000..8f50702358a --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkReboot.java @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; + +public class VmWorkReboot extends VmWork { + private static final long serialVersionUID = 195907627459759933L; + + // use serialization friendly map + private Map rawParams; + + public VmWorkReboot(long userId, long accountId, long vmId, Map params) { + super(userId, accountId, vmId); + + setParams(params); + } + + public Map getRawParams() { + return rawParams; + } + + public void setRawParams(Map params) { + rawParams = params; + } + + public Map getParams() { + Map map = new HashMap(); + + if(rawParams != null) { + for(Map.Entry entry : rawParams.entrySet()) { + VirtualMachineProfile.Param key = new VirtualMachineProfile.Param(entry.getKey()); + Object val = JobSerializerHelper.fromObjectSerializedString(entry.getValue()); + map.put(key, val); + } + } + + return map; + } + + public void setParams(Map params) { + if(params != null) { + rawParams = new HashMap(); + for(Map.Entry entry : params.entrySet()) { + rawParams.put(entry.getKey().getName(), JobSerializerHelper.toObjectSerializedString( + entry.getValue() instanceof Serializable ? (Serializable)entry.getValue() : entry.getValue().toString())); + } + } + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkReconfigure.java b/engine/orchestration/src/com/cloud/vm/VmWorkReconfigure.java new file mode 100644 index 00000000000..48a9df35faa --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkReconfigure.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import com.cloud.offering.ServiceOffering; + +public class VmWorkReconfigure extends VmWork { + private static final long serialVersionUID = -4517030323758086615L; + + ServiceOffering newServiceOffering; + boolean sameHost; + + public VmWorkReconfigure(long userId, long accountId, long vmId, + ServiceOffering newServiceOffering, boolean sameHost) { + + super(userId, accountId, vmId); + + this.newServiceOffering = newServiceOffering; + this.sameHost = sameHost; + } + + public ServiceOffering getNewServiceOffering() { + return this.newServiceOffering; + } + + public boolean isSameHost() { + return this.sameHost; + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkRemoveNicFromVm.java b/engine/orchestration/src/com/cloud/vm/VmWorkRemoveNicFromVm.java new file mode 100644 index 00000000000..b756ac35761 --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkRemoveNicFromVm.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +public class VmWorkRemoveNicFromVm extends VmWork { + private static final long serialVersionUID = -4265657031064437923L; + + Nic nic; + + public VmWorkRemoveNicFromVm(long userId, long accountId, long vmId, Nic nic) { + super(userId, accountId, vmId); + + this.nic = nic; + } + + public Nic getNic() { + return this.nic; + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkRemoveVmFromNetwork.java b/engine/orchestration/src/com/cloud/vm/VmWorkRemoveVmFromNetwork.java new file mode 100644 index 00000000000..d4e0ae4ef7b --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkRemoveVmFromNetwork.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import java.net.URI; + +import com.cloud.network.Network; + +public class VmWorkRemoveVmFromNetwork extends VmWork { + private static final long serialVersionUID = -5070392905642149925L; + + Network network; + URI broadcastUri; + + public VmWorkRemoveVmFromNetwork(long userId, long accountId, long vmId, Network network, URI broadcastUri) { + super(userId, accountId, vmId); + + this.network = network; + this.broadcastUri = broadcastUri; + } + + public Network getNetwork() { + return this.network; + } + + public URI getBroadcastUri() { + return this.broadcastUri; + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkSerializer.java b/engine/orchestration/src/com/cloud/vm/VmWorkSerializer.java new file mode 100644 index 00000000000..9a1aaaced67 --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkSerializer.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; + +public class VmWorkSerializer { + static class StringMapTypeAdapter implements JsonDeserializer { + + @Override + public Map deserialize(JsonElement src, Type srcType, JsonDeserializationContext context) throws JsonParseException { + + Map obj = new HashMap(); + JsonObject json = src.getAsJsonObject(); + + for (Entry entry : json.entrySet()) { + obj.put(entry.getKey(), entry.getValue().getAsString()); + } + + return obj; + } + } + + protected static Gson s_gson; + static { + GsonBuilder gBuilder = new GsonBuilder(); + gBuilder.setVersion(1.3); + gBuilder.registerTypeAdapter(Map.class, new StringMapTypeAdapter()); + s_gson = gBuilder.create(); + } + + public static String serialize(VmWork work) { + // TODO: there are way many generics, too tedious to get serialization work under GSON + // use java binary serialization instead + // + return JobSerializerHelper.toObjectSerializedString(work); + // return s_gson.toJson(work); + } + + public static T deserialize(Class clazz, String workInJsonText) { + // TODO: there are way many generics, too tedious to get serialization work under GSON + // use java binary serialization instead + // + return (T)JobSerializerHelper.fromObjectSerializedString(workInJsonText); + // return (T)s_gson.fromJson(workInJsonText, clazz); + } +} + diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkStart.java b/engine/orchestration/src/com/cloud/vm/VmWorkStart.java new file mode 100644 index 00000000000..7a7447f74a0 --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkStart.java @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloud.vm; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.log4j.Logger; + +import org.apache.cloudstack.context.CallContext; +import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; + +import com.cloud.deploy.DataCenterDeployment; +import com.cloud.deploy.DeploymentPlan; +import com.cloud.deploy.DeploymentPlanner.ExcludeList; +import com.cloud.utils.Journal; + +public class VmWorkStart extends VmWork { + private static final long serialVersionUID = 9038937399817468894L; + + private static final Logger s_logger = Logger.getLogger(VmWorkStart.class); + + long dcId; + Long podId; + Long clusterId; + Long hostId; + Long poolId; + ExcludeList avoids; + Long physicalNetworkId; + + String reservationId; + String journalName; + + // use serialization friendly map + private Map rawParams; + + public VmWorkStart(long userId, long accountId, long vmId) { + super(userId, accountId, vmId); + } + + public DeploymentPlan getPlan() { + + if(podId != null || clusterId != null || hostId != null || poolId != null || physicalNetworkId != null) { + // this is ugly, to work with legacy code, we need to re-construct the DeploymentPlan hard-codely + // this has to be refactored together with migrating legacy code into the new way + ReservationContext context = null; + if(reservationId != null) { + Journal journal = new Journal.LogJournal("VmWorkStart", s_logger); + context = new ReservationContextImpl(reservationId, journal, + CallContext.current().getCallingUser(), + CallContext.current().getCallingAccount()); + } + + DeploymentPlan plan = new DataCenterDeployment( + dcId, podId, clusterId, hostId, poolId, physicalNetworkId, + context); + return plan; + } + + return null; + } + + public void setPlan(DeploymentPlan plan) { + if(plan != null) { + dcId = plan.getDataCenterId(); + podId = plan.getPodId(); + clusterId = plan.getClusterId(); + hostId = plan.getHostId(); + poolId = plan.getPoolId(); + physicalNetworkId = plan.getPhysicalNetworkId(); + avoids = plan.getAvoids(); + + if(plan.getReservationContext() != null) + reservationId = plan.getReservationContext().getReservationId(); + } + } + + public Map getRawParams() { + return rawParams; + } + + public void setRawParams(Map params) { + rawParams = params; + } + + public Map getParams() { + Map map = new HashMap(); + + if(rawParams != null) { + for(Map.Entry entry : rawParams.entrySet()) { + VirtualMachineProfile.Param key = new VirtualMachineProfile.Param(entry.getKey()); + Object val = JobSerializerHelper.fromObjectSerializedString(entry.getValue()); + map.put(key, val); + } + } + + return map; + } + + public void setParams(Map params) { + if(params != null) { + rawParams = new HashMap(); + for(Map.Entry entry : params.entrySet()) { + rawParams.put(entry.getKey().getName(), JobSerializerHelper.toObjectSerializedString( + entry.getValue() instanceof Serializable ? (Serializable)entry.getValue() : entry.getValue().toString())); + } + } + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkStop.java b/engine/orchestration/src/com/cloud/vm/VmWorkStop.java new file mode 100644 index 00000000000..0a27057810b --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkStop.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +public class VmWorkStop extends VmWork { + private static final long serialVersionUID = 202908740486785251L; + + private final boolean cleanup; + + public VmWorkStop(long userId, long accountId, long vmId, boolean cleanup) { + super(userId, accountId, vmId); + this.cleanup = cleanup; + } + + public boolean isCleanup() { + return cleanup; + } +} diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkStorageMigration.java b/engine/orchestration/src/com/cloud/vm/VmWorkStorageMigration.java new file mode 100644 index 00000000000..ba8330c9d9d --- /dev/null +++ b/engine/orchestration/src/com/cloud/vm/VmWorkStorageMigration.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.vm; + +import com.cloud.storage.StoragePool; + +public class VmWorkStorageMigration extends VmWork { + private static final long serialVersionUID = -8677979691741157474L; + + StoragePool destPool; + + public VmWorkStorageMigration(long userId, long accountId, long vmId, StoragePool destPool) { + super(userId, accountId, vmId); + + this.destPool = destPool; + } + + public StoragePool getDestStoragePool() { + return this.destPool; + } +} diff --git a/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java b/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java index 246bfe6bcd4..d187199a621 100644 --- a/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java +++ b/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java @@ -100,6 +100,7 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase - - + diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java index 595800d2524..0263d3d630c 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java @@ -63,6 +63,14 @@ public class AsyncJobExecutionContext { public void setJob(AsyncJob job) { _job = job; } + + public boolean isJobDispatchedBy(String jobDispatcherName) { + assert(jobDispatcherName != null); + if(_job != null && _job.getDispatcher() != null && _job.getDispatcher().equals(jobDispatcherName)) + return true; + + return false; + } public void completeAsyncJob(JobInfo.Status jobStatus, int resultCode, String resultObject) { assert(_job != null); @@ -159,7 +167,7 @@ public class AsyncJobExecutionContext { setCurrentExecutionContext(null); return context; } - + // This is intended to be package level access for AsyncJobManagerImpl only. public static void setCurrentExecutionContext(AsyncJobExecutionContext currentContext) { s_currentExectionContext.set(currentContext);