From f6c16d9cfc05168324ca63cceb2f4d8b1db48bbe Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Mon, 21 Oct 2024 17:00:24 +0530 Subject: [PATCH] refactor capacity calculation Added capacity.calculate.workers config to control the number threads that can be used to calculate capacities. Signed-off-by: Abhishek Kumar --- .../com/cloud/capacity/CapacityManager.java | 4 + .../com/cloud/storage/StorageManager.java | 4 +- .../main/java/com/cloud/host/dao/HostDao.java | 2 + .../java/com/cloud/host/dao/HostDaoImpl.java | 5 ++ .../datastore/db/PrimaryDataStoreDao.java | 2 + .../datastore/db/PrimaryDataStoreDaoImpl.java | 12 ++- .../com/cloud/alert/AlertManagerImpl.java | 84 +++++++++++++++---- .../cloud/capacity/CapacityManagerImpl.java | 18 +--- .../utils/executor/QueuedExecutor.java | 84 ------------------- 9 files changed, 94 insertions(+), 121 deletions(-) delete mode 100644 utils/src/main/java/org/apache/cloudstack/utils/executor/QueuedExecutor.java diff --git a/engine/components-api/src/main/java/com/cloud/capacity/CapacityManager.java b/engine/components-api/src/main/java/com/cloud/capacity/CapacityManager.java index ac69ef226e5..6b4b0d3bcf8 100644 --- a/engine/components-api/src/main/java/com/cloud/capacity/CapacityManager.java +++ b/engine/components-api/src/main/java/com/cloud/capacity/CapacityManager.java @@ -127,6 +127,10 @@ public interface CapacityManager { true, ConfigKey.Scope.Zone); + ConfigKey CapacityCalculateWorkers = new ConfigKey<>(ConfigKey.CATEGORY_ADVANCED, Integer.class, + "capacity.calculate.workers", "1", + "Number of worker threads to be used for capacities calculation", true); + public boolean releaseVmCapacity(VirtualMachine vm, boolean moveFromReserved, boolean moveToReservered, Long hostId); void allocateVmCapacity(VirtualMachine vm, boolean fromLastHost); diff --git a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java index 20aae1bb437..8f06c0af4e8 100644 --- a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java +++ b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java @@ -214,8 +214,8 @@ public interface StorageManager extends StorageService { true, ConfigKey.Scope.Zone); ConfigKey PrimaryStorageHostConnectWorkers = new ConfigKey<>("Storage", Integer.class, - "primary.storage.host.connect.workers", "3", - "Number of worker threads to be used to connect primary a storage to hosts", false); + "primary.storage.host.connect.workers", "1", + "Number of worker threads to be used to connect hosts to a primary storage", true); /** * should we execute in sequence not involving any storages? diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java b/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java index c104e5be10a..2ab92605dbf 100644 --- a/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java +++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java @@ -102,6 +102,8 @@ public interface HostDao extends GenericDao, StateDao listIdsForUpRouting(Long zoneId, Long podId, Long clusterId); + List listIdsByType(Type type); + List listIdsForUpEnabledByZoneAndHypervisor(Long zoneId, HypervisorType hypervisorType); List findByClusterIdAndEncryptionSupport(Long clusterId); diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java index 4a1dad51730..ceff6e71ef0 100644 --- a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java +++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java @@ -1245,6 +1245,11 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao return listIdsBy(Type.Routing, Status.Up, null, null, zoneId, podId, clusterId); } + @Override + public List listIdsByType(Type type) { + return listIdsBy(type, null, null, null, null, null, null); + } + @Override public List listIdsForUpEnabledByZoneAndHypervisor(Long zoneId, HypervisorType hypervisorType) { return listIdsBy(null, Status.Up, ResourceState.Enabled, hypervisorType, zoneId, null, null); diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDao.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDao.java index c23943ed8e9..73f024f8d61 100644 --- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDao.java +++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDao.java @@ -142,4 +142,6 @@ public interface PrimaryDataStoreDao extends GenericDao { String keyword, Filter searchFilter); List listByIds(List ids); + + List listAllIds(); } diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDaoImpl.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDaoImpl.java index 58d5ec47dfd..eac6f824e8e 100644 --- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDaoImpl.java +++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDaoImpl.java @@ -28,8 +28,6 @@ import java.util.stream.Collectors; import javax.inject.Inject; import javax.naming.ConfigurationException; -import com.cloud.utils.Pair; -import com.cloud.utils.db.Filter; import org.apache.commons.collections.CollectionUtils; import com.cloud.host.Status; @@ -41,7 +39,9 @@ import com.cloud.storage.StoragePoolStatus; import com.cloud.storage.StoragePoolTagVO; import com.cloud.storage.dao.StoragePoolHostDao; import com.cloud.storage.dao.StoragePoolTagsDao; +import com.cloud.utils.Pair; import com.cloud.utils.db.DB; +import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.GenericSearchBuilder; import com.cloud.utils.db.JoinBuilder; @@ -714,4 +714,12 @@ public class PrimaryDataStoreDaoImpl extends GenericDaoBase sc.setParameters("parent", 0); return sc; } + + @Override + public List listAllIds() { + GenericSearchBuilder sb = createSearchBuilder(Long.class); + sb.selectFields(sb.entity().getId()); + sb.done(); + return customSearch(sb.create(), null); + } } diff --git a/server/src/main/java/com/cloud/alert/AlertManagerImpl.java b/server/src/main/java/com/cloud/alert/AlertManagerImpl.java index 6c7d048894c..b5ae55b9666 100644 --- a/server/src/main/java/com/cloud/alert/AlertManagerImpl.java +++ b/server/src/main/java/com/cloud/alert/AlertManagerImpl.java @@ -26,8 +26,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.Timer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import javax.inject.Inject; import javax.mail.MessagingException; @@ -71,6 +74,7 @@ import com.cloud.event.AlertGenerator; import com.cloud.event.EventTypes; import com.cloud.host.Host; import com.cloud.host.HostVO; +import com.cloud.host.dao.HostDao; import com.cloud.network.Ipv6Service; import com.cloud.network.dao.IPAddressDao; import com.cloud.org.Grouping.AllocationState; @@ -119,6 +123,8 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi protected ConfigDepot _configDepot; @Inject Ipv6Service ipv6Service; + @Inject + HostDao hostDao; private Timer _timer = null; private long _capacityCheckPeriod = 60L * 60L * 1000L; // One hour by default. @@ -252,6 +258,64 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi } } + protected void recalculateHostCapacities() { + // Calculate CPU and RAM capacities + List hostIds = hostDao.listIdsByType(Host.Type.Routing); + if (hostIds.isEmpty()) { + return; + } + ConcurrentHashMap> futures = new ConcurrentHashMap<>(); + ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, + Math.min(CapacityManager.CapacityCalculateWorkers.value(), hostIds.size()))); + for (Long hostId : hostIds) { + futures.put(hostId, executorService.submit(() -> { + final HostVO host = hostDao.findById(hostId); + _capacityMgr.updateCapacityForHost(host); + return null; + })); + } + for (Map.Entry> entry: futures.entrySet()) { + try { + entry.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + logger.error(String.format("Error during capacity calculation for host: %d due to : %s", + entry.getKey(), e.getMessage()), e); + } + } + executorService.shutdown(); + } + + protected void recalculateStorageCapacities() { + List storagePoolIds = _storagePoolDao.listAllIds(); + if (storagePoolIds.isEmpty()) { + return; + } + ConcurrentHashMap> futures = new ConcurrentHashMap<>(); + ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, + Math.min(CapacityManager.CapacityCalculateWorkers.value(), storagePoolIds.size()))); + for (Long poolId: storagePoolIds) { + futures.put(poolId, executorService.submit(() -> { + final StoragePoolVO pool = _storagePoolDao.findById(poolId); + long disk = _capacityMgr.getAllocatedPoolCapacity(pool, null); + if (pool.isShared()) { + _storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED, disk); + } else { + _storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_LOCAL_STORAGE, disk); + } + return null; + })); + } + for (Map.Entry> entry: futures.entrySet()) { + try { + entry.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + logger.error(String.format("Error during capacity calculation for storage pool: %d due to : %s", + entry.getKey(), e.getMessage()), e); + } + } + executorService.shutdown(); + } + @Override public void recalculateCapacity() { // FIXME: the right way to do this is to register a listener (see RouterStatsListener, VMSyncListener) @@ -267,30 +331,14 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi logger.debug("recalculating system capacity"); logger.debug("Executing cpu/ram capacity update"); } - // Calculate CPU and RAM capacities - // get all hosts...even if they are not in 'UP' state - List hosts = _resourceMgr.listAllNotInMaintenanceHostsInOneZone(Host.Type.Routing, null); - if (hosts != null) { - for (HostVO host : hosts) { - _capacityMgr.updateCapacityForHost(host); - } - } + recalculateHostCapacities(); if (logger.isDebugEnabled()) { logger.debug("Done executing cpu/ram capacity update"); logger.debug("Executing storage capacity update"); } // Calculate storage pool capacity - List storagePools = _storagePoolDao.listAll(); - for (StoragePoolVO pool : storagePools) { - long disk = _capacityMgr.getAllocatedPoolCapacity(pool, null); - if (pool.isShared()) { - _storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED, disk); - } else { - _storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_LOCAL_STORAGE, disk); - } - } - + recalculateStorageCapacities(); if (logger.isDebugEnabled()) { logger.debug("Done executing storage capacity update"); logger.debug("Executing capacity updates for public ip and Vlans"); diff --git a/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java b/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java index 94614d9514e..97f1af2584c 100644 --- a/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java +++ b/server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java @@ -40,7 +40,6 @@ import org.apache.cloudstack.framework.messagebus.PublishScope; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.cloudstack.utils.cache.LazyCache; import org.apache.cloudstack.utils.cache.SingleCache; -import org.apache.cloudstack.utils.executor.QueuedExecutor; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.log4j.Logger; @@ -150,7 +149,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, private LazyCache> clusterValuesCache; private SingleCache> serviceOfferingsCache; - private QueuedExecutor hostCapacityUpdateExecutor; @Override public boolean configure(String name, Map params) throws ConfigurationException { @@ -160,9 +158,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, _agentManager.registerForHostEvents(new StorageCapacityListener(_capacityDao, _storageMgr), true, false, false); _agentManager.registerForHostEvents(new ComputeCapacityListener(_capacityDao, this), true, false, false); - hostCapacityUpdateExecutor = new QueuedExecutor<>("HostCapacityUpdateExecutor", 10, 10, - 1, s_logger, this::updateCapacityForHostInternal); - return true; } @@ -172,13 +167,11 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, _resourceMgr.registerResourceEvent(ResourceListener.EVENT_CANCEL_MAINTENANCE_AFTER, this); clusterValuesCache = new LazyCache<>(128, 60, this::getClusterValues); serviceOfferingsCache = new SingleCache<>(60, this::getServiceOfferingsMap); - hostCapacityUpdateExecutor.startProcessing(); return true; } @Override public boolean stop() { - hostCapacityUpdateExecutor.shutdown(); return true; } @@ -646,12 +639,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, return totalAllocatedSize; } - @DB - @Override - public void updateCapacityForHost(final Host host) { - hostCapacityUpdateExecutor.queueRequest(host); - } - protected Pair getClusterValues(long clusterId) { Map map = _clusterDetailsDao.findDetails(clusterId, List.of(VmDetailConstants.CPU_OVER_COMMIT_RATIO, VmDetailConstants.CPU_OVER_COMMIT_RATIO)); @@ -694,7 +681,8 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, } @DB - protected void updateCapacityForHostInternal(final Host host) { + @Override + public void updateCapacityForHost(final Host host) { long usedCpuCore = 0; long reservedCpuCore = 0; long usedCpu = 0; @@ -1324,6 +1312,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, public ConfigKey[] getConfigKeys() { return new ConfigKey[] {CpuOverprovisioningFactor, MemOverprovisioningFactor, StorageCapacityDisableThreshold, StorageOverprovisioningFactor, StorageAllocatedCapacityDisableThreshold, StorageOperationsExcludeCluster, ImageStoreNFSVersion, SecondaryStorageCapacityThreshold, - StorageAllocatedCapacityDisableThresholdForVolumeSize }; + StorageAllocatedCapacityDisableThresholdForVolumeSize, CapacityCalculateWorkers }; } } diff --git a/utils/src/main/java/org/apache/cloudstack/utils/executor/QueuedExecutor.java b/utils/src/main/java/org/apache/cloudstack/utils/executor/QueuedExecutor.java deleted file mode 100644 index 3181f791bce..00000000000 --- a/utils/src/main/java/org/apache/cloudstack/utils/executor/QueuedExecutor.java +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.cloudstack.utils.executor; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - -import org.apache.log4j.Logger; - -import com.cloud.utils.concurrency.NamedThreadFactory; - -public class QueuedExecutor { - - private final String name; - private final int processingSize; - private final int processingInterval; - private final BlockingQueue requestQueue; - private final ScheduledExecutorService executorService; - private final Logger logger; - private final Consumer consumer; - - public QueuedExecutor(String name, int processingSize, int processingInterval, int maxThreads, Logger logger, - Consumer consumer) { - this.name = name; - this.processingSize = processingSize; - this.processingInterval = processingInterval; - this.logger = logger; - this.consumer = consumer; - requestQueue = new LinkedBlockingQueue<>(processingSize); - executorService = Executors.newScheduledThreadPool(maxThreads, new NamedThreadFactory(name)); - } - - public void queueRequest(K request) { - try { - requestQueue.put(request); - if (requestQueue.size() >= processingSize) { - processRequests(); - } - } catch (InterruptedException e) { - logger.warn(String.format("Error queuing request for %s", name), e); - } - } - - public void startProcessing() { - executorService.scheduleAtFixedRate(this::processRequests, 0, - processingInterval, TimeUnit.SECONDS); - } - - private void processRequests() { - List requestsToProcess = new ArrayList<>(); - requestQueue.drainTo(requestsToProcess, processingSize); - - if (!requestsToProcess.isEmpty()) { - for (K request : requestsToProcess) { - consumer.accept(request); - } - } - } - - public void shutdown() { - executorService.shutdown(); - } -}