Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-09-10 10:22:59 +05:30
parent 61764aba1f
commit df137fc387
6 changed files with 60 additions and 51 deletions

View File

@ -16,14 +16,11 @@
// under the License.
package com.cloud.capacity;
import java.util.Map;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import com.cloud.host.Host;
import com.cloud.offering.ServiceOffering;
import com.cloud.service.ServiceOfferingVO;
import com.cloud.storage.VMTemplateVO;
import com.cloud.utils.Pair;
import com.cloud.vm.VirtualMachine;
@ -133,8 +130,6 @@ public interface CapacityManager {
void updateCapacityForHost(Host host);
void updateCapacityForHost(Host host, Map<Long, ServiceOfferingVO> offeringsMap);
/**
* @param pool storage pool
* @param templateForVmCreation template that will be used for vm creation

View File

@ -75,8 +75,6 @@ import com.cloud.network.Ipv6Service;
import com.cloud.network.dao.IPAddressDao;
import com.cloud.org.Grouping.AllocationState;
import com.cloud.resource.ResourceManager;
import com.cloud.service.ServiceOfferingVO;
import com.cloud.service.dao.ServiceOfferingDao;
import com.cloud.storage.StorageManager;
import com.cloud.utils.Pair;
import com.cloud.utils.component.ManagerBase;
@ -120,8 +118,6 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi
@Inject
protected ConfigDepot _configDepot;
@Inject
ServiceOfferingDao _offeringsDao;
@Inject
Ipv6Service ipv6Service;
private Timer _timer = null;
@ -276,14 +272,8 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi
// get all hosts...even if they are not in 'UP' state
List<HostVO> hosts = _resourceMgr.listAllNotInMaintenanceHostsInOneZone(Host.Type.Routing, null);
if (hosts != null) {
// prepare the service offerings
List<ServiceOfferingVO> offerings = _offeringsDao.listAllIncludingRemoved();
Map<Long, ServiceOfferingVO> offeringsMap = new HashMap<Long, ServiceOfferingVO>();
for (ServiceOfferingVO offering : offerings) {
offeringsMap.put(offering.getId(), offering);
}
for (HostVO host : hosts) {
_capacityMgr.updateCapacityForHost(host, offeringsMap);
_capacityMgr.updateCapacityForHost(host);
}
}
if (logger.isDebugEnabled()) {

View File

@ -22,6 +22,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
@ -37,7 +38,9 @@ import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.utils.cache.LazyCache;
import org.apache.cloudstack.utils.executor.QueueExecutor;
import org.apache.cloudstack.utils.cache.SingleCache;
import org.apache.cloudstack.utils.executor.QueuedExecutor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
@ -144,7 +147,8 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
MessageBus _messageBus;
private LazyCache<Long, Pair<ClusterDetailsVO, ClusterDetailsVO>> clusterValuesCache;
private QueueExecutor<Host> hostCapacityUpdateExecutor;
private SingleCache<Map<Long, ServiceOfferingVO>> serviceOfferingsCache;
private QueuedExecutor<Host> hostCapacityUpdateExecutor;
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
@ -154,8 +158,8 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
_agentManager.registerForHostEvents(new StorageCapacityListener(_capacityDao, _storageMgr), true, false, false);
_agentManager.registerForHostEvents(new ComputeCapacityListener(_capacityDao, this), true, false, false);
hostCapacityUpdateExecutor = new QueueExecutor<>("HostCapacityUpdateExecutor", 10, 10,
s_logger, this::updateCapacityForHostInternal);
hostCapacityUpdateExecutor = new QueuedExecutor<>("HostCapacityUpdateExecutor", 10, 10,
1, s_logger, this::updateCapacityForHostInternal);
return true;
}
@ -165,6 +169,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
_resourceMgr.registerResourceEvent(ResourceListener.EVENT_PREPARE_MAINTENANCE_AFTER, this);
_resourceMgr.registerResourceEvent(ResourceListener.EVENT_CANCEL_MAINTENANCE_AFTER, this);
clusterValuesCache = new LazyCache<>(16, 60, this::getClusterValues);
serviceOfferingsCache = new SingleCache<>(60, this::getServiceOfferingsMap);
hostCapacityUpdateExecutor.startProcessing();
return true;
}
@ -641,16 +646,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
return totalAllocatedSize;
}
protected void updateCapacityForHostInternal(final Host host) {
// prepare the service offerings
List<ServiceOfferingVO> offerings = _offeringsDao.listAllIncludingRemoved();
Map<Long, ServiceOfferingVO> offeringsMap = new HashMap<Long, ServiceOfferingVO>();
for (ServiceOfferingVO offering : offerings) {
offeringsMap.put(offering.getId(), offering);
}
updateCapacityForHost(host, offeringsMap);
}
@DB
@Override
public void updateCapacityForHost(final Host host) {
@ -662,9 +657,33 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
_clusterDetailsDao.findDetail(clusterId, "memoryOvercommitRatio"));
}
protected Map<Long, ServiceOfferingVO> getServiceOfferingsMap() {
List<ServiceOfferingVO> serviceOfferings = _offeringsDao.listAllIncludingRemoved();
if (CollectionUtils.isEmpty(serviceOfferings)) {
return new HashMap<>();
}
return serviceOfferings.stream()
.collect(Collectors.toMap(
ServiceOfferingVO::getId,
offering -> offering
));
}
protected ServiceOfferingVO getServiceOffering(long id) {
Map <Long, ServiceOfferingVO> map = serviceOfferingsCache.get();
if (map.containsKey(id)) {
return map.get(id);
}
ServiceOfferingVO serviceOfferingVO = _offeringsDao.findByIdIncludingRemoved(id);
if (serviceOfferingVO != null) {
serviceOfferingsCache.invalidate();
}
return serviceOfferingVO;
}
@DB
@Override
public void updateCapacityForHost(final Host host, final Map<Long, ServiceOfferingVO> offeringsMap) {
protected void updateCapacityForHostInternal(final Host host) {
long usedCpuCore = 0;
long reservedCpuCore = 0;
long usedCpu = 0;
@ -699,7 +718,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
// if vmDetailCpu or vmDetailRam is not null it means it is running in a overcommitted cluster.
cpuOvercommitRatio = (vmDetailCpu != null) ? Float.parseFloat(vmDetailCpu) : clusterCpuOvercommitRatio;
ramOvercommitRatio = (vmDetailRam != null) ? Float.parseFloat(vmDetailRam) : clusterRamOvercommitRatio;
ServiceOffering so = offeringsMap.get(vm.getServiceOfferingId());
ServiceOffering so = getServiceOffering(vm.getServiceOfferingId());
if (so == null) {
so = _offeringsDao.findByIdIncludingRemoved(vm.getServiceOfferingId());
}
@ -728,6 +747,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
if (s_logger.isDebugEnabled()) {
s_logger.debug("Found " + vmsByLastHostId.size() + " VM, not running on host " + host.getId());
}
for (VMInstanceVO vm : vmsByLastHostId) {
Float cpuOvercommitRatio = 1.0f;
Float ramOvercommitRatio = 1.0f;
@ -740,7 +760,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
cpuOvercommitRatio = Float.parseFloat(vmDetailCpu.getValue());
ramOvercommitRatio = Float.parseFloat(vmDetailRam.getValue());
}
ServiceOffering so = offeringsMap.get(vm.getServiceOfferingId());
ServiceOffering so = getServiceOffering(vm.getServiceOfferingId());
Map<String, String> vmDetails = _userVmDetailsDao.listDetailsKeyPairs(vm.getId());
if (so == null) {
so = _offeringsDao.findByIdIncludingRemoved(vm.getServiceOfferingId());

View File

@ -16,20 +16,15 @@
// under the License.
package com.cloud.storage.download;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.cloudstack.utils.cache.LazyCache;
import org.apache.cloudstack.utils.cache.SingleCache;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
@ -43,6 +38,9 @@ import org.apache.cloudstack.storage.command.DownloadCommand;
import org.apache.cloudstack.storage.command.DownloadCommand.ResourceType;
import org.apache.cloudstack.storage.command.DownloadProgressCommand;
import org.apache.cloudstack.storage.command.DownloadProgressCommand.RequestType;
import org.apache.cloudstack.utils.cache.LazyCache;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.cloud.agent.Listener;
import com.cloud.agent.api.AgentControlAnswer;
@ -62,8 +60,6 @@ import com.cloud.storage.VMTemplateStorageResourceAssoc.Status;
import com.cloud.storage.download.DownloadState.DownloadEvent;
import com.cloud.storage.upload.UploadListener;
import com.cloud.utils.exception.CloudRuntimeException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
/**
* Monitor progress of template download to a single storage server
@ -142,9 +138,16 @@ public class DownloadListener implements Listener {
private LazyCache<Long, List<Hypervisor.HypervisorType>> zoneHypervisorsCache;
private List<Hypervisor.HypervisorType> listAvailHypervisorInZone(long zoneId) {
if (_resourceMgr == null) {
return Collections.emptyList();
}
return _resourceMgr.listAvailHypervisorInZone(zoneId);
}
protected void initZoneHypervisorsCache() {
zoneHypervisorsCache =
new LazyCache<>(32, 30, _resourceMgr::listAvailHypervisorInZone);
new LazyCache<>(32, 30, this::listAvailHypervisorInZone);
}
// TODO: this constructor should be the one used for template only, remove other template constructor later
@ -289,7 +292,6 @@ public class DownloadListener implements Listener {
@Override
public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance) throws ConnectionException {
if (cmd instanceof StartupRoutingCommand) {
// FIXME: CPU and DB hotspot
List<Hypervisor.HypervisorType> hypervisors = zoneHypervisorsCache.get(agent.getDataCenterId());
Hypervisor.HypervisorType hostHyper = agent.getHypervisorType();
if (hypervisors.contains(hostHyper)) {

View File

@ -25,7 +25,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
public class SingleCache<V> {
private final LoadingCache<Void, V> cache;
private final LoadingCache<Integer, V> cache;
public SingleCache(long expireAfterWriteSeconds, Supplier<V> loader) {
this.cache = Caffeine.newBuilder()
@ -35,11 +35,11 @@ public class SingleCache<V> {
}
public V get() {
return cache.get(null);
return cache.get(0);
}
public void invalidate() {
cache.invalidate(null);
cache.invalidate(0);
}
public void clear() {

View File

@ -28,7 +28,9 @@ import java.util.function.Consumer;
import org.apache.log4j.Logger;
public class QueueExecutor<K> {
import com.cloud.utils.concurrency.NamedThreadFactory;
public class QueuedExecutor<K> {
private final String name;
private final int processingSize;
@ -38,15 +40,15 @@ public class QueueExecutor<K> {
private final Logger logger;
private final Consumer<K> consumer;
public QueueExecutor(String name, int processingSize, int processingInterval, Logger logger,
Consumer<K> consumer) {
public QueuedExecutor(String name, int processingSize, int processingInterval, int maxThreads, Logger logger,
Consumer<K> consumer) {
this.name = name;
this.processingSize = processingSize;
this.processingInterval = processingInterval;
this.logger = logger;
this.consumer = consumer;
requestQueue = new LinkedBlockingQueue<>(processingSize);
executorService = Executors.newSingleThreadScheduledExecutor();
executorService = Executors.newScheduledThreadPool(maxThreads, new NamedThreadFactory(name));
}
public void queueRequest(K request) {