mirror of https://github.com/apache/cloudstack.git
Host reservation release using MessageBus
This commit is contained in:
parent
9b865161dd
commit
81d6da5ad7
|
|
@ -93,6 +93,11 @@
|
|||
<artifactId>cloud-api</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.cloudstack</groupId>
|
||||
<artifactId>cloud-framework-ipc</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.cloudstack</groupId>
|
||||
<artifactId>cloud-framework-events</artifactId>
|
||||
|
|
|
|||
|
|
@ -27,6 +27,8 @@ import javax.ejb.Local;
|
|||
import javax.inject.Inject;
|
||||
import javax.naming.ConfigurationException;
|
||||
|
||||
import org.apache.cloudstack.framework.messagebus.MessageBus;
|
||||
import org.apache.cloudstack.framework.messagebus.PublishScope;
|
||||
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
|
||||
import com.cloud.dc.ClusterDetailsDao;
|
||||
import com.cloud.dc.DataCenter;
|
||||
|
|
@ -104,9 +106,9 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
HostDao _hostDao;
|
||||
@Inject
|
||||
VMInstanceDao _vmDao;
|
||||
@Inject
|
||||
@Inject
|
||||
VolumeDao _volumeDao;
|
||||
@Inject
|
||||
@Inject
|
||||
VMTemplatePoolDao _templatePoolDao;
|
||||
@Inject
|
||||
AgentManager _agentManager;
|
||||
|
|
@ -115,16 +117,16 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
@Inject
|
||||
StorageManager _storageMgr;
|
||||
@Inject
|
||||
SwiftManager _swiftMgr;
|
||||
SwiftManager _swiftMgr;
|
||||
@Inject
|
||||
ConfigurationManager _configMgr;
|
||||
ConfigurationManager _configMgr;
|
||||
@Inject
|
||||
HypervisorCapabilitiesDao _hypervisorCapabilitiesDao;
|
||||
@Inject
|
||||
protected VMSnapshotDao _vmSnapshotDao;
|
||||
@Inject
|
||||
protected UserVmDao _userVMDao;
|
||||
|
||||
|
||||
@Inject
|
||||
ClusterDetailsDao _clusterDetailsDao;
|
||||
@Inject
|
||||
|
|
@ -135,6 +137,9 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
long _extraBytesPerVolume = 0;
|
||||
private float _storageOverProvisioningFactor = 1.0f;
|
||||
|
||||
@Inject
|
||||
MessageBus _messageBus;
|
||||
|
||||
@Override
|
||||
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
|
||||
_vmCapacityReleaseInterval = NumbersUtil.parseInt(_configDao.getValue(Config.CapacitySkipcountingHours.key()), 3600);
|
||||
|
|
@ -398,10 +403,10 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
failureReason = "Host does not have enough reserved CPU available";
|
||||
}
|
||||
} else {
|
||||
|
||||
|
||||
long reservedCpuValueToUse = reservedCpu;
|
||||
long reservedMemValueToUse = reservedMem;
|
||||
|
||||
|
||||
if(!considerReservedCapacity){
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("considerReservedCapacity is" + considerReservedCapacity + " , not considering reserved capacity for calculating free capacity");
|
||||
|
|
@ -458,7 +463,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
return hasCapacity;
|
||||
|
||||
}
|
||||
|
||||
|
||||
private long getVMSnapshotAllocatedCapacity(StoragePoolVO pool){
|
||||
List<VolumeVO> volumes = _volumeDao.findByPoolId(pool.getId());
|
||||
long totalSize = 0;
|
||||
|
|
@ -486,39 +491,39 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
}
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getAllocatedPoolCapacity(StoragePoolVO pool, VMTemplateVO templateForVmCreation){
|
||||
|
||||
|
||||
// Get size for all the volumes
|
||||
Pair<Long, Long> sizes = _volumeDao.getCountAndTotalByPool(pool.getId());
|
||||
long totalAllocatedSize = sizes.second() + sizes.first() * _extraBytesPerVolume;
|
||||
|
||||
// Get size for VM Snapshots
|
||||
|
||||
// Get size for VM Snapshots
|
||||
totalAllocatedSize = totalAllocatedSize + getVMSnapshotAllocatedCapacity(pool);
|
||||
|
||||
// Iterate through all templates on this storage pool
|
||||
boolean tmpinstalled = false;
|
||||
List<VMTemplateStoragePoolVO> templatePoolVOs;
|
||||
templatePoolVOs = _templatePoolDao.listByPoolId(pool.getId());
|
||||
|
||||
for (VMTemplateStoragePoolVO templatePoolVO : templatePoolVOs) {
|
||||
|
||||
for (VMTemplateStoragePoolVO templatePoolVO : templatePoolVOs) {
|
||||
if ((templateForVmCreation != null) && !tmpinstalled && (templatePoolVO.getTemplateId() == templateForVmCreation.getId())) {
|
||||
tmpinstalled = true;
|
||||
}
|
||||
long templateSize = templatePoolVO.getTemplateSize();
|
||||
totalAllocatedSize += templateSize + _extraBytesPerVolume;
|
||||
}
|
||||
|
||||
|
||||
// Add the size for the templateForVmCreation if its not already present
|
||||
/*if ((templateForVmCreation != null) && !tmpinstalled) {
|
||||
|
||||
|
||||
}*/
|
||||
|
||||
|
||||
return totalAllocatedSize;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@DB
|
||||
@Override
|
||||
public void updateCapacityForHost(HostVO host){
|
||||
|
|
@ -528,7 +533,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
for (ServiceOfferingVO offering : offerings) {
|
||||
offeringsMap.put(offering.getId(), offering);
|
||||
}
|
||||
|
||||
|
||||
long usedCpu = 0;
|
||||
long usedMemory = 0;
|
||||
long reservedMemory = 0;
|
||||
|
|
@ -555,6 +560,10 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
ServiceOffering so = offeringsMap.get(vm.getServiceOfferingId());
|
||||
reservedMemory += so.getRamSize() * 1024L * 1024L;
|
||||
reservedCpu += so.getCpu() * so.getSpeed();
|
||||
} else {
|
||||
// signal that the VM has been stopped for skip.counting.hours,
|
||||
// hence capacity will not be reserved anymore.
|
||||
_messageBus.publish(_name, "VM_ReservedCapacity_Free", PublishScope.LOCAL, vm);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -574,7 +583,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
+ usedCpu);
|
||||
cpuCap.setUsedCapacity(usedCpu);
|
||||
}
|
||||
|
||||
|
||||
if (memCap.getUsedCapacity() == usedMemory && memCap.getReservedCapacity() == reservedMemory) {
|
||||
s_logger.debug("No need to calibrate memory capacity, host:" + host.getId() + " usedMem: " + memCap.getUsedCapacity()
|
||||
+ " reservedMem: " + memCap.getReservedCapacity());
|
||||
|
|
@ -591,7 +600,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
+ " new usedMem: " + usedMemory);
|
||||
memCap.setUsedCapacity(usedMemory);
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
_capacityDao.update(cpuCap.getId(), cpuCap);
|
||||
_capacityDao.update(memCap.getId(), memCap);
|
||||
|
|
@ -610,11 +619,11 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
capacity.setReservedCapacity(reservedMemory);
|
||||
capacity.setCapacityState(capacityState);
|
||||
_capacityDao.persist(capacity);
|
||||
|
||||
|
||||
capacity = new CapacityVO(
|
||||
host.getId(),
|
||||
host.getDataCenterId(),
|
||||
host.getPodId(),
|
||||
host.getPodId(),
|
||||
host.getClusterId(),
|
||||
usedCpu,
|
||||
(long)(host.getCpus().longValue() * host.getSpeed().longValue()),
|
||||
|
|
@ -623,11 +632,11 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
capacity.setCapacityState(capacityState);
|
||||
_capacityDao.persist(capacity);
|
||||
txn.commit();
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean preStateTransitionEvent(State oldState, Event event, State newState, VirtualMachine vm, boolean transitionStatus, Object opaque) {
|
||||
return true;
|
||||
|
|
@ -681,7 +690,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
releaseVmCapacity(vm, false, false, oldHostId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if ((newState == State.Starting || newState == State.Migrating || event == Event.AgentReportMigrated) && vm.getHostId() != null) {
|
||||
boolean fromLastHost = false;
|
||||
if (vm.getLastHostId() == vm.getHostId()) {
|
||||
|
|
@ -815,32 +824,32 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
|
||||
@Override
|
||||
public void processCancelMaintenaceEventAfter(Long hostId) {
|
||||
updateCapacityForHost(_hostDao.findById(hostId));
|
||||
updateCapacityForHost(_hostDao.findById(hostId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processCancelMaintenaceEventBefore(Long hostId) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processDeletHostEventAfter(HostVO host) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processDeleteHostEventBefore(HostVO host) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processDiscoverEventAfter(
|
||||
Map<? extends ServerResource, Map<String, String>> resources) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -848,11 +857,11 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
Long clusterId, URI uri, String username, String password,
|
||||
List<String> hostTags) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processPrepareMaintenaceEventAfter(Long hostId) {
|
||||
public void processPrepareMaintenaceEventAfter(Long hostId) {
|
||||
_capacityDao.removeBy(Capacity.CAPACITY_TYPE_MEMORY, null, null, null, hostId);
|
||||
_capacityDao.removeBy(Capacity.CAPACITY_TYPE_CPU, null, null, null, hostId);
|
||||
}
|
||||
|
|
@ -860,7 +869,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
@Override
|
||||
public void processPrepareMaintenaceEventBefore(Long hostId) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -871,7 +880,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
|
|||
Long maxGuestLimit = _hypervisorCapabilitiesDao.getMaxGuestsLimit(hypervisorType, hypervisorVersion);
|
||||
if(vmCount.longValue() >= maxGuestLimit.longValue()){
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Host name: " + host.getName() + ", hostId: "+ host.getId() +
|
||||
s_logger.debug("Host name: " + host.getName() + ", hostId: "+ host.getId() +
|
||||
" already reached max Running VMs(count includes system VMs), limit is: " + maxGuestLimit + ",Running VM counts is: "+vmCount.longValue());
|
||||
}
|
||||
return true;
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ import org.apache.cloudstack.affinity.dao.AffinityGroupDao;
|
|||
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
|
||||
import org.apache.cloudstack.framework.messagebus.MessageBus;
|
||||
import org.apache.cloudstack.framework.messagebus.MessageSubscriber;
|
||||
|
||||
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
|
||||
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
|
||||
|
|
@ -89,6 +91,7 @@ import com.cloud.utils.db.DB;
|
|||
import com.cloud.utils.db.Transaction;
|
||||
import com.cloud.vm.DiskProfile;
|
||||
import com.cloud.vm.ReservationContext;
|
||||
import com.cloud.vm.VMInstanceVO;
|
||||
import com.cloud.vm.VirtualMachine;
|
||||
import com.cloud.vm.VirtualMachineProfile;
|
||||
import com.cloud.vm.dao.UserVmDao;
|
||||
|
|
@ -122,6 +125,9 @@ public class DeploymentPlanningManagerImpl extends ManagerBase implements Deploy
|
|||
@Inject
|
||||
PlannerHostReservationDao _plannerHostReserveDao;
|
||||
|
||||
@Inject
|
||||
MessageBus _messageBus;
|
||||
|
||||
protected List<StoragePoolAllocator> _storagePoolAllocators;
|
||||
public List<StoragePoolAllocator> getStoragePoolAllocators() {
|
||||
return _storagePoolAllocators;
|
||||
|
|
@ -488,6 +494,59 @@ public class DeploymentPlanningManagerImpl extends ManagerBase implements Deploy
|
|||
return false;
|
||||
}
|
||||
|
||||
@DB
|
||||
public void checkHostReservationRelease(VMInstanceVO vm) {
|
||||
s_logger.debug("MessageBus message: host reserved capacity released for VM: " + vm.getLastHostId()
|
||||
+ ", checking if host reservation can be released for host:" + vm.getLastHostId());
|
||||
|
||||
Long hostId = vm.getLastHostId();
|
||||
|
||||
if (hostId != null) {
|
||||
List<VMInstanceVO> vms = _vmInstanceDao.listUpByHostId(hostId);
|
||||
if (vms.size() > 0) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Cannot release reservation, Found " + vms.size() + " VMs Running on host " + hostId);
|
||||
}
|
||||
}
|
||||
|
||||
List<VMInstanceVO> vmsByLastHostId = _vmInstanceDao.listByLastHostId(hostId);
|
||||
if (vmsByLastHostId.size() > 0) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Cannot release reservation, Found " + vmsByLastHostId.size()
|
||||
+ " VMs Stopped but reserved on host " + hostId);
|
||||
}
|
||||
}
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Host has no VMs, releasing the planner reservation");
|
||||
}
|
||||
|
||||
PlannerHostReservationVO reservationEntry = _plannerHostReserveDao.findByHostId(hostId);
|
||||
if (reservationEntry != null) {
|
||||
long id = reservationEntry.getId();
|
||||
|
||||
final Transaction txn = Transaction.currentTxn();
|
||||
|
||||
try {
|
||||
txn.start();
|
||||
|
||||
final PlannerHostReservationVO lockedEntry = _plannerHostReserveDao.lockRow(id, true);
|
||||
if (lockedEntry == null) {
|
||||
s_logger.error("Unable to lock the host entry for reservation, host: " + hostId);
|
||||
}
|
||||
// check before updating
|
||||
if (lockedEntry.getResourceUsage() != null) {
|
||||
lockedEntry.setResourceUsage(null);
|
||||
_plannerHostReserveDao.persist(lockedEntry);
|
||||
}
|
||||
} finally {
|
||||
txn.commit();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
|
||||
// TODO Auto-generated method stub
|
||||
|
|
@ -549,6 +608,13 @@ public class DeploymentPlanningManagerImpl extends ManagerBase implements Deploy
|
|||
@Override
|
||||
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
|
||||
_agentMgr.registerForHostEvents(this, true, false, true);
|
||||
_messageBus.subscribe("VM_ReservedCapacity_Free", new MessageSubscriber() {
|
||||
@Override
|
||||
public void onPublishMessage(String senderAddress, String subject, Object vm) {
|
||||
checkHostReservationRelease((VMInstanceVO) vm);
|
||||
}
|
||||
});
|
||||
|
||||
return super.configure(name, params);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue