cache and executors refactoring

Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-09-09 19:39:50 +05:30
parent e798ab30b3
commit 61764aba1f
5 changed files with 167 additions and 24 deletions

View File

@ -84,6 +84,7 @@ import org.apache.cloudstack.resource.ResourceCleanupService;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import org.apache.cloudstack.utils.cache.SingleCache;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.cloudstack.vm.UnmanagedVMsManager;
import org.apache.commons.collections.CollectionUtils;
@ -398,7 +399,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Inject
VmWorkJobDao vmWorkJobDao;
private LoadingCache<Integer, List<Long>> vmIdsInProgressCache;
private SingleCache<List<Long>> vmIdsInProgressCache;
VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
@ -811,10 +812,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public boolean start() {
vmIdsInProgressCache = Caffeine.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.maximumSize(1)
.build(key -> vmWorkJobDao.listVmIdsWithPendingJob());
vmIdsInProgressCache = new SingleCache<>(10, vmWorkJobDao::listVmIdsWithPendingJob);
_executor.scheduleAtFixedRate(new CleanupTask(), 5, VmJobStateReportInterval.value(), TimeUnit.SECONDS);
_executor.scheduleAtFixedRate(new TransitionTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS);
cancelWorkItems(_nodeId);

View File

@ -36,6 +36,8 @@ import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.utils.cache.LazyCache;
import org.apache.cloudstack.utils.executor.QueueExecutor;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
@ -50,7 +52,6 @@ import com.cloud.capacity.dao.CapacityDao;
import com.cloud.configuration.Config;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.ClusterDetailsVO;
import com.cloud.dc.ClusterVO;
import com.cloud.dc.dao.ClusterDao;
import com.cloud.deploy.DeploymentClusterPlanner;
import com.cloud.event.UsageEventVO;
@ -142,6 +143,9 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
@Inject
MessageBus _messageBus;
private LazyCache<Long, Pair<ClusterDetailsVO, ClusterDetailsVO>> clusterValuesCache;
private QueueExecutor<Host> hostCapacityUpdateExecutor;
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_vmCapacityReleaseInterval = NumbersUtil.parseInt(_configDao.getValue(Config.CapacitySkipcountingHours.key()), 3600);
@ -150,6 +154,9 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
_agentManager.registerForHostEvents(new StorageCapacityListener(_capacityDao, _storageMgr), true, false, false);
_agentManager.registerForHostEvents(new ComputeCapacityListener(_capacityDao, this), true, false, false);
hostCapacityUpdateExecutor = new QueueExecutor<>("HostCapacityUpdateExecutor", 10, 10,
s_logger, this::updateCapacityForHostInternal);
return true;
}
@ -157,11 +164,14 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
public boolean start() {
_resourceMgr.registerResourceEvent(ResourceListener.EVENT_PREPARE_MAINTENANCE_AFTER, this);
_resourceMgr.registerResourceEvent(ResourceListener.EVENT_CANCEL_MAINTENANCE_AFTER, this);
clusterValuesCache = new LazyCache<>(16, 60, this::getClusterValues);
hostCapacityUpdateExecutor.startProcessing();
return true;
}
@Override
public boolean stop() {
hostCapacityUpdateExecutor.shutdown();
return true;
}
@ -631,9 +641,7 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
return totalAllocatedSize;
}
@DB
@Override
public void updateCapacityForHost(final Host host) {
protected void updateCapacityForHostInternal(final Host host) {
// prepare the service offerings
List<ServiceOfferingVO> offerings = _offeringsDao.listAllIncludingRemoved();
Map<Long, ServiceOfferingVO> offeringsMap = new HashMap<Long, ServiceOfferingVO>();
@ -643,6 +651,17 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
updateCapacityForHost(host, offeringsMap);
}
@DB
@Override
public void updateCapacityForHost(final Host host) {
hostCapacityUpdateExecutor.queueRequest(host);
}
protected Pair<ClusterDetailsVO, ClusterDetailsVO> getClusterValues(long clusterId) {
return new Pair<>(_clusterDetailsDao.findDetail(clusterId, "cpuOvercommitRatio"),
_clusterDetailsDao.findDetail(clusterId, "memoryOvercommitRatio"));
}
@DB
@Override
public void updateCapacityForHost(final Host host, final Map<Long, ServiceOfferingVO> offeringsMap) {
@ -665,9 +684,10 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
}
vms.addAll(vosMigrating);
ClusterVO cluster = _clusterDao.findById(host.getClusterId());
ClusterDetailsVO clusterDetailCpu = _clusterDetailsDao.findDetail(cluster.getId(), "cpuOvercommitRatio");
ClusterDetailsVO clusterDetailRam = _clusterDetailsDao.findDetail(cluster.getId(), "memoryOvercommitRatio");
Pair<ClusterDetailsVO, ClusterDetailsVO> clusterValues =
clusterValuesCache.get(host.getClusterId());
ClusterDetailsVO clusterDetailCpu = clusterValues.first();
ClusterDetailsVO clusterDetailRam = clusterValues.second();
Float clusterCpuOvercommitRatio = Float.parseFloat(clusterDetailCpu.getValue());
Float clusterRamOvercommitRatio = Float.parseFloat(clusterDetailRam.getValue());
for (VMInstanceVO vm : vms) {

View File

@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.cloudstack.utils.cache.LazyCache;
import org.apache.cloudstack.utils.cache.SingleCache;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@ -138,13 +140,11 @@ public class DownloadListener implements Listener {
@Inject
private VolumeService _volumeSrv;
private Cache<Long, List<Hypervisor.HypervisorType>> zoneHypervisorsCache;
private LazyCache<Long, List<Hypervisor.HypervisorType>> zoneHypervisorsCache;
protected void initZoneHypervisorsCache() {
zoneHypervisorsCache = Caffeine.newBuilder()
.maximumSize(32)
.expireAfterWrite(30, TimeUnit.SECONDS)
.build();
zoneHypervisorsCache =
new LazyCache<>(32, 30, _resourceMgr::listAvailHypervisorInZone);
}
// TODO: this constructor should be the one used for template only, remove other template constructor later
@ -286,18 +286,13 @@ public class DownloadListener implements Listener {
public void processHostAdded(long hostId) {
}
protected List<Hypervisor.HypervisorType> getAvailHypervisorInZone(long zoneId) {
return _resourceMgr.listAvailHypervisorInZone(zoneId);
}
@Override
public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance) throws ConnectionException {
if (cmd instanceof StartupRoutingCommand) {
// FIXME: CPU and DB hotspot
List<Hypervisor.HypervisorType> hypers = zoneHypervisorsCache.get(agent.getDataCenterId(),
this::getAvailHypervisorInZone);
List<Hypervisor.HypervisorType> hypervisors = zoneHypervisorsCache.get(agent.getDataCenterId());
Hypervisor.HypervisorType hostHyper = agent.getHypervisorType();
if (hypers.contains(hostHyper)) {
if (hypervisors.contains(hostHyper)) {
return;
}
_imageSrv.handleSysTemplateDownload(hostHyper, agent.getDataCenterId());

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.utils.cache;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
public class SingleCache<V> {
private final LoadingCache<Void, V> cache;
public SingleCache(long expireAfterWriteSeconds, Supplier<V> loader) {
this.cache = Caffeine.newBuilder()
.maximumSize(1)
.expireAfterWrite(expireAfterWriteSeconds, TimeUnit.SECONDS)
.build(key -> loader.get());
}
public V get() {
return cache.get(null);
}
public void invalidate() {
cache.invalidate(null);
}
public void clear() {
cache.invalidateAll();
}
}

View File

@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.utils.executor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.log4j.Logger;
public class QueueExecutor<K> {
private final String name;
private final int processingSize;
private final int processingInterval;
private final BlockingQueue<K> requestQueue;
private final ScheduledExecutorService executorService;
private final Logger logger;
private final Consumer<K> consumer;
public QueueExecutor(String name, int processingSize, int processingInterval, Logger logger,
Consumer<K> consumer) {
this.name = name;
this.processingSize = processingSize;
this.processingInterval = processingInterval;
this.logger = logger;
this.consumer = consumer;
requestQueue = new LinkedBlockingQueue<>(processingSize);
executorService = Executors.newSingleThreadScheduledExecutor();
}
public void queueRequest(K request) {
try {
requestQueue.put(request);
if (requestQueue.size() >= processingSize) {
processRequests();
}
} catch (InterruptedException e) {
logger.warn(String.format("Error queuing request for %s", name), e);
}
}
public void startProcessing() {
executorService.scheduleAtFixedRate(this::processRequests, 0,
processingInterval, TimeUnit.SECONDS);
}
private void processRequests() {
List<K> requestsToProcess = new ArrayList<>();
requestQueue.drainTo(requestsToProcess, processingSize);
if (!requestsToProcess.isEmpty()) {
for (K request : requestsToProcess) {
consumer.accept(request);
}
}
}
public void shutdown() {
executorService.shutdown();
}
}