From 61764aba1f7271c6fa22dc04cb10585b65b9d138 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Mon, 9 Sep 2024 19:39:50 +0530 Subject: [PATCH] cache and executors refactoring Signed-off-by: Abhishek Kumar --- .../cloud/vm/VirtualMachineManagerImpl.java | 8 +- .../cloud/capacity/CapacityManagerImpl.java | 34 ++++++-- .../storage/download/DownloadListener.java | 19 ++--- .../cloudstack/utils/cache/SingleCache.java | 48 +++++++++++ .../utils/executor/QueueExecutor.java | 82 +++++++++++++++++++ 5 files changed, 167 insertions(+), 24 deletions(-) create mode 100644 utils/src/main/java/org/apache/cloudstack/utils/cache/SingleCache.java create mode 100644 utils/src/main/java/org/apache/cloudstack/utils/executor/QueueExecutor.java diff --git a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java index 0031e352d46..9ce9c5948dd 100755 --- a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java @@ -84,6 +84,7 @@ import org.apache.cloudstack.resource.ResourceCleanupService; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.cloudstack.storage.to.VolumeObjectTO; +import org.apache.cloudstack.utils.cache.SingleCache; import org.apache.cloudstack.utils.identity.ManagementServerNode; import org.apache.cloudstack.vm.UnmanagedVMsManager; import org.apache.commons.collections.CollectionUtils; @@ -398,7 +399,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Inject VmWorkJobDao vmWorkJobDao; - private LoadingCache> vmIdsInProgressCache; + private SingleCache> vmIdsInProgressCache; VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this); @@ -811,10 +812,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public boolean start() { - vmIdsInProgressCache = Caffeine.newBuilder() - .expireAfterWrite(10, TimeUnit.SECONDS) - .maximumSize(1) - .build(key -> vmWorkJobDao.listVmIdsWithPendingJob()); + vmIdsInProgressCache = new SingleCache<>(10, vmWorkJobDao::listVmIdsWithPendingJob); _executor.scheduleAtFixedRate(new CleanupTask(), 5, VmJobStateReportInterval.value(), TimeUnit.SECONDS); _executor.scheduleAtFixedRate(new TransitionTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS); cancelWorkItems(_nodeId); diff --git a/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java b/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java index 9c5f3e0fd3b..fc70b2b2370 100644 --- a/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java +++ b/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java @@ -36,6 +36,8 @@ import org.apache.cloudstack.framework.config.dao.ConfigurationDao; import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.PublishScope; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; +import org.apache.cloudstack.utils.cache.LazyCache; +import org.apache.cloudstack.utils.executor.QueueExecutor; import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; @@ -50,7 +52,6 @@ import com.cloud.capacity.dao.CapacityDao; import com.cloud.configuration.Config; import com.cloud.dc.ClusterDetailsDao; import com.cloud.dc.ClusterDetailsVO; -import com.cloud.dc.ClusterVO; import com.cloud.dc.dao.ClusterDao; import com.cloud.deploy.DeploymentClusterPlanner; import com.cloud.event.UsageEventVO; @@ -142,6 +143,9 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, @Inject MessageBus _messageBus; + private LazyCache> clusterValuesCache; + private QueueExecutor hostCapacityUpdateExecutor; + @Override public boolean configure(String name, Map params) throws ConfigurationException { _vmCapacityReleaseInterval = NumbersUtil.parseInt(_configDao.getValue(Config.CapacitySkipcountingHours.key()), 3600); @@ -150,6 +154,9 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, _agentManager.registerForHostEvents(new StorageCapacityListener(_capacityDao, _storageMgr), true, false, false); _agentManager.registerForHostEvents(new ComputeCapacityListener(_capacityDao, this), true, false, false); + hostCapacityUpdateExecutor = new QueueExecutor<>("HostCapacityUpdateExecutor", 10, 10, + s_logger, this::updateCapacityForHostInternal); + return true; } @@ -157,11 +164,14 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, public boolean start() { _resourceMgr.registerResourceEvent(ResourceListener.EVENT_PREPARE_MAINTENANCE_AFTER, this); _resourceMgr.registerResourceEvent(ResourceListener.EVENT_CANCEL_MAINTENANCE_AFTER, this); + clusterValuesCache = new LazyCache<>(16, 60, this::getClusterValues); + hostCapacityUpdateExecutor.startProcessing(); return true; } @Override public boolean stop() { + hostCapacityUpdateExecutor.shutdown(); return true; } @@ -631,9 +641,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, return totalAllocatedSize; } - @DB - @Override - public void updateCapacityForHost(final Host host) { + protected void updateCapacityForHostInternal(final Host host) { // prepare the service offerings List offerings = _offeringsDao.listAllIncludingRemoved(); Map offeringsMap = new HashMap(); @@ -643,6 +651,17 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, updateCapacityForHost(host, offeringsMap); } + @DB + @Override + public void updateCapacityForHost(final Host host) { + hostCapacityUpdateExecutor.queueRequest(host); + } + + protected Pair getClusterValues(long clusterId) { + return new Pair<>(_clusterDetailsDao.findDetail(clusterId, "cpuOvercommitRatio"), + _clusterDetailsDao.findDetail(clusterId, "memoryOvercommitRatio")); + } + @DB @Override public void updateCapacityForHost(final Host host, final Map offeringsMap) { @@ -665,9 +684,10 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, } vms.addAll(vosMigrating); - ClusterVO cluster = _clusterDao.findById(host.getClusterId()); - ClusterDetailsVO clusterDetailCpu = _clusterDetailsDao.findDetail(cluster.getId(), "cpuOvercommitRatio"); - ClusterDetailsVO clusterDetailRam = _clusterDetailsDao.findDetail(cluster.getId(), "memoryOvercommitRatio"); + Pair clusterValues = + clusterValuesCache.get(host.getClusterId()); + ClusterDetailsVO clusterDetailCpu = clusterValues.first(); + ClusterDetailsVO clusterDetailRam = clusterValues.second(); Float clusterCpuOvercommitRatio = Float.parseFloat(clusterDetailCpu.getValue()); Float clusterRamOvercommitRatio = Float.parseFloat(clusterDetailRam.getValue()); for (VMInstanceVO vm : vms) { diff --git a/server/src/main/java/com/cloud/storage/download/DownloadListener.java b/server/src/main/java/com/cloud/storage/download/DownloadListener.java index 6bf63404bfa..4d1dcdcae40 100644 --- a/server/src/main/java/com/cloud/storage/download/DownloadListener.java +++ b/server/src/main/java/com/cloud/storage/download/DownloadListener.java @@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit; import javax.inject.Inject; +import org.apache.cloudstack.utils.cache.LazyCache; +import org.apache.cloudstack.utils.cache.SingleCache; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -138,13 +140,11 @@ public class DownloadListener implements Listener { @Inject private VolumeService _volumeSrv; - private Cache> zoneHypervisorsCache; + private LazyCache> zoneHypervisorsCache; protected void initZoneHypervisorsCache() { - zoneHypervisorsCache = Caffeine.newBuilder() - .maximumSize(32) - .expireAfterWrite(30, TimeUnit.SECONDS) - .build(); + zoneHypervisorsCache = + new LazyCache<>(32, 30, _resourceMgr::listAvailHypervisorInZone); } // TODO: this constructor should be the one used for template only, remove other template constructor later @@ -286,18 +286,13 @@ public class DownloadListener implements Listener { public void processHostAdded(long hostId) { } - protected List getAvailHypervisorInZone(long zoneId) { - return _resourceMgr.listAvailHypervisorInZone(zoneId); - } - @Override public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance) throws ConnectionException { if (cmd instanceof StartupRoutingCommand) { // FIXME: CPU and DB hotspot - List hypers = zoneHypervisorsCache.get(agent.getDataCenterId(), - this::getAvailHypervisorInZone); + List hypervisors = zoneHypervisorsCache.get(agent.getDataCenterId()); Hypervisor.HypervisorType hostHyper = agent.getHypervisorType(); - if (hypers.contains(hostHyper)) { + if (hypervisors.contains(hostHyper)) { return; } _imageSrv.handleSysTemplateDownload(hostHyper, agent.getDataCenterId()); diff --git a/utils/src/main/java/org/apache/cloudstack/utils/cache/SingleCache.java b/utils/src/main/java/org/apache/cloudstack/utils/cache/SingleCache.java new file mode 100644 index 00000000000..4302a5258c8 --- /dev/null +++ b/utils/src/main/java/org/apache/cloudstack/utils/cache/SingleCache.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.utils.cache; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; + +public class SingleCache { + + private final LoadingCache cache; + + public SingleCache(long expireAfterWriteSeconds, Supplier loader) { + this.cache = Caffeine.newBuilder() + .maximumSize(1) + .expireAfterWrite(expireAfterWriteSeconds, TimeUnit.SECONDS) + .build(key -> loader.get()); + } + + public V get() { + return cache.get(null); + } + + public void invalidate() { + cache.invalidate(null); + } + + public void clear() { + cache.invalidateAll(); + } +} diff --git a/utils/src/main/java/org/apache/cloudstack/utils/executor/QueueExecutor.java b/utils/src/main/java/org/apache/cloudstack/utils/executor/QueueExecutor.java new file mode 100644 index 00000000000..589acc4976b --- /dev/null +++ b/utils/src/main/java/org/apache/cloudstack/utils/executor/QueueExecutor.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.utils.executor; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.apache.log4j.Logger; + +public class QueueExecutor { + + private final String name; + private final int processingSize; + private final int processingInterval; + private final BlockingQueue requestQueue; + private final ScheduledExecutorService executorService; + private final Logger logger; + private final Consumer consumer; + + public QueueExecutor(String name, int processingSize, int processingInterval, Logger logger, + Consumer consumer) { + this.name = name; + this.processingSize = processingSize; + this.processingInterval = processingInterval; + this.logger = logger; + this.consumer = consumer; + requestQueue = new LinkedBlockingQueue<>(processingSize); + executorService = Executors.newSingleThreadScheduledExecutor(); + } + + public void queueRequest(K request) { + try { + requestQueue.put(request); + if (requestQueue.size() >= processingSize) { + processRequests(); + } + } catch (InterruptedException e) { + logger.warn(String.format("Error queuing request for %s", name), e); + } + } + + public void startProcessing() { + executorService.scheduleAtFixedRate(this::processRequests, 0, + processingInterval, TimeUnit.SECONDS); + } + + private void processRequests() { + List requestsToProcess = new ArrayList<>(); + requestQueue.drainTo(requestsToProcess, processingSize); + + if (!requestsToProcess.isEmpty()) { + for (K request : requestsToProcess) { + consumer.accept(request); + } + } + } + + public void shutdown() { + executorService.shutdown(); + } +}