This commit is contained in:
Abhishek Kumar 2026-01-22 15:03:49 +01:00 committed by GitHub
commit 8686eca3da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 68 additions and 32 deletions

View File

@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -37,13 +38,11 @@ import javax.inject.Inject;
import javax.mail.MessagingException;
import javax.naming.ConfigurationException;
import com.cloud.dc.DataCenter;
import com.cloud.dc.Pod;
import com.cloud.org.Cluster;
import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
@ -52,8 +51,9 @@ import org.apache.cloudstack.utils.mailing.SMTPMailProperties;
import org.apache.cloudstack.utils.mailing.SMTPMailSender;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import com.cloud.alert.dao.AlertDao;
import com.cloud.api.ApiDBUtils;
@ -66,9 +66,11 @@ import com.cloud.capacity.dao.CapacityDaoImpl.SummedCapacity;
import com.cloud.configuration.Config;
import com.cloud.configuration.ConfigurationManager;
import com.cloud.dc.ClusterVO;
import com.cloud.dc.DataCenter;
import com.cloud.dc.DataCenter.NetworkType;
import com.cloud.dc.DataCenterVO;
import com.cloud.dc.HostPodVO;
import com.cloud.dc.Pod;
import com.cloud.dc.Vlan.VlanType;
import com.cloud.dc.dao.ClusterDao;
import com.cloud.dc.dao.DataCenterDao;
@ -82,6 +84,7 @@ import com.cloud.host.HostVO;
import com.cloud.host.dao.HostDao;
import com.cloud.network.Ipv6Service;
import com.cloud.network.dao.IPAddressDao;
import com.cloud.org.Cluster;
import com.cloud.org.Grouping.AllocationState;
import com.cloud.resource.ResourceManager;
import com.cloud.storage.StorageManager;
@ -93,8 +96,6 @@ import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.TransactionStatus;
import org.jetbrains.annotations.Nullable;
public class AlertManagerImpl extends ManagerBase implements AlertManager, Configurable {
protected Logger logger = LogManager.getLogger(AlertManagerImpl.class.getName());
@ -289,25 +290,47 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi
if (hostIds.isEmpty()) {
return;
}
ConcurrentHashMap<Long, Future<Void>> futures = new ConcurrentHashMap<>();
ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1,
Math.min(CapacityManager.CapacityCalculateWorkers.value(), hostIds.size())));
for (Long hostId : hostIds) {
futures.put(hostId, executorService.submit(() -> {
final HostVO host = hostDao.findById(hostId);
_capacityMgr.updateCapacityForHost(host);
return null;
}));
}
for (Map.Entry<Long, Future<Void>> entry: futures.entrySet()) {
try {
entry.getValue().get();
} catch (InterruptedException | ExecutionException e) {
logger.error(String.format("Error during capacity calculation for host: %d due to : %s",
entry.getKey(), e.getMessage()), e);
final CountDownLatch latch = new CountDownLatch(hostIds.size());
final ConcurrentHashMap<Long, Throwable> failures = new ConcurrentHashMap<>();
try {
for (final Long hostId : hostIds) {
executorService.execute(new ManagedContextRunnable() {
@Override
protected void runInContext() {
try {
final HostVO host = hostDao.findById(hostId);
if (host == null) {
logger.error("Host with ID: {} no longer exists, skipping capacity calculation", hostId);
return;
}
_capacityMgr.updateCapacityForHost(host);
} catch (Throwable t) {
failures.put(hostId, t);
logger.error("Error during host capacity calculation for ID: {}", hostId, t);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for host capacity calculation tasks");
}
if (!failures.isEmpty()) {
logger.warn("Host capacity calculation finished with {} failures out of {} hosts",
failures.size(), hostIds.size());
}
} finally {
executorService.shutdown();
}
executorService.shutdown();
}
protected void recalculateStorageCapacities() {

View File

@ -823,11 +823,17 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
cpuCoreCap.setUsedCapacity(usedCpuCore);
}
}
try {
_capacityDao.update(cpuCoreCap.getId(), cpuCoreCap);
} catch (Exception e) {
logger.error("Caught exception while updating cpucore capacity for the host {}", host, e);
}
final CapacityVO cpuCoreCapFinal = cpuCoreCap;
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
try {
_capacityDao.update(cpuCoreCapFinal.getId(), cpuCoreCapFinal);
} catch (Exception e) {
logger.error("Caught exception while updating cpucore capacity for the host {}", host, e);
}
}
});
} else {
final long usedCpuCoreFinal = usedCpuCore;
final long reservedCpuCoreFinal = reservedCpuCore;
@ -903,12 +909,19 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
}
}
try {
_capacityDao.update(cpuCap.getId(), cpuCap);
_capacityDao.update(memCap.getId(), memCap);
} catch (Exception e) {
logger.error("Caught exception while updating cpu/memory capacity for the host {}", host, e);
}
final CapacityVO cpuCapFinal = cpuCap;
final CapacityVO memCapFinal = memCap;
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
try {
_capacityDao.update(cpuCapFinal.getId(), cpuCapFinal);
_capacityDao.update(memCapFinal.getId(), memCapFinal);
} catch (Exception e) {
logger.error("Caught exception while updating cpu/memory capacity for the host {}", host, e);
}
}
});
} else {
final long usedMemoryFinal = usedMemory;
final long reservedMemoryFinal = reservedMemory;