// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package com.cloud.capacity; import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import javax.ejb.Local; import javax.inject.Inject; import javax.naming.ConfigurationException; import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.PublishScope; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; import com.cloud.agent.AgentManager; import com.cloud.agent.Listener; import com.cloud.agent.api.AgentControlAnswer; import com.cloud.agent.api.AgentControlCommand; import com.cloud.agent.api.Answer; import com.cloud.agent.api.Command; import com.cloud.agent.api.StartupCommand; import com.cloud.agent.api.StartupRoutingCommand; import com.cloud.api.ApiDBUtils; import com.cloud.capacity.dao.CapacityDao; import com.cloud.configuration.Config; import com.cloud.configuration.ConfigurationManager; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.dc.ClusterDetailsDao; import com.cloud.dc.ClusterVO; import com.cloud.dc.dao.ClusterDao; import com.cloud.exception.ConnectionException; import com.cloud.host.Host; import com.cloud.host.HostVO; import com.cloud.host.Status; import com.cloud.host.dao.HostDao; import com.cloud.hypervisor.Hypervisor.HypervisorType; import com.cloud.hypervisor.dao.HypervisorCapabilitiesDao; import com.cloud.offering.ServiceOffering; import com.cloud.org.Grouping.AllocationState; import com.cloud.resource.ResourceListener; import com.cloud.resource.ResourceManager; import com.cloud.resource.ServerResource; import com.cloud.service.ServiceOfferingVO; import com.cloud.service.dao.ServiceOfferingDao; import com.cloud.storage.StorageManager; import com.cloud.storage.VMTemplateStoragePoolVO; import com.cloud.storage.VMTemplateVO; import com.cloud.storage.VolumeVO; import com.cloud.storage.dao.VMTemplatePoolDao; import com.cloud.storage.dao.VolumeDao; import com.cloud.storage.swift.SwiftManager; import com.cloud.uservm.UserVm; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; import com.cloud.utils.Pair; import com.cloud.utils.component.ManagerBase; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.Transaction; import com.cloud.utils.fsm.StateListener; import com.cloud.vm.UserVmDetailVO; import com.cloud.vm.UserVmVO; import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; import com.cloud.vm.VirtualMachine.Event; import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.dao.UserVmDao; import com.cloud.vm.dao.UserVmDetailsDao; import com.cloud.vm.dao.VMInstanceDao; import com.cloud.vm.snapshot.VMSnapshot; import com.cloud.vm.snapshot.VMSnapshotVO; import com.cloud.vm.snapshot.dao.VMSnapshotDao; @Component @Local(value = CapacityManager.class) public class CapacityManagerImpl extends ManagerBase implements CapacityManager, StateListener, Listener, ResourceListener { private static final Logger s_logger = Logger.getLogger(CapacityManagerImpl.class); @Inject CapacityDao _capacityDao; @Inject ConfigurationDao _configDao; @Inject ServiceOfferingDao _offeringsDao; @Inject HostDao _hostDao; @Inject VMInstanceDao _vmDao; @Inject VolumeDao _volumeDao; @Inject VMTemplatePoolDao _templatePoolDao; @Inject AgentManager _agentManager; @Inject ResourceManager _resourceMgr; @Inject StorageManager _storageMgr; @Inject SwiftManager _swiftMgr; @Inject ConfigurationManager _configMgr; @Inject HypervisorCapabilitiesDao _hypervisorCapabilitiesDao; @Inject protected VMSnapshotDao _vmSnapshotDao; @Inject protected UserVmDao _userVMDao; @Inject protected UserVmDetailsDao _userVmDetailsDao; @Inject ClusterDetailsDao _clusterDetailsDao; @Inject ClusterDao _clusterDao; private int _vmCapacityReleaseInterval; private ScheduledExecutorService _executor; private boolean _stopped; long _extraBytesPerVolume = 0; private float _storageOverProvisioningFactor = 1.0f; @Inject MessageBus _messageBus; private static final String MESSAGE_RESERVED_CAPACITY_FREED_FLAG = "Message.ReservedCapacityFreed.Flag"; @Override public boolean configure(String name, Map params) throws ConfigurationException { _vmCapacityReleaseInterval = NumbersUtil.parseInt(_configDao.getValue(Config.CapacitySkipcountingHours.key()), 3600); _storageOverProvisioningFactor = NumbersUtil.parseFloat(_configDao.getValue(Config.StorageOverprovisioningFactor.key()), 1.0f); _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("HostCapacity-Checker")); VirtualMachine.State.getStateMachine().registerListener(this); _agentManager.registerForHostEvents(new StorageCapacityListener(_capacityDao, _storageMgr), true, false, false); _agentManager.registerForHostEvents(new ComputeCapacityListener(_capacityDao, this), true, false, false); return true; } @Override public boolean start() { _resourceMgr.registerResourceEvent(ResourceListener.EVENT_PREPARE_MAINTENANCE_AFTER, this); _resourceMgr.registerResourceEvent(ResourceListener.EVENT_CANCEL_MAINTENANCE_AFTER, this); return true; } @Override public boolean stop() { _executor.shutdownNow(); _stopped = true; return true; } @DB @Override public boolean releaseVmCapacity(VirtualMachine vm, boolean moveFromReserved, boolean moveToReservered, Long hostId) { ServiceOfferingVO svo = _offeringsDao.findById(vm.getServiceOfferingId()); CapacityVO capacityCpu = _capacityDao.findByHostIdType(hostId, CapacityVO.CAPACITY_TYPE_CPU); CapacityVO capacityMemory = _capacityDao.findByHostIdType(hostId, CapacityVO.CAPACITY_TYPE_MEMORY); Long clusterId = null; if (hostId != null) { HostVO host = _hostDao.findById(hostId); clusterId = host.getClusterId(); } if (capacityCpu == null || capacityMemory == null || svo == null) { return false; } Transaction txn = Transaction.currentTxn(); try { txn.start(); capacityCpu = _capacityDao.lockRow(capacityCpu.getId(), true); capacityMemory = _capacityDao.lockRow(capacityMemory.getId(), true); long usedCpu = capacityCpu.getUsedCapacity(); long usedMem = capacityMemory.getUsedCapacity(); long reservedCpu = capacityCpu.getReservedCapacity(); long reservedMem = capacityMemory.getReservedCapacity(); long actualTotalCpu = capacityCpu.getTotalCapacity(); float cpuOvercommitRatio =Float.parseFloat(_clusterDetailsDao.findDetail(clusterId,"cpuOvercommitRatio").getValue()); float memoryOvercommitRatio = Float.parseFloat(_clusterDetailsDao.findDetail(clusterId,"memoryOvercommitRatio").getValue()); int vmCPU = svo.getCpu() * svo.getSpeed(); long vmMem = svo.getRamSize() * 1024L * 1024L; long actualTotalMem = capacityMemory.getTotalCapacity(); long totalMem = (long) (actualTotalMem * memoryOvercommitRatio); long totalCpu = (long) (actualTotalCpu * cpuOvercommitRatio); if (s_logger.isDebugEnabled()) { s_logger.debug("Hosts's actual total CPU: " + actualTotalCpu + " and CPU after applying overprovisioning: " + totalCpu); s_logger.debug("Hosts's actual total RAM: " + actualTotalMem + " and RAM after applying overprovisioning: " + totalMem); } if (!moveFromReserved) { /* move resource from used */ if (usedCpu >= vmCPU) { capacityCpu.setUsedCapacity(usedCpu - vmCPU); } if (usedMem >= vmMem) { capacityMemory.setUsedCapacity(usedMem - vmMem); } if (moveToReservered) { if (reservedCpu + vmCPU <= totalCpu) { capacityCpu.setReservedCapacity(reservedCpu + vmCPU); } if (reservedMem + vmMem <= totalMem) { capacityMemory.setReservedCapacity(reservedMem + vmMem); } } } else { if (reservedCpu >= vmCPU) { capacityCpu.setReservedCapacity(reservedCpu - vmCPU); } if (reservedMem >= vmMem) { capacityMemory.setReservedCapacity(reservedMem - vmMem); } } s_logger.debug("release cpu from host: " + hostId + ", old used: " + usedCpu + ",reserved: " + reservedCpu + ", actual total: " + actualTotalCpu + ", total with overprovisioning: " + totalCpu + "; new used: " + capacityCpu.getUsedCapacity() + ",reserved:" + capacityCpu.getReservedCapacity() + "; movedfromreserved: " + moveFromReserved + ",moveToReservered" + moveToReservered); s_logger.debug("release mem from host: " + hostId + ", old used: " + usedMem + ",reserved: " + reservedMem + ", total: " + totalMem + "; new used: " + capacityMemory.getUsedCapacity() + ",reserved:" + capacityMemory.getReservedCapacity() + "; movedfromreserved: " + moveFromReserved + ",moveToReservered" + moveToReservered); _capacityDao.update(capacityCpu.getId(), capacityCpu); _capacityDao.update(capacityMemory.getId(), capacityMemory); txn.commit(); return true; } catch (Exception e) { s_logger.debug("Failed to transit vm's state, due to " + e.getMessage()); txn.rollback(); return false; } } @DB @Override public void allocateVmCapacity(VirtualMachine vm, boolean fromLastHost) { long hostId = vm.getHostId(); HostVO host = _hostDao.findById(hostId); long clusterId = host.getClusterId(); float cpuOvercommitRatio = Float.parseFloat(_clusterDetailsDao.findDetail(clusterId, "cpuOvercommitRatio").getValue()); float memoryOvercommitRatio = Float.parseFloat(_clusterDetailsDao.findDetail(clusterId, "memoryOvercommitRatio").getValue()); ServiceOfferingVO svo = _offeringsDao.findById(vm.getServiceOfferingId()); CapacityVO capacityCpu = _capacityDao.findByHostIdType(hostId, CapacityVO.CAPACITY_TYPE_CPU); CapacityVO capacityMem = _capacityDao.findByHostIdType(hostId, CapacityVO.CAPACITY_TYPE_MEMORY); if (capacityCpu == null || capacityMem == null || svo == null) { return; } int cpu = svo.getCpu() * svo.getSpeed(); long ram = svo.getRamSize() * 1024L * 1024L; Transaction txn = Transaction.currentTxn(); try { txn.start(); capacityCpu = _capacityDao.lockRow(capacityCpu.getId(), true); capacityMem = _capacityDao.lockRow(capacityMem.getId(), true); long usedCpu = capacityCpu.getUsedCapacity(); long usedMem = capacityMem.getUsedCapacity(); long reservedCpu = capacityCpu.getReservedCapacity(); long reservedMem = capacityMem.getReservedCapacity(); long actualTotalCpu = capacityCpu.getTotalCapacity(); long actualTotalMem = capacityMem.getTotalCapacity(); long totalCpu = (long) (actualTotalCpu * cpuOvercommitRatio); long totalMem = (long) (actualTotalMem * memoryOvercommitRatio); if (s_logger.isDebugEnabled()) { s_logger.debug("Hosts's actual total CPU: " + actualTotalCpu + " and CPU after applying overprovisioning: " + totalCpu); } long freeCpu = totalCpu - (reservedCpu + usedCpu); long freeMem = totalMem - (reservedMem + usedMem); if (s_logger.isDebugEnabled()) { s_logger.debug("We are allocating VM, increasing the used capacity of this host:" + hostId); s_logger.debug("Current Used CPU: " + usedCpu + " , Free CPU:" + freeCpu + " ,Requested CPU: " + cpu); s_logger.debug("Current Used RAM: " + usedMem + " , Free RAM:" + freeMem + " ,Requested RAM: " + ram); } capacityCpu.setUsedCapacity(usedCpu + cpu); capacityMem.setUsedCapacity(usedMem + ram); if (fromLastHost) { /* alloc from reserved */ if (s_logger.isDebugEnabled()) { s_logger.debug("We are allocating VM to the last host again, so adjusting the reserved capacity if it is not less than required"); s_logger.debug("Reserved CPU: " + reservedCpu + " , Requested CPU: " + cpu); s_logger.debug("Reserved RAM: " + reservedMem + " , Requested RAM: " + ram); } if (reservedCpu >= cpu && reservedMem >= ram) { capacityCpu.setReservedCapacity(reservedCpu - cpu); capacityMem.setReservedCapacity(reservedMem - ram); } } else { /* alloc from free resource */ if (!((reservedCpu + usedCpu + cpu <= totalCpu) && (reservedMem + usedMem + ram <= totalMem))) { if (s_logger.isDebugEnabled()) { s_logger.debug("Host doesnt seem to have enough free capacity, but increasing the used capacity anyways, since the VM is already starting on this host "); } } } s_logger.debug("CPU STATS after allocation: for host: " + hostId + ", old used: " + usedCpu + ", old reserved: " + reservedCpu + ", actual total: " + actualTotalCpu + ", total with overprovisioning: " + totalCpu + "; new used:" + capacityCpu.getUsedCapacity() + ", reserved:" + capacityCpu.getReservedCapacity() + "; requested cpu:" + cpu + ",alloc_from_last:" + fromLastHost); s_logger.debug("RAM STATS after allocation: for host: " + hostId + ", old used: " + usedMem + ", old reserved: " + reservedMem + ", total: " + totalMem + "; new used: " + capacityMem.getUsedCapacity() + ", reserved: " + capacityMem.getReservedCapacity() + "; requested mem: " + ram + ",alloc_from_last:" + fromLastHost); _capacityDao.update(capacityCpu.getId(), capacityCpu); _capacityDao.update(capacityMem.getId(), capacityMem); txn.commit(); } catch (Exception e) { txn.rollback(); return; } } @Override public boolean checkIfHostHasCapacity(long hostId, Integer cpu, long ram, boolean checkFromReservedCapacity, float cpuOvercommitRatio, float memoryOvercommitRatio, boolean considerReservedCapacity) { boolean hasCapacity = false; if (s_logger.isDebugEnabled()) { s_logger.debug("Checking if host: " + hostId + " has enough capacity for requested CPU: " + cpu + " and requested RAM: " + ram + " , cpuOverprovisioningFactor: " + cpuOvercommitRatio); } CapacityVO capacityCpu = _capacityDao.findByHostIdType(hostId, CapacityVO.CAPACITY_TYPE_CPU); CapacityVO capacityMem = _capacityDao.findByHostIdType(hostId, CapacityVO.CAPACITY_TYPE_MEMORY); if (capacityCpu == null || capacityMem == null) { if (capacityCpu == null) { if (s_logger.isDebugEnabled()) { s_logger.debug("Cannot checkIfHostHasCapacity, Capacity entry for CPU not found in Db, for hostId: " + hostId); } } if (capacityMem == null) { if (s_logger.isDebugEnabled()) { s_logger.debug("Cannot checkIfHostHasCapacity, Capacity entry for RAM not found in Db, for hostId: " + hostId); } } return false; } long usedCpu = capacityCpu.getUsedCapacity(); long usedMem = capacityMem.getUsedCapacity(); long reservedCpu = capacityCpu.getReservedCapacity(); long reservedMem = capacityMem.getReservedCapacity(); long actualTotalCpu = capacityCpu.getTotalCapacity(); long actualTotalMem = capacityMem.getTotalCapacity(); long totalCpu = (long) (actualTotalCpu * cpuOvercommitRatio ); long totalMem = (long) (actualTotalMem * memoryOvercommitRatio); if (s_logger.isDebugEnabled()) { s_logger.debug("Hosts's actual total CPU: " + actualTotalCpu + " and CPU after applying overprovisioning: " + totalCpu); } String failureReason = ""; if (checkFromReservedCapacity) { long freeCpu = reservedCpu; long freeMem = reservedMem; if (s_logger.isDebugEnabled()) { s_logger.debug("We need to allocate to the last host again, so checking if there is enough reserved capacity"); s_logger.debug("Reserved CPU: " + freeCpu + " , Requested CPU: " + cpu); s_logger.debug("Reserved RAM: " + freeMem + " , Requested RAM: " + ram); } /* alloc from reserved */ if (reservedCpu >= cpu) { if (reservedMem >= ram) { hasCapacity = true; } else { failureReason = "Host does not have enough reserved RAM available"; } } else { failureReason = "Host does not have enough reserved CPU available"; } } else { long reservedCpuValueToUse = reservedCpu; long reservedMemValueToUse = reservedMem; if(!considerReservedCapacity){ if (s_logger.isDebugEnabled()) { s_logger.debug("considerReservedCapacity is" + considerReservedCapacity + " , not considering reserved capacity for calculating free capacity"); } reservedCpuValueToUse = 0; reservedMemValueToUse = 0; } long freeCpu = totalCpu - (reservedCpuValueToUse + usedCpu); long freeMem = totalMem - (reservedMemValueToUse + usedMem); if (s_logger.isDebugEnabled()) { s_logger.debug("Free CPU: " + freeCpu + " , Requested CPU: " + cpu); s_logger.debug("Free RAM: " + freeMem + " , Requested RAM: " + ram); } /* alloc from free resource */ if ((reservedCpuValueToUse + usedCpu + cpu <= totalCpu)) { if ((reservedMemValueToUse + usedMem + ram <= totalMem)) { hasCapacity = true; } else { failureReason = "Host does not have enough RAM available"; } } else { failureReason = "Host does not have enough CPU available"; } } if (hasCapacity) { if (s_logger.isDebugEnabled()) { s_logger.debug("Host has enough CPU and RAM available"); } s_logger.debug("STATS: Can alloc CPU from host: " + hostId + ", used: " + usedCpu + ", reserved: " + reservedCpu + ", actual total: " + actualTotalCpu + ", total with overprovisioning: " + totalCpu + "; requested cpu:" + cpu + ",alloc_from_last_host?:" + checkFromReservedCapacity + " ,considerReservedCapacity?: "+considerReservedCapacity); s_logger.debug("STATS: Can alloc MEM from host: " + hostId + ", used: " + usedMem + ", reserved: " + reservedMem + ", total: " + totalMem + "; requested mem: " + ram + ",alloc_from_last_host?:" + checkFromReservedCapacity+ " ,considerReservedCapacity?: "+considerReservedCapacity); } else { if (checkFromReservedCapacity) { s_logger.debug("STATS: Failed to alloc resource from host: " + hostId + " reservedCpu: " + reservedCpu + ", requested cpu: " + cpu + ", reservedMem: " + reservedMem + ", requested mem: " + ram); } else { s_logger.debug("STATS: Failed to alloc resource from host: " + hostId + " reservedCpu: " + reservedCpu + ", used cpu: " + usedCpu + ", requested cpu: " + cpu + ", actual total cpu: " + actualTotalCpu + ", total cpu with overprovisioning: " + totalCpu + ", reservedMem: " + reservedMem + ", used Mem: " + usedMem + ", requested mem: " + ram + ", total Mem:" + totalMem+ " ,considerReservedCapacity?: "+considerReservedCapacity); } if (s_logger.isDebugEnabled()) { s_logger.debug(failureReason + ", cannot allocate to this host."); } } return hasCapacity; } private long getVMSnapshotAllocatedCapacity(StoragePoolVO pool){ List volumes = _volumeDao.findByPoolId(pool.getId()); long totalSize = 0; for (VolumeVO volume : volumes) { if(volume.getInstanceId() == null) continue; Long vmId = volume.getInstanceId(); UserVm vm = _userVMDao.findById(vmId); if(vm == null) continue; ServiceOffering offering = _offeringsDao.findById(vm.getServiceOfferingId()); List vmSnapshots = _vmSnapshotDao.findByVm(vmId); long pathCount = 0; long memorySnapshotSize = 0; for (VMSnapshotVO vmSnapshotVO : vmSnapshots) { if(_vmSnapshotDao.listByParent(vmSnapshotVO.getId()).size() == 0) pathCount++; if(vmSnapshotVO.getType() == VMSnapshot.Type.DiskAndMemory) memorySnapshotSize += (offering.getRamSize() * 1024 * 1024); } if(pathCount <= 1) totalSize = totalSize + memorySnapshotSize; else totalSize = totalSize + volume.getSize() * (pathCount - 1) + memorySnapshotSize; } return totalSize; } @Override public long getAllocatedPoolCapacity(StoragePoolVO pool, VMTemplateVO templateForVmCreation){ // Get size for all the volumes Pair sizes = _volumeDao.getCountAndTotalByPool(pool.getId()); long totalAllocatedSize = sizes.second() + sizes.first() * _extraBytesPerVolume; // Get size for VM Snapshots totalAllocatedSize = totalAllocatedSize + getVMSnapshotAllocatedCapacity(pool); // Iterate through all templates on this storage pool boolean tmpinstalled = false; List templatePoolVOs; templatePoolVOs = _templatePoolDao.listByPoolId(pool.getId()); for (VMTemplateStoragePoolVO templatePoolVO : templatePoolVOs) { if ((templateForVmCreation != null) && !tmpinstalled && (templatePoolVO.getTemplateId() == templateForVmCreation.getId())) { tmpinstalled = true; } long templateSize = templatePoolVO.getTemplateSize(); totalAllocatedSize += templateSize + _extraBytesPerVolume; } // Add the size for the templateForVmCreation if its not already present /*if ((templateForVmCreation != null) && !tmpinstalled) { }*/ return totalAllocatedSize; } @DB @Override public void updateCapacityForHost(Host host){ // prepare the service offerings List offerings = _offeringsDao.listAllIncludingRemoved(); Map offeringsMap = new HashMap(); for (ServiceOfferingVO offering : offerings) { offeringsMap.put(offering.getId(), offering); } long usedCpu = 0; long usedMemory = 0; long reservedMemory = 0; long reservedCpu = 0; List vms = _vmDao.listUpByHostId(host.getId()); if (s_logger.isDebugEnabled()) { s_logger.debug("Found " + vms.size() + " VMs on host " + host.getId()); } for (VMInstanceVO vm : vms) { ServiceOffering so = offeringsMap.get(vm.getServiceOfferingId()); usedMemory += so.getRamSize() * 1024L * 1024L; usedCpu += so.getCpu() * so.getSpeed(); } List vmsByLastHostId = _vmDao.listByLastHostId(host.getId()); if (s_logger.isDebugEnabled()) { s_logger.debug("Found " + vmsByLastHostId.size() + " VM, not running on host " + host.getId()); } for (VMInstanceVO vm : vmsByLastHostId) { long secondsSinceLastUpdate = (DateUtil.currentGMTTime().getTime() - vm.getUpdateTime().getTime()) / 1000; if (secondsSinceLastUpdate < _vmCapacityReleaseInterval) { ServiceOffering so = offeringsMap.get(vm.getServiceOfferingId()); reservedMemory += so.getRamSize() * 1024L * 1024L; reservedCpu += so.getCpu() * so.getSpeed(); } else { // signal if not done already, that the VM has been stopped for skip.counting.hours, // hence capacity will not be reserved anymore. UserVmDetailVO messageSentFlag = _userVmDetailsDao.findDetail(vm.getId(), MESSAGE_RESERVED_CAPACITY_FREED_FLAG); if (messageSentFlag == null || !Boolean.valueOf(messageSentFlag.getValue())) { _messageBus.publish(_name, "VM_ReservedCapacity_Free", PublishScope.LOCAL, vm); if (vm.getType() == VirtualMachine.Type.User) { UserVmVO userVM = _userVMDao.findById(vm.getId()); _userVMDao.loadDetails(userVM); userVM.setDetail(MESSAGE_RESERVED_CAPACITY_FREED_FLAG, "true"); _userVMDao.saveDetails(userVM); } } } } CapacityVO cpuCap = _capacityDao.findByHostIdType(host.getId(), CapacityVO.CAPACITY_TYPE_CPU); CapacityVO memCap = _capacityDao.findByHostIdType(host.getId(), CapacityVO.CAPACITY_TYPE_MEMORY); if (cpuCap != null && memCap != null){ if (cpuCap.getUsedCapacity() == usedCpu && cpuCap.getReservedCapacity() == reservedCpu) { s_logger.debug("No need to calibrate cpu capacity, host:" + host.getId() + " usedCpu: " + cpuCap.getUsedCapacity() + " reservedCpu: " + cpuCap.getReservedCapacity()); } else if (cpuCap.getReservedCapacity() != reservedCpu) { s_logger.debug("Calibrate reserved cpu for host: " + host.getId() + " old reservedCpu:" + cpuCap.getReservedCapacity() + " new reservedCpu:" + reservedCpu); cpuCap.setReservedCapacity(reservedCpu); } else if (cpuCap.getUsedCapacity() != usedCpu) { s_logger.debug("Calibrate used cpu for host: " + host.getId() + " old usedCpu:" + cpuCap.getUsedCapacity() + " new usedCpu:" + usedCpu); cpuCap.setUsedCapacity(usedCpu); } if (memCap.getUsedCapacity() == usedMemory && memCap.getReservedCapacity() == reservedMemory) { s_logger.debug("No need to calibrate memory capacity, host:" + host.getId() + " usedMem: " + memCap.getUsedCapacity() + " reservedMem: " + memCap.getReservedCapacity()); } else if (memCap.getReservedCapacity() != reservedMemory) { s_logger.debug("Calibrate reserved memory for host: " + host.getId() + " old reservedMem:" + memCap.getReservedCapacity() + " new reservedMem:" + reservedMemory); memCap.setReservedCapacity(reservedMemory); } else if (memCap.getUsedCapacity() != usedMemory) { /* * Didn't calibrate for used memory, because VMs can be in state(starting/migrating) that I don't know on which host they are * allocated */ s_logger.debug("Calibrate used memory for host: " + host.getId() + " old usedMem: " + memCap.getUsedCapacity() + " new usedMem: " + usedMemory); memCap.setUsedCapacity(usedMemory); } try { _capacityDao.update(cpuCap.getId(), cpuCap); _capacityDao.update(memCap.getId(), memCap); } catch (Exception e) { s_logger.error("Caught exception while updating cpu/memory capacity for the host " +host.getId(), e); } }else { Transaction txn = Transaction.currentTxn(); txn.start(); CapacityVO capacity = new CapacityVO(host.getId(), host.getDataCenterId(), host.getPodId(), host.getClusterId(), usedMemory, host.getTotalMemory(), CapacityVO.CAPACITY_TYPE_MEMORY); capacity.setReservedCapacity(reservedMemory); CapacityState capacityState = CapacityState.Enabled; if (host.getClusterId() != null) { ClusterVO cluster = ApiDBUtils.findClusterById(host.getClusterId()); if (cluster != null) { capacityState = _configMgr.findClusterAllocationState(cluster) == AllocationState.Disabled ? CapacityState.Disabled : CapacityState.Enabled; capacity.setCapacityState(capacityState); } } _capacityDao.persist(capacity); capacity = new CapacityVO( host.getId(), host.getDataCenterId(), host.getPodId(), host.getClusterId(), usedCpu, host.getCpus().longValue() * host.getSpeed().longValue(), CapacityVO.CAPACITY_TYPE_CPU); capacity.setReservedCapacity(reservedCpu); capacity.setCapacityState(capacityState); _capacityDao.persist(capacity); txn.commit(); } } @Override public boolean preStateTransitionEvent(State oldState, Event event, State newState, VirtualMachine vm, boolean transitionStatus, Object opaque) { return true; } @Override public boolean postStateTransitionEvent(State oldState, Event event, State newState, VirtualMachine vm, boolean status, Object opaque) { if (!status) { return false; } @SuppressWarnings("unchecked") Pair hosts = (Pair)opaque; Long oldHostId = hosts.first(); s_logger.debug("VM state transitted from :" + oldState + " to " + newState + " with event: " + event + "vm's original host id: " + vm.getLastHostId() + " new host id: " + vm.getHostId() + " host id before state transition: " + oldHostId); if (oldState == State.Starting) { if(newState != State.Running) { releaseVmCapacity(vm, false, false, oldHostId); } } else if (oldState == State.Running) { if (event == Event.AgentReportStopped) { releaseVmCapacity(vm, false, true, oldHostId); } else if(event == Event.AgentReportMigrated) { releaseVmCapacity(vm, false, false, oldHostId); } } else if (oldState == State.Migrating) { if (event == Event.AgentReportStopped) { /* Release capacity from original host */ releaseVmCapacity(vm, false, false, vm.getLastHostId()); releaseVmCapacity(vm, false, false, oldHostId); } else if (event == Event.OperationFailed) { /* Release from dest host */ releaseVmCapacity(vm, false, false, oldHostId); } else if (event == Event.OperationSucceeded) { releaseVmCapacity(vm, false, false, vm.getLastHostId()); } } else if (oldState == State.Stopping) { if (event == Event.OperationSucceeded) { releaseVmCapacity(vm, false, true, oldHostId); } else if (event == Event.AgentReportStopped) { releaseVmCapacity(vm, false, false, oldHostId); } else if(event == Event.AgentReportMigrated) { releaseVmCapacity(vm, false, false, oldHostId); } } else if (oldState == State.Stopped) { if (event == Event.DestroyRequested || event == Event.ExpungeOperation) { releaseVmCapacity(vm, true, false, vm.getLastHostId()); } else if(event == Event.AgentReportMigrated) { releaseVmCapacity(vm, false, false, oldHostId); } } if ((newState == State.Starting || newState == State.Migrating || event == Event.AgentReportMigrated) && vm.getHostId() != null) { boolean fromLastHost = false; if (vm.getHostId().equals(vm.getLastHostId())) { s_logger.debug("VM starting again on the last host it was stopped on"); fromLastHost = true; } allocateVmCapacity(vm, fromLastHost); } if (newState == State.Stopped) { if (vm.getType() == VirtualMachine.Type.User) { UserVmVO userVM = _userVMDao.findById(vm.getId()); _userVMDao.loadDetails(userVM); // free the message sent flag if it exists userVM.setDetail(MESSAGE_RESERVED_CAPACITY_FREED_FLAG, "false"); _userVMDao.saveDetails(userVM); } } return true; } // TODO: Get rid of this case once we've determined that the capacity listeners above have all the changes // create capacity entries if none exist for this server private void createCapacityEntry(StartupCommand startup, HostVO server) { SearchCriteria capacitySC = _capacityDao.createSearchCriteria(); capacitySC.addAnd("hostOrPoolId", SearchCriteria.Op.EQ, server.getId()); capacitySC.addAnd("dataCenterId", SearchCriteria.Op.EQ, server.getDataCenterId()); capacitySC.addAnd("podId", SearchCriteria.Op.EQ, server.getPodId()); if (startup instanceof StartupRoutingCommand) { SearchCriteria capacityCPU = _capacityDao.createSearchCriteria(); capacityCPU.addAnd("hostOrPoolId", SearchCriteria.Op.EQ, server.getId()); capacityCPU.addAnd("dataCenterId", SearchCriteria.Op.EQ, server.getDataCenterId()); capacityCPU.addAnd("podId", SearchCriteria.Op.EQ, server.getPodId()); capacityCPU.addAnd("capacityType", SearchCriteria.Op.EQ, CapacityVO.CAPACITY_TYPE_CPU); List capacityVOCpus = _capacityDao.search(capacitySC, null); Float cpuovercommitratio = Float.parseFloat(_clusterDetailsDao.findDetail(server.getClusterId(), "cpuOvercommitRatio").getValue()); Float memoryOvercommitRatio = Float.parseFloat(_clusterDetailsDao.findDetail(server.getClusterId(), "memoryOvercommitRatio").getValue()); if (capacityVOCpus != null && !capacityVOCpus.isEmpty()) { CapacityVO CapacityVOCpu = capacityVOCpus.get(0); long newTotalCpu = (long) (server.getCpus().longValue() * server.getSpeed().longValue() * cpuovercommitratio); if ((CapacityVOCpu.getTotalCapacity() <= newTotalCpu) || ((CapacityVOCpu.getUsedCapacity() + CapacityVOCpu.getReservedCapacity()) <= newTotalCpu)) { CapacityVOCpu.setTotalCapacity(newTotalCpu); } else if ((CapacityVOCpu.getUsedCapacity() + CapacityVOCpu.getReservedCapacity() > newTotalCpu) && (CapacityVOCpu.getUsedCapacity() < newTotalCpu)) { CapacityVOCpu.setReservedCapacity(0); CapacityVOCpu.setTotalCapacity(newTotalCpu); } else { s_logger.debug("What? new cpu is :" + newTotalCpu + ", old one is " + CapacityVOCpu.getUsedCapacity() + "," + CapacityVOCpu.getReservedCapacity() + "," + CapacityVOCpu.getTotalCapacity()); } _capacityDao.update(CapacityVOCpu.getId(), CapacityVOCpu); } else { CapacityVO capacity = new CapacityVO(server.getId(), server.getDataCenterId(), server.getPodId(), server.getClusterId(), 0L, server.getCpus().longValue() * server.getSpeed().longValue(), CapacityVO.CAPACITY_TYPE_CPU); _capacityDao.persist(capacity); } SearchCriteria capacityMem = _capacityDao.createSearchCriteria(); capacityMem.addAnd("hostOrPoolId", SearchCriteria.Op.EQ, server.getId()); capacityMem.addAnd("dataCenterId", SearchCriteria.Op.EQ, server.getDataCenterId()); capacityMem.addAnd("podId", SearchCriteria.Op.EQ, server.getPodId()); capacityMem.addAnd("capacityType", SearchCriteria.Op.EQ, CapacityVO.CAPACITY_TYPE_MEMORY); List capacityVOMems = _capacityDao.search(capacityMem, null); if (capacityVOMems != null && !capacityVOMems.isEmpty()) { CapacityVO CapacityVOMem = capacityVOMems.get(0); long newTotalMem = (long) ((server.getTotalMemory()) * memoryOvercommitRatio); if (CapacityVOMem.getTotalCapacity() <= newTotalMem || (CapacityVOMem.getUsedCapacity() + CapacityVOMem.getReservedCapacity() <= newTotalMem)) { CapacityVOMem.setTotalCapacity(newTotalMem); } else if (CapacityVOMem.getUsedCapacity() + CapacityVOMem.getReservedCapacity() > newTotalMem && CapacityVOMem.getUsedCapacity() < newTotalMem) { CapacityVOMem.setReservedCapacity(0); CapacityVOMem.setTotalCapacity(newTotalMem); } else { s_logger.debug("What? new cpu is :" + newTotalMem + ", old one is " + CapacityVOMem.getUsedCapacity() + "," + CapacityVOMem.getReservedCapacity() + "," + CapacityVOMem.getTotalCapacity()); } _capacityDao.update(CapacityVOMem.getId(), CapacityVOMem); } else { CapacityVO capacity = new CapacityVO(server.getId(), server.getDataCenterId(), server.getPodId(), server.getClusterId(), 0L, server.getTotalMemory(), CapacityVO.CAPACITY_TYPE_MEMORY); _capacityDao.persist(capacity); } } } @Override public boolean processAnswers(long agentId, long seq, Answer[] answers) { // TODO Auto-generated method stub return false; } @Override public boolean processCommands(long agentId, long seq, Command[] commands) { // TODO Auto-generated method stub return false; } @Override public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) { // TODO Auto-generated method stub return null; } @Override public void processConnect(Host host, StartupCommand cmd, boolean forRebalance) throws ConnectionException { // TODO Auto-generated method stub } @Override public boolean processDisconnect(long agentId, Status state) { // TODO Auto-generated method stub return false; } @Override public boolean isRecurring() { // TODO Auto-generated method stub return false; } @Override public int getTimeout() { // TODO Auto-generated method stub return 0; } @Override public boolean processTimeout(long agentId, long seq) { // TODO Auto-generated method stub return false; } @Override public void processCancelMaintenaceEventAfter(Long hostId) { updateCapacityForHost(_hostDao.findById(hostId)); } @Override public void processCancelMaintenaceEventBefore(Long hostId) { // TODO Auto-generated method stub } @Override public void processDeletHostEventAfter(Host host) { // TODO Auto-generated method stub } @Override public void processDeleteHostEventBefore(Host host) { // TODO Auto-generated method stub } @Override public void processDiscoverEventAfter( Map> resources) { // TODO Auto-generated method stub } @Override public void processDiscoverEventBefore(Long dcid, Long podId, Long clusterId, URI uri, String username, String password, List hostTags) { // TODO Auto-generated method stub } @Override public void processPrepareMaintenaceEventAfter(Long hostId) { _capacityDao.removeBy(Capacity.CAPACITY_TYPE_MEMORY, null, null, null, hostId); _capacityDao.removeBy(Capacity.CAPACITY_TYPE_CPU, null, null, null, hostId); } @Override public void processPrepareMaintenaceEventBefore(Long hostId) { // TODO Auto-generated method stub } @Override public boolean checkIfHostReachMaxGuestLimit(HostVO host) { Long vmCount = _vmDao.countRunningByHostId(host.getId()); HypervisorType hypervisorType = host.getHypervisorType(); String hypervisorVersion = host.getHypervisorVersion(); Long maxGuestLimit = _hypervisorCapabilitiesDao.getMaxGuestsLimit(hypervisorType, hypervisorVersion); if(vmCount.longValue() >= maxGuestLimit.longValue()){ if (s_logger.isDebugEnabled()) { s_logger.debug("Host name: " + host.getName() + ", hostId: "+ host.getId() + " already reached max Running VMs(count includes system VMs), limit is: " + maxGuestLimit + ",Running VM counts is: "+vmCount.longValue()); } return true; } return false; } }