diff --git a/engine/components-api/src/main/java/com/cloud/capacity/CapacityManager.java b/engine/components-api/src/main/java/com/cloud/capacity/CapacityManager.java index 1c3edad886b..0f35fbe977e 100644 --- a/engine/components-api/src/main/java/com/cloud/capacity/CapacityManager.java +++ b/engine/components-api/src/main/java/com/cloud/capacity/CapacityManager.java @@ -16,14 +16,11 @@ // under the License. package com.cloud.capacity; -import java.util.Map; - import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import com.cloud.host.Host; import com.cloud.offering.ServiceOffering; -import com.cloud.service.ServiceOfferingVO; import com.cloud.storage.VMTemplateVO; import com.cloud.utils.Pair; import com.cloud.vm.VirtualMachine; @@ -133,8 +130,6 @@ public interface CapacityManager { void updateCapacityForHost(Host host); - void updateCapacityForHost(Host host, Map offeringsMap); - /** * @param pool storage pool * @param templateForVmCreation template that will be used for vm creation diff --git a/server/src/main/java/com/cloud/alert/AlertManagerImpl.java b/server/src/main/java/com/cloud/alert/AlertManagerImpl.java index ca26de5cf21..6c7d048894c 100644 --- a/server/src/main/java/com/cloud/alert/AlertManagerImpl.java +++ b/server/src/main/java/com/cloud/alert/AlertManagerImpl.java @@ -75,8 +75,6 @@ import com.cloud.network.Ipv6Service; import com.cloud.network.dao.IPAddressDao; import com.cloud.org.Grouping.AllocationState; import com.cloud.resource.ResourceManager; -import com.cloud.service.ServiceOfferingVO; -import com.cloud.service.dao.ServiceOfferingDao; import com.cloud.storage.StorageManager; import com.cloud.utils.Pair; import com.cloud.utils.component.ManagerBase; @@ -120,8 +118,6 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi @Inject protected ConfigDepot _configDepot; @Inject - ServiceOfferingDao _offeringsDao; - @Inject Ipv6Service ipv6Service; private Timer _timer = null; @@ -276,14 +272,8 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi // get all hosts...even if they are not in 'UP' state List hosts = _resourceMgr.listAllNotInMaintenanceHostsInOneZone(Host.Type.Routing, null); if (hosts != null) { - // prepare the service offerings - List offerings = _offeringsDao.listAllIncludingRemoved(); - Map offeringsMap = new HashMap(); - for (ServiceOfferingVO offering : offerings) { - offeringsMap.put(offering.getId(), offering); - } for (HostVO host : hosts) { - _capacityMgr.updateCapacityForHost(host, offeringsMap); + _capacityMgr.updateCapacityForHost(host); } } if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java b/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java index fc70b2b2370..abb67e7e714 100644 --- a/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java +++ b/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java @@ -22,6 +22,7 @@ import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import javax.inject.Inject; import javax.naming.ConfigurationException; @@ -37,7 +38,9 @@ import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.PublishScope; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.cloudstack.utils.cache.LazyCache; -import org.apache.cloudstack.utils.executor.QueueExecutor; +import org.apache.cloudstack.utils.cache.SingleCache; +import org.apache.cloudstack.utils.executor.QueuedExecutor; +import org.apache.commons.collections.CollectionUtils; import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; @@ -144,7 +147,8 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, MessageBus _messageBus; private LazyCache> clusterValuesCache; - private QueueExecutor hostCapacityUpdateExecutor; + private SingleCache> serviceOfferingsCache; + private QueuedExecutor hostCapacityUpdateExecutor; @Override public boolean configure(String name, Map params) throws ConfigurationException { @@ -154,8 +158,8 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, _agentManager.registerForHostEvents(new StorageCapacityListener(_capacityDao, _storageMgr), true, false, false); _agentManager.registerForHostEvents(new ComputeCapacityListener(_capacityDao, this), true, false, false); - hostCapacityUpdateExecutor = new QueueExecutor<>("HostCapacityUpdateExecutor", 10, 10, - s_logger, this::updateCapacityForHostInternal); + hostCapacityUpdateExecutor = new QueuedExecutor<>("HostCapacityUpdateExecutor", 10, 10, + 1, s_logger, this::updateCapacityForHostInternal); return true; } @@ -165,6 +169,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, _resourceMgr.registerResourceEvent(ResourceListener.EVENT_PREPARE_MAINTENANCE_AFTER, this); _resourceMgr.registerResourceEvent(ResourceListener.EVENT_CANCEL_MAINTENANCE_AFTER, this); clusterValuesCache = new LazyCache<>(16, 60, this::getClusterValues); + serviceOfferingsCache = new SingleCache<>(60, this::getServiceOfferingsMap); hostCapacityUpdateExecutor.startProcessing(); return true; } @@ -641,16 +646,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, return totalAllocatedSize; } - protected void updateCapacityForHostInternal(final Host host) { - // prepare the service offerings - List offerings = _offeringsDao.listAllIncludingRemoved(); - Map offeringsMap = new HashMap(); - for (ServiceOfferingVO offering : offerings) { - offeringsMap.put(offering.getId(), offering); - } - updateCapacityForHost(host, offeringsMap); - } - @DB @Override public void updateCapacityForHost(final Host host) { @@ -662,9 +657,33 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, _clusterDetailsDao.findDetail(clusterId, "memoryOvercommitRatio")); } + + protected Map getServiceOfferingsMap() { + List serviceOfferings = _offeringsDao.listAllIncludingRemoved(); + if (CollectionUtils.isEmpty(serviceOfferings)) { + return new HashMap<>(); + } + return serviceOfferings.stream() + .collect(Collectors.toMap( + ServiceOfferingVO::getId, + offering -> offering + )); + } + + protected ServiceOfferingVO getServiceOffering(long id) { + Map map = serviceOfferingsCache.get(); + if (map.containsKey(id)) { + return map.get(id); + } + ServiceOfferingVO serviceOfferingVO = _offeringsDao.findByIdIncludingRemoved(id); + if (serviceOfferingVO != null) { + serviceOfferingsCache.invalidate(); + } + return serviceOfferingVO; + } + @DB - @Override - public void updateCapacityForHost(final Host host, final Map offeringsMap) { + protected void updateCapacityForHostInternal(final Host host) { long usedCpuCore = 0; long reservedCpuCore = 0; long usedCpu = 0; @@ -699,7 +718,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, // if vmDetailCpu or vmDetailRam is not null it means it is running in a overcommitted cluster. cpuOvercommitRatio = (vmDetailCpu != null) ? Float.parseFloat(vmDetailCpu) : clusterCpuOvercommitRatio; ramOvercommitRatio = (vmDetailRam != null) ? Float.parseFloat(vmDetailRam) : clusterRamOvercommitRatio; - ServiceOffering so = offeringsMap.get(vm.getServiceOfferingId()); + ServiceOffering so = getServiceOffering(vm.getServiceOfferingId()); if (so == null) { so = _offeringsDao.findByIdIncludingRemoved(vm.getServiceOfferingId()); } @@ -728,6 +747,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, if (s_logger.isDebugEnabled()) { s_logger.debug("Found " + vmsByLastHostId.size() + " VM, not running on host " + host.getId()); } + for (VMInstanceVO vm : vmsByLastHostId) { Float cpuOvercommitRatio = 1.0f; Float ramOvercommitRatio = 1.0f; @@ -740,7 +760,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, cpuOvercommitRatio = Float.parseFloat(vmDetailCpu.getValue()); ramOvercommitRatio = Float.parseFloat(vmDetailRam.getValue()); } - ServiceOffering so = offeringsMap.get(vm.getServiceOfferingId()); + ServiceOffering so = getServiceOffering(vm.getServiceOfferingId()); Map vmDetails = _userVmDetailsDao.listDetailsKeyPairs(vm.getId()); if (so == null) { so = _offeringsDao.findByIdIncludingRemoved(vm.getServiceOfferingId()); diff --git a/server/src/main/java/com/cloud/storage/download/DownloadListener.java b/server/src/main/java/com/cloud/storage/download/DownloadListener.java index 4d1dcdcae40..96796d01c28 100644 --- a/server/src/main/java/com/cloud/storage/download/DownloadListener.java +++ b/server/src/main/java/com/cloud/storage/download/DownloadListener.java @@ -16,20 +16,15 @@ // under the License. package com.cloud.storage.download; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Timer; -import java.util.concurrent.TimeUnit; import javax.inject.Inject; -import org.apache.cloudstack.utils.cache.LazyCache; -import org.apache.cloudstack.utils.cache.SingleCache; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - import org.apache.cloudstack.engine.subsystem.api.storage.DataObject; import org.apache.cloudstack.engine.subsystem.api.storage.DataStore; import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager; @@ -43,6 +38,9 @@ import org.apache.cloudstack.storage.command.DownloadCommand; import org.apache.cloudstack.storage.command.DownloadCommand.ResourceType; import org.apache.cloudstack.storage.command.DownloadProgressCommand; import org.apache.cloudstack.storage.command.DownloadProgressCommand.RequestType; +import org.apache.cloudstack.utils.cache.LazyCache; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import com.cloud.agent.Listener; import com.cloud.agent.api.AgentControlAnswer; @@ -62,8 +60,6 @@ import com.cloud.storage.VMTemplateStorageResourceAssoc.Status; import com.cloud.storage.download.DownloadState.DownloadEvent; import com.cloud.storage.upload.UploadListener; import com.cloud.utils.exception.CloudRuntimeException; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; /** * Monitor progress of template download to a single storage server @@ -142,9 +138,16 @@ public class DownloadListener implements Listener { private LazyCache> zoneHypervisorsCache; + private List listAvailHypervisorInZone(long zoneId) { + if (_resourceMgr == null) { + return Collections.emptyList(); + } + return _resourceMgr.listAvailHypervisorInZone(zoneId); + } + protected void initZoneHypervisorsCache() { zoneHypervisorsCache = - new LazyCache<>(32, 30, _resourceMgr::listAvailHypervisorInZone); + new LazyCache<>(32, 30, this::listAvailHypervisorInZone); } // TODO: this constructor should be the one used for template only, remove other template constructor later @@ -289,7 +292,6 @@ public class DownloadListener implements Listener { @Override public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance) throws ConnectionException { if (cmd instanceof StartupRoutingCommand) { - // FIXME: CPU and DB hotspot List hypervisors = zoneHypervisorsCache.get(agent.getDataCenterId()); Hypervisor.HypervisorType hostHyper = agent.getHypervisorType(); if (hypervisors.contains(hostHyper)) { diff --git a/utils/src/main/java/org/apache/cloudstack/utils/cache/SingleCache.java b/utils/src/main/java/org/apache/cloudstack/utils/cache/SingleCache.java index 4302a5258c8..5fa77d9a28c 100644 --- a/utils/src/main/java/org/apache/cloudstack/utils/cache/SingleCache.java +++ b/utils/src/main/java/org/apache/cloudstack/utils/cache/SingleCache.java @@ -25,7 +25,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache; public class SingleCache { - private final LoadingCache cache; + private final LoadingCache cache; public SingleCache(long expireAfterWriteSeconds, Supplier loader) { this.cache = Caffeine.newBuilder() @@ -35,11 +35,11 @@ public class SingleCache { } public V get() { - return cache.get(null); + return cache.get(0); } public void invalidate() { - cache.invalidate(null); + cache.invalidate(0); } public void clear() { diff --git a/utils/src/main/java/org/apache/cloudstack/utils/executor/QueueExecutor.java b/utils/src/main/java/org/apache/cloudstack/utils/executor/QueuedExecutor.java similarity index 88% rename from utils/src/main/java/org/apache/cloudstack/utils/executor/QueueExecutor.java rename to utils/src/main/java/org/apache/cloudstack/utils/executor/QueuedExecutor.java index 589acc4976b..3181f791bce 100644 --- a/utils/src/main/java/org/apache/cloudstack/utils/executor/QueueExecutor.java +++ b/utils/src/main/java/org/apache/cloudstack/utils/executor/QueuedExecutor.java @@ -28,7 +28,9 @@ import java.util.function.Consumer; import org.apache.log4j.Logger; -public class QueueExecutor { +import com.cloud.utils.concurrency.NamedThreadFactory; + +public class QueuedExecutor { private final String name; private final int processingSize; @@ -38,15 +40,15 @@ public class QueueExecutor { private final Logger logger; private final Consumer consumer; - public QueueExecutor(String name, int processingSize, int processingInterval, Logger logger, - Consumer consumer) { + public QueuedExecutor(String name, int processingSize, int processingInterval, int maxThreads, Logger logger, + Consumer consumer) { this.name = name; this.processingSize = processingSize; this.processingInterval = processingInterval; this.logger = logger; this.consumer = consumer; requestQueue = new LinkedBlockingQueue<>(processingSize); - executorService = Executors.newSingleThreadScheduledExecutor(); + executorService = Executors.newScheduledThreadPool(maxThreads, new NamedThreadFactory(name)); } public void queueRequest(K request) {