refactor capacity calculation

Added capacity.calculate.workers config to control the number threads
that can be used to calculate capacities.

Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-10-21 17:00:24 +05:30
parent 16a541cd71
commit f6c16d9cfc
9 changed files with 94 additions and 121 deletions

View File

@ -127,6 +127,10 @@ public interface CapacityManager {
true,
ConfigKey.Scope.Zone);
ConfigKey<Integer> CapacityCalculateWorkers = new ConfigKey<>(ConfigKey.CATEGORY_ADVANCED, Integer.class,
"capacity.calculate.workers", "1",
"Number of worker threads to be used for capacities calculation", true);
public boolean releaseVmCapacity(VirtualMachine vm, boolean moveFromReserved, boolean moveToReservered, Long hostId);
void allocateVmCapacity(VirtualMachine vm, boolean fromLastHost);

View File

@ -214,8 +214,8 @@ public interface StorageManager extends StorageService {
true, ConfigKey.Scope.Zone);
ConfigKey<Integer> PrimaryStorageHostConnectWorkers = new ConfigKey<>("Storage", Integer.class,
"primary.storage.host.connect.workers", "3",
"Number of worker threads to be used to connect primary a storage to hosts", false);
"primary.storage.host.connect.workers", "1",
"Number of worker threads to be used to connect hosts to a primary storage", true);
/**
* should we execute in sequence not involving any storages?

View File

@ -102,6 +102,8 @@ public interface HostDao extends GenericDao<HostVO, Long>, StateDao<Status, Stat
List<Long> listIdsForUpRouting(Long zoneId, Long podId, Long clusterId);
List<Long> listIdsByType(Type type);
List<Long> listIdsForUpEnabledByZoneAndHypervisor(Long zoneId, HypervisorType hypervisorType);
List<HostVO> findByClusterIdAndEncryptionSupport(Long clusterId);

View File

@ -1245,6 +1245,11 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
return listIdsBy(Type.Routing, Status.Up, null, null, zoneId, podId, clusterId);
}
@Override
public List<Long> listIdsByType(Type type) {
return listIdsBy(type, null, null, null, null, null, null);
}
@Override
public List<Long> listIdsForUpEnabledByZoneAndHypervisor(Long zoneId, HypervisorType hypervisorType) {
return listIdsBy(null, Status.Up, ResourceState.Enabled, hypervisorType, zoneId, null, null);

View File

@ -142,4 +142,6 @@ public interface PrimaryDataStoreDao extends GenericDao<StoragePoolVO, Long> {
String keyword, Filter searchFilter);
List<StoragePoolVO> listByIds(List<Long> ids);
List<Long> listAllIds();
}

View File

@ -28,8 +28,6 @@ import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import com.cloud.utils.Pair;
import com.cloud.utils.db.Filter;
import org.apache.commons.collections.CollectionUtils;
import com.cloud.host.Status;
@ -41,7 +39,9 @@ import com.cloud.storage.StoragePoolStatus;
import com.cloud.storage.StoragePoolTagVO;
import com.cloud.storage.dao.StoragePoolHostDao;
import com.cloud.storage.dao.StoragePoolTagsDao;
import com.cloud.utils.Pair;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.JoinBuilder;
@ -714,4 +714,12 @@ public class PrimaryDataStoreDaoImpl extends GenericDaoBase<StoragePoolVO, Long>
sc.setParameters("parent", 0);
return sc;
}
@Override
public List<Long> listAllIds() {
GenericSearchBuilder<StoragePoolVO, Long> sb = createSearchBuilder(Long.class);
sb.selectFields(sb.entity().getId());
sb.done();
return customSearch(sb.create(), null);
}
}

View File

@ -26,8 +26,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.inject.Inject;
import javax.mail.MessagingException;
@ -71,6 +74,7 @@ import com.cloud.event.AlertGenerator;
import com.cloud.event.EventTypes;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.dao.HostDao;
import com.cloud.network.Ipv6Service;
import com.cloud.network.dao.IPAddressDao;
import com.cloud.org.Grouping.AllocationState;
@ -119,6 +123,8 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi
protected ConfigDepot _configDepot;
@Inject
Ipv6Service ipv6Service;
@Inject
HostDao hostDao;
private Timer _timer = null;
private long _capacityCheckPeriod = 60L * 60L * 1000L; // One hour by default.
@ -252,6 +258,64 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi
}
}
protected void recalculateHostCapacities() {
// Calculate CPU and RAM capacities
List<Long> hostIds = hostDao.listIdsByType(Host.Type.Routing);
if (hostIds.isEmpty()) {
return;
}
ConcurrentHashMap<Long, Future<Void>> futures = new ConcurrentHashMap<>();
ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1,
Math.min(CapacityManager.CapacityCalculateWorkers.value(), hostIds.size())));
for (Long hostId : hostIds) {
futures.put(hostId, executorService.submit(() -> {
final HostVO host = hostDao.findById(hostId);
_capacityMgr.updateCapacityForHost(host);
return null;
}));
}
for (Map.Entry<Long, Future<Void>> entry: futures.entrySet()) {
try {
entry.getValue().get();
} catch (InterruptedException | ExecutionException e) {
logger.error(String.format("Error during capacity calculation for host: %d due to : %s",
entry.getKey(), e.getMessage()), e);
}
}
executorService.shutdown();
}
protected void recalculateStorageCapacities() {
List<Long> storagePoolIds = _storagePoolDao.listAllIds();
if (storagePoolIds.isEmpty()) {
return;
}
ConcurrentHashMap<Long, Future<Void>> futures = new ConcurrentHashMap<>();
ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1,
Math.min(CapacityManager.CapacityCalculateWorkers.value(), storagePoolIds.size())));
for (Long poolId: storagePoolIds) {
futures.put(poolId, executorService.submit(() -> {
final StoragePoolVO pool = _storagePoolDao.findById(poolId);
long disk = _capacityMgr.getAllocatedPoolCapacity(pool, null);
if (pool.isShared()) {
_storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED, disk);
} else {
_storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_LOCAL_STORAGE, disk);
}
return null;
}));
}
for (Map.Entry<Long, Future<Void>> entry: futures.entrySet()) {
try {
entry.getValue().get();
} catch (InterruptedException | ExecutionException e) {
logger.error(String.format("Error during capacity calculation for storage pool: %d due to : %s",
entry.getKey(), e.getMessage()), e);
}
}
executorService.shutdown();
}
@Override
public void recalculateCapacity() {
// FIXME: the right way to do this is to register a listener (see RouterStatsListener, VMSyncListener)
@ -267,30 +331,14 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi
logger.debug("recalculating system capacity");
logger.debug("Executing cpu/ram capacity update");
}
// Calculate CPU and RAM capacities
// get all hosts...even if they are not in 'UP' state
List<HostVO> hosts = _resourceMgr.listAllNotInMaintenanceHostsInOneZone(Host.Type.Routing, null);
if (hosts != null) {
for (HostVO host : hosts) {
_capacityMgr.updateCapacityForHost(host);
}
}
recalculateHostCapacities();
if (logger.isDebugEnabled()) {
logger.debug("Done executing cpu/ram capacity update");
logger.debug("Executing storage capacity update");
}
// Calculate storage pool capacity
List<StoragePoolVO> storagePools = _storagePoolDao.listAll();
for (StoragePoolVO pool : storagePools) {
long disk = _capacityMgr.getAllocatedPoolCapacity(pool, null);
if (pool.isShared()) {
_storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED, disk);
} else {
_storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_LOCAL_STORAGE, disk);
}
}
recalculateStorageCapacities();
if (logger.isDebugEnabled()) {
logger.debug("Done executing storage capacity update");
logger.debug("Executing capacity updates for public ip and Vlans");

View File

@ -40,7 +40,6 @@ import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.utils.cache.LazyCache;
import org.apache.cloudstack.utils.cache.SingleCache;
import org.apache.cloudstack.utils.executor.QueuedExecutor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.log4j.Logger;
@ -150,7 +149,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
private LazyCache<Long, Pair<String, String>> clusterValuesCache;
private SingleCache<Map<Long, ServiceOfferingVO>> serviceOfferingsCache;
private QueuedExecutor<Host> hostCapacityUpdateExecutor;
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
@ -160,9 +158,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
_agentManager.registerForHostEvents(new StorageCapacityListener(_capacityDao, _storageMgr), true, false, false);
_agentManager.registerForHostEvents(new ComputeCapacityListener(_capacityDao, this), true, false, false);
hostCapacityUpdateExecutor = new QueuedExecutor<>("HostCapacityUpdateExecutor", 10, 10,
1, s_logger, this::updateCapacityForHostInternal);
return true;
}
@ -172,13 +167,11 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
_resourceMgr.registerResourceEvent(ResourceListener.EVENT_CANCEL_MAINTENANCE_AFTER, this);
clusterValuesCache = new LazyCache<>(128, 60, this::getClusterValues);
serviceOfferingsCache = new SingleCache<>(60, this::getServiceOfferingsMap);
hostCapacityUpdateExecutor.startProcessing();
return true;
}
@Override
public boolean stop() {
hostCapacityUpdateExecutor.shutdown();
return true;
}
@ -646,12 +639,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
return totalAllocatedSize;
}
@DB
@Override
public void updateCapacityForHost(final Host host) {
hostCapacityUpdateExecutor.queueRequest(host);
}
protected Pair<String, String> getClusterValues(long clusterId) {
Map<String, String> map = _clusterDetailsDao.findDetails(clusterId,
List.of(VmDetailConstants.CPU_OVER_COMMIT_RATIO, VmDetailConstants.CPU_OVER_COMMIT_RATIO));
@ -694,7 +681,8 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
}
@DB
protected void updateCapacityForHostInternal(final Host host) {
@Override
public void updateCapacityForHost(final Host host) {
long usedCpuCore = 0;
long reservedCpuCore = 0;
long usedCpu = 0;
@ -1324,6 +1312,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[] {CpuOverprovisioningFactor, MemOverprovisioningFactor, StorageCapacityDisableThreshold, StorageOverprovisioningFactor,
StorageAllocatedCapacityDisableThreshold, StorageOperationsExcludeCluster, ImageStoreNFSVersion, SecondaryStorageCapacityThreshold,
StorageAllocatedCapacityDisableThresholdForVolumeSize };
StorageAllocatedCapacityDisableThresholdForVolumeSize, CapacityCalculateWorkers };
}
}

View File

@ -1,84 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.utils.executor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.log4j.Logger;
import com.cloud.utils.concurrency.NamedThreadFactory;
public class QueuedExecutor<K> {
private final String name;
private final int processingSize;
private final int processingInterval;
private final BlockingQueue<K> requestQueue;
private final ScheduledExecutorService executorService;
private final Logger logger;
private final Consumer<K> consumer;
public QueuedExecutor(String name, int processingSize, int processingInterval, int maxThreads, Logger logger,
Consumer<K> consumer) {
this.name = name;
this.processingSize = processingSize;
this.processingInterval = processingInterval;
this.logger = logger;
this.consumer = consumer;
requestQueue = new LinkedBlockingQueue<>(processingSize);
executorService = Executors.newScheduledThreadPool(maxThreads, new NamedThreadFactory(name));
}
public void queueRequest(K request) {
try {
requestQueue.put(request);
if (requestQueue.size() >= processingSize) {
processRequests();
}
} catch (InterruptedException e) {
logger.warn(String.format("Error queuing request for %s", name), e);
}
}
public void startProcessing() {
executorService.scheduleAtFixedRate(this::processRequests, 0,
processingInterval, TimeUnit.SECONDS);
}
private void processRequests() {
List<K> requestsToProcess = new ArrayList<>();
requestQueue.drainTo(requestsToProcess, processingSize);
if (!requestsToProcess.isEmpty()) {
for (K request : requestsToProcess) {
consumer.accept(request);
}
}
}
public void shutdown() {
executorService.shutdown();
}
}