From 41fdbfc65e6a3ac3f47f5237c7aa4f286fc396f3 Mon Sep 17 00:00:00 2001 From: edison Date: Mon, 29 Nov 2010 19:10:14 -0800 Subject: [PATCH] new allocator --- api/src/com/cloud/capacity/Capacity.java | 2 + core/src/com/cloud/capacity/CapacityVO.java | 12 +- core/src/com/cloud/vm/VMInstanceVO.java | 3 +- .../com/cloud/capacity/dao/CapacityDao.java | 3 +- .../cloud/capacity/dao/CapacityDaoImpl.java | 22 ++- .../src/com/cloud/deploy/FirstFitPlanner.java | 142 +++++++++++++++ .../cloud/ha/HighAvailabilityManagerImpl.java | 19 +- server/src/com/cloud/vm/MauriceMoss.java | 72 +++++++- server/src/com/cloud/vm/VMStateListener.java | 121 ++++++++++++ server/src/com/cloud/vm/VmManager.java | 3 + .../src/com/cloud/vm/dao/VMInstanceDao.java | 3 +- .../com/cloud/vm/dao/VMInstanceDaoImpl.java | 34 ++++ utils/src/com/cloud/utils/fsm/StateDao.java | 5 + .../com/cloud/utils/fsm/StateListener.java | 5 + .../com/cloud/utils/fsm/StateMachine2.java | 172 ++++++++++++++++++ .../src/com/cloud/utils/fsm/StateObject.java | 8 + 16 files changed, 605 insertions(+), 21 deletions(-) create mode 100644 server/src/com/cloud/deploy/FirstFitPlanner.java create mode 100644 server/src/com/cloud/vm/VMStateListener.java create mode 100644 utils/src/com/cloud/utils/fsm/StateDao.java create mode 100644 utils/src/com/cloud/utils/fsm/StateListener.java create mode 100644 utils/src/com/cloud/utils/fsm/StateMachine2.java create mode 100644 utils/src/com/cloud/utils/fsm/StateObject.java diff --git a/api/src/com/cloud/capacity/Capacity.java b/api/src/com/cloud/capacity/Capacity.java index a9df7342ab1..e821a71a5fe 100644 --- a/api/src/com/cloud/capacity/Capacity.java +++ b/api/src/com/cloud/capacity/Capacity.java @@ -39,5 +39,7 @@ public interface Capacity { public long getTotalCapacity(); public short getCapacityType(); + long getReservedCapacity(); + } diff --git a/core/src/com/cloud/capacity/CapacityVO.java b/core/src/com/cloud/capacity/CapacityVO.java index eabf7a9bb6a..34d8ef06b52 100644 --- a/core/src/com/cloud/capacity/CapacityVO.java +++ b/core/src/com/cloud/capacity/CapacityVO.java @@ -44,7 +44,10 @@ public class CapacityVO implements Capacity { @Column(name="used_capacity") private long usedCapacity; - + + @Column(name="reserved_capacity") + private long reservedCapacity; + @Column(name="total_capacity") private long totalCapacity; @@ -96,6 +99,13 @@ public class CapacityVO implements Capacity { } public void setUsedCapacity(long usedCapacity) { this.usedCapacity = usedCapacity; + } + @Override + public long getReservedCapacity() { + return reservedCapacity; + } + public void setReservedCapacity(long reservedCapacity) { + this.usedCapacity = reservedCapacity; } @Override public long getTotalCapacity() { diff --git a/core/src/com/cloud/vm/VMInstanceVO.java b/core/src/com/cloud/vm/VMInstanceVO.java index 8bfa0906c36..a610ff3076e 100644 --- a/core/src/com/cloud/vm/VMInstanceVO.java +++ b/core/src/com/cloud/vm/VMInstanceVO.java @@ -38,12 +38,13 @@ import javax.persistence.TemporalType; import com.cloud.utils.db.GenericDao; import com.cloud.utils.db.StateMachine; import com.cloud.utils.fsm.FiniteStateObject; +import com.cloud.utils.fsm.StateObject; @Entity @Table(name="vm_instance") @Inheritance(strategy=InheritanceType.JOINED) @DiscriminatorColumn(name="type", discriminatorType=DiscriminatorType.STRING, length=32) -public class VMInstanceVO implements VirtualMachine, FiniteStateObject { +public class VMInstanceVO implements VirtualMachine, FiniteStateObject, StateObject { @Id @TableGenerator(name="vm_instance_sq", table="sequence", pkColumnName="name", valueColumnName="value", pkColumnValue="vm_instance_seq", allocationSize=1) @Column(name="id", updatable=false, nullable = false) diff --git a/server/src/com/cloud/capacity/dao/CapacityDao.java b/server/src/com/cloud/capacity/dao/CapacityDao.java index 726544f4384..6c1ae593577 100644 --- a/server/src/com/cloud/capacity/dao/CapacityDao.java +++ b/server/src/com/cloud/capacity/dao/CapacityDao.java @@ -23,5 +23,6 @@ import com.cloud.utils.db.GenericDao; public interface CapacityDao extends GenericDao { void clearNonStorageCapacities(); - void clearStorageCapacities(); + void clearStorageCapacities(); + CapacityVO findByHostIdType(Long hostId, short capacityType); } diff --git a/server/src/com/cloud/capacity/dao/CapacityDaoImpl.java b/server/src/com/cloud/capacity/dao/CapacityDaoImpl.java index e6e8d4dba25..99a5b0e1dce 100644 --- a/server/src/com/cloud/capacity/dao/CapacityDaoImpl.java +++ b/server/src/com/cloud/capacity/dao/CapacityDaoImpl.java @@ -26,6 +26,8 @@ import org.apache.log4j.Logger; import com.cloud.capacity.CapacityVO; import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.SearchBuilder; +import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.Transaction; @Local(value = { CapacityDao.class }) @@ -35,8 +37,16 @@ public class CapacityDaoImpl extends GenericDaoBase implements private static final String ADD_ALLOCATED_SQL = "UPDATE `cloud`.`op_host_capacity` SET used_capacity = used_capacity + ? WHERE host_id = ? AND capacity_type = ?"; private static final String SUBTRACT_ALLOCATED_SQL = "UPDATE `cloud`.`op_host_capacity` SET used_capacity = used_capacity - ? WHERE host_id = ? AND capacity_type = ?"; private static final String CLEAR_STORAGE_CAPACITIES = "DELETE FROM `cloud`.`op_host_capacity` WHERE capacity_type=2 OR capacity_type=3 OR capacity_type=6"; //clear storage and secondary_storage capacities - private static final String CLEAR_NON_STORAGE_CAPACITIES = "DELETE FROM `cloud`.`op_host_capacity` WHERE capacity_type<>2 AND capacity_type<>3 AND capacity_type<>6"; //clear non-storage and non-secondary_storage capacities - + private static final String CLEAR_NON_STORAGE_CAPACITIES = "DELETE FROM `cloud`.`op_host_capacity` WHERE capacity_type<>2 AND capacity_type<>3 AND capacity_type<>6"; //clear non-storage and non-secondary_storage capacities + private SearchBuilder _hostIdTypeSearch; + + public void CapacityDaoImple() { + _hostIdTypeSearch = createSearchBuilder(); + _hostIdTypeSearch.and("hostId", _hostIdTypeSearch.entity().getHostOrPoolId(), SearchCriteria.Op.EQ); + _hostIdTypeSearch.and("type", _hostIdTypeSearch.entity().getCapacityType(), SearchCriteria.Op.EQ); + _hostIdTypeSearch.done(); + } + public void updateAllocated(Long hostId, long allocatedAmount, short capacityType, boolean add) { Transaction txn = Transaction.currentTxn(); PreparedStatement pstmt = null; @@ -91,5 +101,13 @@ public class CapacityDaoImpl extends GenericDaoBase implements txn.rollback(); s_logger.warn("Exception clearing storage capacities", e); } + } + + @Override + public CapacityVO findByHostIdType(Long hostId, short capacityType) { + SearchCriteria sc = _hostIdTypeSearch.create(); + sc.setParameters("hostId", hostId); + sc.setParameters("type", capacityType); + return findOneBy(sc); } } diff --git a/server/src/com/cloud/deploy/FirstFitPlanner.java b/server/src/com/cloud/deploy/FirstFitPlanner.java new file mode 100644 index 00000000000..e5bd1621290 --- /dev/null +++ b/server/src/com/cloud/deploy/FirstFitPlanner.java @@ -0,0 +1,142 @@ +package com.cloud.deploy; + +import java.util.Collections; +import java.util.List; + +import com.cloud.capacity.CapacityVO; +import com.cloud.capacity.dao.CapacityDao; +import com.cloud.dc.ClusterVO; +import com.cloud.dc.DataCenter; +import com.cloud.dc.HostPodVO; +import com.cloud.dc.Pod; +import com.cloud.dc.dao.ClusterDao; +import com.cloud.dc.dao.DataCenterDao; +import com.cloud.dc.dao.HostPodDao; +import com.cloud.exception.InsufficientServerCapacityException; +import com.cloud.host.Host; +import com.cloud.host.HostVO; +import com.cloud.host.Status; +import com.cloud.host.dao.HostDao; +import com.cloud.offering.ServiceOffering; +import com.cloud.org.Cluster; +import com.cloud.utils.component.Inject; +import com.cloud.utils.db.Transaction; +import com.cloud.vm.VirtualMachine; +import com.cloud.vm.VirtualMachineProfile; + +public class FirstFitPlanner extends PlannerBase implements DeploymentPlanner { + @Inject private HostDao _hostDao; + @Inject private CapacityDao _capacityDao; + @Inject private DataCenterDao _dcDao; + @Inject private HostPodDao _podDao; + @Inject private ClusterDao _clusterDao; + + @Override + public DeployDestination plan(VirtualMachineProfile vmProfile, + DeploymentPlan plan, ExcludeList avoid) + throws InsufficientServerCapacityException { + VirtualMachine vm = vmProfile.getVirtualMachine(); + ServiceOffering offering = vmProfile.getServiceOffering(); + DataCenter dc = _dcDao.findById(vm.getDataCenterId()); + int cpu_requested = offering.getCpu() * offering.getSpeed(); + int ram_requested = offering.getRamSize(); + + if (vm.getLastHostId() != null) { + HostVO host = _hostDao.findById(vm.getLastHostId()); + + if (host.getStatus() == Status.Up) { + boolean canDepployToLastHost = deployToHost(vm.getLastHostId(), cpu_requested, ram_requested, true); + if (canDepployToLastHost) { + Pod pod = _podDao.findById(vm.getPodId()); + Cluster cluster = _clusterDao.findById(host.getClusterId()); + return new DeployDestination(dc, pod, cluster, host); + } + } + } + + /*Go through all the pods/clusters under zone*/ + List pods = _podDao.listByDataCenterId(plan.getDataCenterId()); + Collections.shuffle(pods); + + for (HostPodVO hostPod : pods) { + List clusters = _clusterDao.listByPodId(hostPod.getId()); + Collections.shuffle(clusters); + + for (ClusterVO clusterVO : clusters) { + List hosts = _hostDao.listByCluster(clusterVO.getId()); + Collections.shuffle(hosts); + + for (HostVO hostVO : hosts) { + if (hostVO.getStatus() != Status.Up) { + continue; + } + + boolean canDeployToHost = deployToHost(hostVO.getId(), cpu_requested, ram_requested, false); + if (canDeployToHost) { + Pod pod = _podDao.findById(hostPod.getId()); + Cluster cluster = _clusterDao.findById(clusterVO.getId()); + Host host = _hostDao.findById(hostVO.getId()); + return new DeployDestination(dc, pod, cluster, host); + } + } + } + } + + return null; + } + + private boolean deployToHost(Long hostId, Integer cpu, long ram, boolean fromLastHost) { + + CapacityVO capacityCpu = _capacityDao.findByHostIdType(hostId, CapacityVO.CAPACITY_TYPE_CPU); + CapacityVO capacityMem = _capacityDao.findByHostIdType(hostId, CapacityVO.CAPACITY_TYPE_MEMORY); + + Transaction txn = Transaction.currentTxn(); + txn.start(); + + try { + capacityCpu = _capacityDao.lockRow(capacityCpu.getId(), true); + capacityMem = _capacityDao.lockRow(capacityMem.getId(), true); + + long usedCpu = capacityCpu.getUsedCapacity(); + long usedMem = capacityMem.getUsedCapacity(); + long reservedCpu = capacityCpu.getReservedCapacity(); + long reservedMem = capacityMem.getReservedCapacity(); + long totalCpu = capacityCpu.getTotalCapacity(); + long totalMem = capacityMem.getTotalCapacity(); + + boolean success = false; + if (fromLastHost) { + /*alloc from reserved*/ + if (reservedCpu >= cpu && reservedMem >= ram) { + capacityCpu.setReservedCapacity(reservedCpu - cpu); + capacityMem.setReservedCapacity(reservedMem - ram); + success = true; + } + } else { + /*alloc from free resource*/ + if ((reservedCpu + usedCpu + cpu <= totalCpu) && (reservedMem + usedMem + ram <= totalMem)) { + capacityCpu.setUsedCapacity(usedCpu + cpu); + capacityMem.setUsedCapacity(totalMem + ram); + success = true; + } + } + + if (success) { + _capacityDao.update(capacityCpu.getId(), capacityCpu); + _capacityDao.update(capacityMem.getId(), capacityMem); + } + + return success; + } finally { + txn.commit(); + } + + } + @Override + public boolean check(VirtualMachineProfile vm, DeploymentPlan plan, + DeployDestination dest, ExcludeList exclude) { + // TODO Auto-generated method stub + return false; + } + +} diff --git a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java index 0dbe82c07e6..2860ca1b517 100644 --- a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java +++ b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java @@ -71,6 +71,7 @@ import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; import com.cloud.vm.VirtualMachine.Event; import com.cloud.vm.VirtualMachineManager; +import com.cloud.vm.VmManager; import com.cloud.vm.dao.VMInstanceDao; /** @@ -130,6 +131,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { @Inject StorageManager _storageMgr; @Inject GuestOSDao _guestOSDao; @Inject GuestOSCategoryDao _guestOSCategoryDao; + @Inject VmManager _itMgr; String _instance; ScheduledExecutorService _executor; @@ -358,7 +360,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { if (vm.getHostId() == null || vm.getHostId() != work.getHostId()) { s_logger.info("VM " + vm.toString() + " is now no longer on host " + work.getHostId()); if (vm.getState() == State.Starting && vm.getUpdated() == work.getUpdateTime()) { - _instanceDao.updateIf(vm, Event.AgentReportStopped, null); + vm.setHostId(null); + _itMgr.stateTransitTo(vm, Event.AgentReportStopped); } return null; } @@ -518,7 +521,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { s_logger.debug("Both states are " + agentState.toString() + " for " + serverName); } assert (agentState == State.Stopped || agentState == State.Running) : "If the states we send up is changed, this must be changed."; - _instanceDao.updateIf(vm, agentState == State.Stopped ? VirtualMachine.Event.AgentReportStopped : VirtualMachine.Event.AgentReportRunning, vm.getHostId()); + _itMgr.stateTransitTo(vm, agentState == State.Stopped ? VirtualMachine.Event.AgentReportStopped : VirtualMachine.Event.AgentReportRunning); if (agentState == State.Stopped) { s_logger.debug("State matches but the agent said stopped so let's send a cleanup anyways."); return info.mgr.cleanup(vm, agentName); @@ -549,7 +552,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { s_logger.debug("Ignoring VM in starting mode: " + vm.getHostName()); } else { s_logger.debug("Sending cleanup to a stopped vm: " + agentName); - _instanceDao.updateIf(vm, VirtualMachine.Event.AgentReportStopped, null); + vm.setHostId(null); + _itMgr.stateTransitTo(vm, VirtualMachine.Event.AgentReportStopped); command = info.mgr.cleanup(vm, agentName); } } else if (agentState == State.Running) { @@ -573,7 +577,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { vm = info.mgr.get(vm.getId()); command = info.mgr.cleanup(vm, agentName); } else { - _instanceDao.updateIf(vm, VirtualMachine.Event.AgentReportRunning, vm.getHostId()); + _itMgr.stateTransitTo(vm, VirtualMachine.Event.AgentReportRunning); } } /*else if (agentState == State.Unknown) { if (serverState == State.Running) { @@ -745,7 +749,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { if (work.getStep() == Step.Migrating) { vm = mgr.get(vmId); // let's see if anything has changed. boolean migrated = false; - if (vm == null || vm.getRemoved() != null || vm.getHostId() == null || !_instanceDao.updateIf(vm, Event.MigrationRequested, vm.getHostId())) { + if (vm == null || vm.getRemoved() != null || vm.getHostId() == null || !_itMgr.stateTransitTo(vm, Event.MigrationRequested)) { s_logger.info("Migration cancelled because state has changed: " + vm.toString()); } else { try { @@ -775,7 +779,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { HostPodVO podVO = _podDao.findById(vm.getPodId()); _alertMgr.sendAlert(alertType, fromHost.getDataCenterId(), fromHost.getPodId(), "Unable to migrate vm " + vm.getHostName() + " from host " + fromHost.getName() + " in zone " + dcVO.getName() + " and pod " + podVO.getName(), "Migrate Command failed. Please check logs."); - _instanceDao.updateIf(vm, Event.OperationFailed, vm.getHostId()); + _itMgr.stateTransitTo(vm, Event.OperationFailed); _agentMgr.maintenanceFailed(vm.getHostId()); Command cleanup = mgr.cleanup(vm, null); @@ -805,7 +809,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { } catch (final OperationTimedoutException e) { s_logger.warn("Operation timed outfor " + vm.toString()); } - _instanceDao.updateIf(vm, Event.OperationFailed, toHost.getId()); + vm.setHostId(toHost.getId()); + _itMgr.stateTransitTo(vm, Event.OperationFailed); return (System.currentTimeMillis() >> 10) + _migrateRetryInterval; } diff --git a/server/src/com/cloud/vm/MauriceMoss.java b/server/src/com/cloud/vm/MauriceMoss.java index 716db3cf07b..31cdd2f351c 100644 --- a/server/src/com/cloud/vm/MauriceMoss.java +++ b/server/src/com/cloud/vm/MauriceMoss.java @@ -36,6 +36,7 @@ import com.cloud.agent.api.StopAnswer; import com.cloud.agent.api.StopCommand; import com.cloud.agent.api.to.VirtualMachineTO; import com.cloud.agent.manager.Commands; +import com.cloud.capacity.dao.CapacityDao; import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManagerListener; import com.cloud.cluster.ManagementServerHostVO; @@ -82,6 +83,7 @@ import com.cloud.utils.component.Inject; import com.cloud.utils.db.DB; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; +import com.cloud.utils.fsm.StateMachine2; import com.cloud.vm.ItWorkVO.Type; import com.cloud.vm.VirtualMachine.Event; import com.cloud.vm.dao.VMInstanceDao; @@ -102,12 +104,14 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { @Inject private DomainDao _domainDao; @Inject private ClusterManager _clusterMgr; @Inject private ItWorkDao _workDao; + @Inject private CapacityDao _capacityDao; @Inject(adapter=DeploymentPlanner.class) private Adapters _planners; Map> _vmGurus = new HashMap>(); Map _hvGurus = new HashMap(); + private StateMachine2 _stateMachine; private int _retry; private long _nodeId; @@ -170,7 +174,7 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { _storageMgr.allocateRawVolume(VolumeType.DATADISK, "DATA-" + vm.getId(), offering.first(), offering.second(), vm, owner); } - _vmDao.updateIf(vm, Event.OperationSucceeded, null); + stateTransitTo(vm, Event.OperationSucceeded); txn.commit(); if (s_logger.isDebugEnabled()) { s_logger.debug("Allocation completed for VM: " + vm); @@ -234,7 +238,7 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { if (s_logger.isDebugEnabled()) { s_logger.debug("Destroying vm " + vm); } - if (!_vmDao.updateIf(vm, VirtualMachine.Event.DestroyRequested, vm.getHostId())) { + if (!stateTransitTo(vm, VirtualMachine.Event.DestroyRequested)) { s_logger.debug("Unable to destroy the vm because it is not in the correct state: " + vm.toString()); return false; } @@ -273,6 +277,8 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { _nodeId = _clusterMgr.getId(); _clusterMgr.registerListener(this); + setStateMachine(); + return true; } @@ -320,13 +326,17 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { vm.setReservationId(work.getId()); - if (!_vmDao.updateIf(vm, Event.StartRequested, null)) { + if (!stateTransitTo(vm, Event.StartRequested)) { throw new ConcurrentOperationException("Unable to start vm " + vm + " due to concurrent operations"); } ExcludeList avoids = new ExcludeList(); int retry = _retry; while (retry-- != 0) { // It's != so that it can match -1. + if (retry < (_retry -1)) { + stateTransitTo(vm, Event.OperationRetry); + } + VirtualMachineProfileImpl vmProfile = new VirtualMachineProfileImpl(vm, template, offering, null, params); DeployDestination dest = null; for (DeploymentPlanner planner : _planners) { @@ -344,11 +354,12 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { vm.setDataCenterId(dest.getDataCenter().getId()); vm.setPodId(dest.getPod().getId()); - _vmDao.updateIf(vm, Event.OperationRetry, dest.getHost().getId()); + vm.setHostId(dest.getHost().getId()); try { _storageMgr.prepare(vmProfile, dest); } catch (ConcurrentOperationException e) { + stateTransitTo(vm, Event.OperationFailed); throw e; } catch (StorageUnavailableException e) { s_logger.warn("Unable to contact storage.", e); @@ -367,7 +378,7 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { try { Answer[] answers = _agentMgr.send(dest.getHost().getId(), cmds); if (answers[0].getResult() && vmGuru.finalizeStart(cmds, vmProfile, dest, context)) { - if (!_vmDao.updateIf(vm, Event.OperationSucceeded, dest.getHost().getId())) { + if (!stateTransitTo(vm, Event.OperationSucceeded)) { throw new CloudRuntimeException("Unable to transition to a new state."); } return vm; @@ -382,6 +393,8 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { } } + stateTransitTo(vm, Event.OperationFailed); + if (s_logger.isDebugEnabled()) { s_logger.debug("Creation complete for VM " + vm); } @@ -404,7 +417,7 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { return true; } - if (!_vmDao.updateIf(vm, Event.StopRequested, vm.getHostId())) { + if (!stateTransitTo(vm, Event.StopRequested)) { throw new ConcurrentOperationException("VM is being operated on by someone else."); } @@ -433,7 +446,7 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { } } finally { if (!stopped) { - _vmDao.updateIf(vm, Event.OperationFailed, vm.getHostId()); + stateTransitTo(vm, Event.OperationFailed); } } @@ -468,7 +481,8 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { } vm.setReservationId(null); - _vmDao.updateIf(vm, Event.OperationSucceeded, null); + vm.setHostId(null); + stateTransitTo(vm, Event.OperationSucceeded); if (cleanup) { ItWorkVO work = new ItWorkVO(reservationId, _nodeId, Type.Cleanup); @@ -487,4 +501,46 @@ public class MauriceMoss implements VmManager, ClusterManagerListener { public void onManagementNodeLeft(List nodeList, long selfNodeId) { } + private void setStateMachine() { + _stateMachine = new StateMachine2(_vmDao); + + _stateMachine.addTransition(null, VirtualMachine.Event.CreateRequested, State.Creating); + _stateMachine.addTransition(State.Creating, VirtualMachine.Event.OperationSucceeded, State.Stopped); + _stateMachine.addTransition(State.Creating, VirtualMachine.Event.OperationFailed, State.Destroyed); + _stateMachine.addTransition(State.Stopped, VirtualMachine.Event.StartRequested, State.Starting); + _stateMachine.addTransition(State.Stopped, VirtualMachine.Event.DestroyRequested, State.Destroyed); + _stateMachine.addTransition(State.Stopped, VirtualMachine.Event.StopRequested, State.Stopped); + _stateMachine.addTransition(State.Stopped, VirtualMachine.Event.AgentReportStopped, State.Stopped); + _stateMachine.addTransition(State.Starting, VirtualMachine.Event.OperationRetry, State.Starting); + _stateMachine.addTransition(State.Starting, VirtualMachine.Event.OperationSucceeded, State.Running); + _stateMachine.addTransition(State.Starting, VirtualMachine.Event.OperationFailed, State.Stopped); + _stateMachine.addTransition(State.Starting, VirtualMachine.Event.AgentReportRunning, State.Running); + _stateMachine.addTransition(State.Starting, VirtualMachine.Event.AgentReportStopped, State.Stopped); + _stateMachine.addTransition(State.Destroyed, VirtualMachine.Event.RecoveryRequested, State.Stopped); + _stateMachine.addTransition(State.Destroyed, VirtualMachine.Event.ExpungeOperation, State.Expunging); + _stateMachine.addTransition(State.Creating, VirtualMachine.Event.MigrationRequested, State.Destroyed); + _stateMachine.addTransition(State.Running, VirtualMachine.Event.MigrationRequested, State.Migrating); + _stateMachine.addTransition(State.Running, VirtualMachine.Event.AgentReportRunning, State.Running); + _stateMachine.addTransition(State.Running, VirtualMachine.Event.AgentReportStopped, State.Stopped); + _stateMachine.addTransition(State.Running, VirtualMachine.Event.StopRequested, State.Stopping); + _stateMachine.addTransition(State.Migrating, VirtualMachine.Event.MigrationRequested, State.Migrating); + _stateMachine.addTransition(State.Migrating, VirtualMachine.Event.OperationSucceeded, State.Running); + _stateMachine.addTransition(State.Migrating, VirtualMachine.Event.OperationFailed, State.Running); + _stateMachine.addTransition(State.Migrating, VirtualMachine.Event.AgentReportRunning, State.Running); + _stateMachine.addTransition(State.Migrating, VirtualMachine.Event.AgentReportStopped, State.Stopped); + _stateMachine.addTransition(State.Stopping, VirtualMachine.Event.OperationSucceeded, State.Stopped); + _stateMachine.addTransition(State.Stopping, VirtualMachine.Event.OperationFailed, State.Running); + _stateMachine.addTransition(State.Stopping, VirtualMachine.Event.AgentReportRunning, State.Running); + _stateMachine.addTransition(State.Stopping, VirtualMachine.Event.AgentReportStopped, State.Stopped); + _stateMachine.addTransition(State.Stopping, VirtualMachine.Event.StopRequested, State.Stopping); + _stateMachine.addTransition(State.Expunging, VirtualMachine.Event.OperationFailed, State.Expunging); + _stateMachine.addTransition(State.Expunging, VirtualMachine.Event.ExpungeOperation, State.Expunging); + + _stateMachine.registerListener(new VMStateListener(_capacityDao, _offeringDao, _vmDao)); + } + + @Override + public boolean stateTransitTo(VMInstanceVO vm, VirtualMachine.Event e) { + return _stateMachine.transitTO(vm, e); + } } diff --git a/server/src/com/cloud/vm/VMStateListener.java b/server/src/com/cloud/vm/VMStateListener.java new file mode 100644 index 00000000000..66aa718362f --- /dev/null +++ b/server/src/com/cloud/vm/VMStateListener.java @@ -0,0 +1,121 @@ +package com.cloud.vm; + +import com.cloud.capacity.CapacityVO; +import com.cloud.capacity.dao.CapacityDao; +import com.cloud.service.ServiceOfferingVO; +import com.cloud.service.dao.ServiceOfferingDao; +import com.cloud.utils.db.DB; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.fsm.StateListener; +import com.cloud.vm.VirtualMachine.Event; +import com.cloud.vm.dao.VMInstanceDao; + +public class VMStateListener implements StateListener{ + CapacityDao _capacityDao; + ServiceOfferingDao _offeringDao; + VMInstanceDao _vmDao; + + public VMStateListener(CapacityDao capacityDao, ServiceOfferingDao offering, VMInstanceDao vmDao) { + _capacityDao = capacityDao; + _offeringDao = offering; + this._vmDao = vmDao; + } + + @Override + @DB + public boolean processStateTransitionEvent(State oldState, + Event event, State newState, VMInstanceVO vm, boolean transitionStatus) { + if (oldState == State.Starting) { + if (event == Event.OperationRetry || event == Event.OperationFailed) { + releaseResource(vm, false, false); + } + } + + if (!transitionStatus) { + return true; + } + + if (oldState == State.Starting) { + if (event == Event.OperationSucceeded) { + vm.setLastHostId(vm.getHostId()); + _vmDao.update(vm.getId(), vm); + } + } else if (oldState == State.Running) { + if (event == Event.AgentReportStopped) { + releaseResource(vm, false, true); + } + } else if (oldState == State.Migrating) { + if (event == Event.AgentReportStopped) { + releaseResource(vm, false, true); + } + } else if (oldState == State.Stopping) { + if (event == Event.AgentReportStopped || event == Event.OperationSucceeded) { + releaseResource(vm, false, true); + } + } else if (oldState == State.Stopped) { + if (event == Event.DestroyRequested) { + releaseResource(vm, true, false); + + vm.setLastHostId(null); + _vmDao.update(vm.getId(), vm); + } + } + return true; + } + + @DB + private void releaseResource(VMInstanceVO vm, boolean moveFromReserved, boolean moveToReservered) { + CapacityVO capacityCpu = _capacityDao.findByHostIdType(vm.getHostId(), CapacityVO.CAPACITY_TYPE_CPU); + CapacityVO capacityMemory = _capacityDao.findByHostIdType(vm.getHostId(), CapacityVO.CAPACITY_TYPE_MEMORY); + + ServiceOfferingVO offeringVO = _offeringDao.findById(vm.getServiceOfferingId()); + int cpu = offeringVO.getCpu(); + int speed = offeringVO.getSpeed(); + int vmCPU = cpu * speed; + int vmMem = offeringVO.getRamSize(); + + Transaction txn = Transaction.currentTxn(); + txn.start(); + + try { + capacityCpu = _capacityDao.lockRow(capacityCpu.getId(), true); + capacityMemory = _capacityDao.lockRow(capacityMemory.getId(), true); + + long usedCpu = capacityCpu.getUsedCapacity(); + long usedMem = capacityMemory.getUsedCapacity(); + long reservedCpu = capacityCpu.getReservedCapacity(); + long reservedMem = capacityMemory.getReservedCapacity(); + long totalCpu = capacityCpu.getTotalCapacity(); + long totalMem = capacityMemory.getTotalCapacity(); + + if (!moveFromReserved) { + /*move resource from used*/ + if (usedCpu >= vmCPU) + capacityCpu.setUsedCapacity(usedCpu - vmCPU); + if (usedMem >= vmMem) + capacityMemory.setUsedCapacity(usedMem - vmMem); + + if (moveToReservered) { + if (reservedCpu + vmCPU <= totalCpu) { + capacityCpu.setReservedCapacity(reservedCpu + vmCPU); + } + if (reservedMem + vmMem <= totalMem) { + capacityMemory.setReservedCapacity(reservedMem + vmMem); + } + } + } else { + if (reservedCpu >= vmCPU) { + capacityCpu.setReservedCapacity(reservedCpu - vmCPU); + } + if (reservedMem >= vmMem) { + capacityMemory.setReservedCapacity(reservedMem - vmMem); + } + } + + _capacityDao.update(capacityCpu.getId(), capacityCpu); + _capacityDao.update(capacityMemory.getId(), capacityMemory); + } finally { + txn.commit(); + } + } +} diff --git a/server/src/com/cloud/vm/VmManager.java b/server/src/com/cloud/vm/VmManager.java index 63ea51d7474..69a56ce5803 100644 --- a/server/src/com/cloud/vm/VmManager.java +++ b/server/src/com/cloud/vm/VmManager.java @@ -35,6 +35,7 @@ import com.cloud.user.Account; import com.cloud.user.User; import com.cloud.utils.Pair; import com.cloud.utils.component.Manager; +import com.cloud.vm.VirtualMachine.Event; /** * Manages allocating resources to vms. @@ -75,5 +76,7 @@ public interface VmManager extends Manager { ; void registerGuru(VirtualMachine.Type type, VirtualMachineGuru guru); + + boolean stateTransitTo(VMInstanceVO vm, Event e); } diff --git a/server/src/com/cloud/vm/dao/VMInstanceDao.java b/server/src/com/cloud/vm/dao/VMInstanceDao.java index d496a80f4f3..8403cf6d3b8 100644 --- a/server/src/com/cloud/vm/dao/VMInstanceDao.java +++ b/server/src/com/cloud/vm/dao/VMInstanceDao.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.List; import com.cloud.utils.db.GenericDao; +import com.cloud.utils.fsm.StateDao; import com.cloud.vm.State; import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; @@ -29,7 +30,7 @@ import com.cloud.vm.VirtualMachine; /* * Data Access Object for vm_instance table */ -public interface VMInstanceDao extends GenericDao { +public interface VMInstanceDao extends GenericDao, StateDao { /** * What are the vms running on this host? * @param hostId host. diff --git a/server/src/com/cloud/vm/dao/VMInstanceDaoImpl.java b/server/src/com/cloud/vm/dao/VMInstanceDaoImpl.java index d5b1ab90023..1e4d7a6ce31 100644 --- a/server/src/com/cloud/vm/dao/VMInstanceDaoImpl.java +++ b/server/src/com/cloud/vm/dao/VMInstanceDaoImpl.java @@ -37,6 +37,7 @@ import com.cloud.utils.db.UpdateBuilder; import com.cloud.vm.State; import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; +import com.cloud.vm.VirtualMachine.Event; import com.cloud.vm.VirtualMachine.Type; @Local(value = { VMInstanceDao.class }) @@ -291,4 +292,37 @@ public class VMInstanceDaoImpl extends GenericDaoBase implem vo.setProxyAssignTime(time); update(id, vo); } + + @Override + public boolean updateState(State oldState, Event event, + State newState, VMInstanceVO vm) { + if (newState == null) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("There's no way to transition from old state: " + oldState.toString() + " event: " + event.toString()); + } + return false; + } + + SearchCriteria sc = StateChangeSearch.create(); + sc.setParameters("id", vm.getId()); + sc.setParameters("states", oldState); + sc.setParameters("host", vm.getHostId()); + sc.setParameters("update", vm.getUpdated()); + + vm.incrUpdated(); + UpdateBuilder ub = getUpdateBuilder(vm); + ub.set(vm, "state", newState); + ub.set(vm, "hostId", vm.getHostId()); + ub.set(vm, _updateTimeAttr, new Date()); + + int result = update(vm, sc); + if (result == 0 && s_logger.isDebugEnabled()) { + VMInstanceVO vo = findById(vm.getId()); + StringBuilder str = new StringBuilder("Unable to update ").append(vo.toString()); + str.append(": DB Data={Host=").append(vo.getHostId()).append("; State=").append(vo.getState().toString()).append("; updated=").append(vo.getUpdated()); + str.append("} Stale Data: {Host=").append(vm.getHostId()).append("; State=").append(vm.getState().toString()).append("; updated=").append(vm.getUpdated()).append("}"); + s_logger.debug(str.toString()); + } + return result > 0; + } } diff --git a/utils/src/com/cloud/utils/fsm/StateDao.java b/utils/src/com/cloud/utils/fsm/StateDao.java new file mode 100644 index 00000000000..e88d2ff468b --- /dev/null +++ b/utils/src/com/cloud/utils/fsm/StateDao.java @@ -0,0 +1,5 @@ +package com.cloud.utils.fsm; + +public interface StateDao { + boolean updateState(S currentState, E event, S nextState, V vo); +} diff --git a/utils/src/com/cloud/utils/fsm/StateListener.java b/utils/src/com/cloud/utils/fsm/StateListener.java new file mode 100644 index 00000000000..f5babc0679e --- /dev/null +++ b/utils/src/com/cloud/utils/fsm/StateListener.java @@ -0,0 +1,5 @@ +package com.cloud.utils.fsm; + +public interface StateListener { + public boolean processStateTransitionEvent(S oldState, E event, S newState, V vo, boolean status); +} \ No newline at end of file diff --git a/utils/src/com/cloud/utils/fsm/StateMachine2.java b/utils/src/com/cloud/utils/fsm/StateMachine2.java new file mode 100644 index 00000000000..6de376ec9cb --- /dev/null +++ b/utils/src/com/cloud/utils/fsm/StateMachine2.java @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +package com.cloud.utils.fsm; + +import java.util.ArrayList; +import java.util.Formatter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * StateMachine is a partial implementation of a finite state machine. + * Specifically, it implements the Moore machine. + * It's main purpose is to keep the state diagram in one place in code + * so someone else can add/modify states easily without regression. + * It doesn't implement any actions because that's generally in the + * business logic anyways. + * + * @param state + * @param event + */ +public class StateMachine2> { + private final HashMap _states = new HashMap(); + private final StateEntry _initialStateEntry; + private StateDao _instanceDao; + private List> _listeners = new ArrayList>(); + + public StateMachine2(StateDao dao) { + _initialStateEntry = new StateEntry(null); + _instanceDao = dao; + } + + public void addTransition(S currentState, E event, S toState) { + StateEntry entry = null; + if (currentState == null) { + entry = _initialStateEntry; + } else { + entry = _states.get(currentState); + if (entry == null) { + entry = new StateEntry(currentState); + _states.put(currentState, entry); + } + } + + entry.addTransition(event, toState); + + entry = _states.get(toState); + if (entry == null) { + entry = new StateEntry(toState); + _states.put(toState, entry); + } + entry.addFromTransition(event, currentState); + } + + public Set getPossibleEvents(S s) { + StateEntry entry = _states.get(s); + return entry.nextStates.keySet(); + } + + public S getNextState(S s, E e) { + StateEntry entry = null; + if (s == null) { + entry = _initialStateEntry; + } else { + entry = _states.get(s); + assert entry != null : "Cannot retrieve transitions for state " + s.toString(); + } + + return entry.nextStates.get(e); + } + + public List getFromStates(S s, E e) { + StateEntry entry = _states.get(s); + if (entry == null) { + return new ArrayList(); + } + + return entry.prevStates.get(e); + } + + public boolean transitTO(V vo, E e) { + S currentState = vo.getState(); + S nextState = getNextState(currentState, e); + + boolean transitionStatus = true; + if (nextState == null) { + transitionStatus = false; + } + + transitionStatus = _instanceDao.updateState(currentState, e, nextState, vo); + + for (StateListener listener : _listeners) { + listener.processStateTransitionEvent(currentState, e, nextState, vo, transitionStatus); + } + + return transitionStatus; + } + + public boolean registerListener(StateListener listener) { + return _listeners.add(listener); + } + + @Override + public String toString() { + StringBuilder str = new StringBuilder(1024); + _initialStateEntry.buildString(str); + for (StateEntry entry : _states.values()) { + entry.buildString(str); + } + return str.toString(); + } + + private class StateEntry { + public S state; + public HashMap nextStates; + public HashMap> prevStates; + + public StateEntry(S state) { + this.state = state; + nextStates = new HashMap(); + prevStates = new HashMap>(); + } + + public void addTransition(E e, S s) { + assert !nextStates.containsKey(e) : "State " + getStateStr() + " already contains a transition to state " + nextStates.get(e).toString() + " via event " + e.toString() + ". Please revisit the rule you're adding to state " + s.toString(); + nextStates.put(e, s); + } + + public void addFromTransition(E e, S s) { + List l = prevStates.get(e); + if (l == null) { + l = new ArrayList(); + prevStates.put(e, l); + } + + assert !l.contains(s) : "Already contains the from transition " + e.toString() + " from state " + s.toString() + " to " + getStateStr(); + l.add(s); + } + + protected String getStateStr() { + return state == null ? "Initial" : state.toString(); + } + + public void buildString(StringBuilder str) { + str.append("State: ").append(getStateStr()).append("\n"); + for (Map.Entry nextState : nextStates.entrySet()) { + str.append(" --> Event: "); + Formatter format = new Formatter(); + str.append(format.format("%-30s", nextState.getKey().toString())); + str.append("----> State: "); + str.append(nextState.getValue().toString()); + str.append("\n"); + } + } + } +} diff --git a/utils/src/com/cloud/utils/fsm/StateObject.java b/utils/src/com/cloud/utils/fsm/StateObject.java new file mode 100644 index 00000000000..8aa2fc9ac57 --- /dev/null +++ b/utils/src/com/cloud/utils/fsm/StateObject.java @@ -0,0 +1,8 @@ +package com.cloud.utils.fsm; + +public interface StateObject { + /** + * @return finite state. + */ + S getState(); +}