refactor transitioning vm process report

Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-09-04 18:35:23 +05:30
parent 060a8ca623
commit 8ee5e6a99a
3 changed files with 57 additions and 74 deletions

View File

@ -48,10 +48,6 @@ import javax.inject.Inject;
import javax.naming.ConfigurationException;
import javax.persistence.EntityExistsException;
import com.cloud.configuration.Resource;
import com.cloud.event.ActionEventUtils;
import com.cloud.exception.ResourceAllocationException;
import com.google.gson.Gson;
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
import org.apache.cloudstack.annotation.AnnotationService;
import org.apache.cloudstack.annotation.dao.AnnotationDao;
@ -154,6 +150,7 @@ import com.cloud.api.query.dao.UserVmJoinDao;
import com.cloud.api.query.vo.DomainRouterJoinVO;
import com.cloud.api.query.vo.UserVmJoinVO;
import com.cloud.capacity.CapacityManager;
import com.cloud.configuration.Resource;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.ClusterDetailsVO;
import com.cloud.dc.ClusterVO;
@ -171,6 +168,7 @@ import com.cloud.deploy.DeploymentPlanner;
import com.cloud.deploy.DeploymentPlanner.ExcludeList;
import com.cloud.deploy.DeploymentPlanningManager;
import com.cloud.deployasis.dao.UserVmDeployAsIsDetailsDao;
import com.cloud.event.ActionEventUtils;
import com.cloud.event.EventTypes;
import com.cloud.event.UsageEventUtils;
import com.cloud.event.UsageEventVO;
@ -182,6 +180,7 @@ import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.InsufficientServerCapacityException;
import com.cloud.exception.InvalidParameterValueException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.exception.ResourceAllocationException;
import com.cloud.exception.ResourceUnavailableException;
import com.cloud.exception.StorageAccessException;
import com.cloud.exception.StorageUnavailableException;
@ -273,6 +272,9 @@ import com.cloud.vm.dao.VMInstanceDao;
import com.cloud.vm.snapshot.VMSnapshotManager;
import com.cloud.vm.snapshot.VMSnapshotVO;
import com.cloud.vm.snapshot.dao.VMSnapshotDao;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.gson.Gson;
public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMachineManager, VmWorkJobHandler, Listener, Configurable {
private static final Logger s_logger = Logger.getLogger(VirtualMachineManagerImpl.class);
@ -393,6 +395,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
private AnnotationDao annotationDao;
@Inject
ResourceCleanupService resourceCleanupService;
@Inject
VmWorkJobDao vmWorkJobDao;
private LoadingCache<Integer, List<Long>> vmIdsInProgressCache;
VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
@ -805,6 +811,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public boolean start() {
vmIdsInProgressCache = Caffeine.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.maximumSize(1)
.build(key -> vmWorkJobDao.listVmIdsWithPendingJob());
_executor.scheduleAtFixedRate(new CleanupTask(), 5, VmJobStateReportInterval.value(), TimeUnit.SECONDS);
_executor.scheduleAtFixedRate(new TransitionTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS);
cancelWorkItems(_nodeId);
@ -5003,25 +5013,27 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// (which is relatively safe to do so)
final long stallThresholdInMs = VmJobStateReportInterval.value() * 2;
final Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs);
HostVO hostVO = _hostDao.findById(hostId);
if (!Status.Up.equals(hostVO.getStatus())) {
return;
}
// FIXME: CPU & DB hotspot: listStalledVMInTransitionStateOnUpHost
final List<Long> mostLikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime);
for (final Long vmId : mostLikelyStoppedVMs) {
final VMInstanceVO vm = _vmDao.findById(vmId);
assert vm != null;
final List<VMInstanceVO> hostTransitionVms = _vmDao.listByHostAndState(hostId, State.Starting, State.Stopping, State.Migrating);
final List<VMInstanceVO> mostLikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostTransitionVms, cutTime.getTime());
for (final VMInstanceVO vm : mostLikelyStoppedVMs) {
handlePowerOffReportWithNoPendingJobsOnVM(vm);
}
// FIXME: CPU & DB hotspot: listVMInTransitionStateWithRecentReportOnUpHost
final List<Long> vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime);
for (final Long vmId : vmsWithRecentReport) {
final VMInstanceVO vm = _vmDao.findById(vmId);
assert vm != null;
final List<VMInstanceVO> vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostTransitionVms, cutTime.getTime());
for (final VMInstanceVO vm : vmsWithRecentReport) {
if (vm.getPowerState() == PowerState.PowerOn) {
handlePowerOnReportWithNoPendingJobsOnVM(vm);
} else {
handlePowerOffReportWithNoPendingJobsOnVM(vm);
}
}
long elapsed = System.currentTimeMillis() - startTime;
}
private void scanStalledVMInTransitionStateOnDisconnectedHosts() {
@ -5036,72 +5048,26 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
}
private List<Long> listStalledVMInTransitionStateOnUpHost(final long hostId, final Date cutTime) {
final String sql = "SELECT i.id\n" +
"FROM vm_instance AS i\n" +
"INNER JOIN host AS h ON i.host_id = h.id\n" +
"WHERE h.status = 'UP'\n" +
" AND h.id = ?\n" +
" AND i.power_state_update_time < ?\n" +
" AND i.state IN ('Starting', 'Stopping', 'Migrating')\n" +
" AND i.id NOT IN (\n" +
" SELECT vm_instance_id\n" +
" FROM vm_work_job AS w\n" +
" INNER JOIN async_job AS j ON w.id = j.id\n" +
" WHERE j.job_status = ?\n" +
" )\n" +
" AND i.removed IS NULL";
final List<Long> l = new ArrayList<>();
try (TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB)) {
String cutTimeStr = DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime);
try (PreparedStatement pstmt = txn.prepareAutoCloseStatement(sql)) {
pstmt.setLong(1, hostId);
pstmt.setString(2, cutTimeStr);
pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
final ResultSet rs = pstmt.executeQuery();
while (rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
s_logger.error(String.format("Unable to execute SQL [%s] with params {\"h.id\": %s, \"i.power_state_update_time\": \"%s\"} due to [%s].", sql, hostId, cutTimeStr, e.getMessage()), e);
}
private List<VMInstanceVO> listStalledVMInTransitionStateOnUpHost(
final List<VMInstanceVO> transitioningVms, final long cutTime) {
if (CollectionUtils.isEmpty(transitioningVms)) {
return transitioningVms;
}
return l;
List<Long> vmIdsInProgress = vmIdsInProgressCache.get(0);
return transitioningVms.stream()
.filter(v -> v.getPowerStateUpdateTime().getTime() < cutTime && !vmIdsInProgress.contains(v.getId()))
.collect(Collectors.toList());
}
private List<Long> listVMInTransitionStateWithRecentReportOnUpHost(final long hostId, final Date cutTime) {
final String sql = "SELECT i.id\n" +
"FROM vm_instance AS i\n" +
"INNER JOIN host AS h ON i.host_id = h.id\n" +
"WHERE h.status = 'UP' \n" +
" AND h.id = ?\n" +
" AND i.power_state_update_time > ?\n" +
" AND i.state IN ('Starting', 'Stopping', 'Migrating')\n" +
" AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job AS w\n" +
" INNER JOIN async_job AS j ON w.id = j.id\n" +
" WHERE j.job_status = ?)\n" +
" AND i.removed IS NULL";
final List<Long> l = new ArrayList<>();
try (TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB)) {
String cutTimeStr = DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime);
int jobStatusInProgress = JobInfo.Status.IN_PROGRESS.ordinal();
try (PreparedStatement pstmt = txn.prepareAutoCloseStatement(sql)) {
pstmt.setLong(1, hostId);
pstmt.setString(2, cutTimeStr);
pstmt.setInt(3, jobStatusInProgress);
final ResultSet rs = pstmt.executeQuery();
while (rs.next()) {
l.add(rs.getLong(1));
}
} catch (final SQLException e) {
s_logger.error(String.format("Unable to execute SQL [%s] with params {\"h.id\": %s, \"i.power_state_update_time\": \"%s\", \"j.job_status\": %s} due to [%s].", sql, hostId, cutTimeStr, jobStatusInProgress, e.getMessage()), e);
}
return l;
private List<VMInstanceVO> listVMInTransitionStateWithRecentReportOnUpHost(
final List<VMInstanceVO> transitioningVms, final long cutTime) {
if (CollectionUtils.isEmpty(transitioningVms)) {
return transitioningVms;
}
List<Long> vmIdsInProgress = vmIdsInProgressCache.get(0);
return transitioningVms.stream()
.filter(v -> v.getPowerStateUpdateTime().getTime() > cutTime && !vmIdsInProgress.contains(v.getId()))
.collect(Collectors.toList());
}
private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(final Date cutTime) {

View File

@ -40,4 +40,5 @@ public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, Long> {
void expungeLeftoverWorkJobs(long msid);
int expungeByVmList(List<Long> vmIds, Long batchSize);
List<Long> listVmIdsWithPendingJob();
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO.Step;
import org.apache.cloudstack.jobs.JobInfo;
@ -33,6 +34,8 @@ import org.apache.log4j.Logger;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.JoinBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
@ -226,4 +229,17 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
sc.setParameters("vmIds", vmIds.toArray());
return batchExpunge(sc, batchSize);
}
@Override
public List<Long> listVmIdsWithPendingJob() {
GenericSearchBuilder<VmWorkJobVO, Long> sb = createSearchBuilder(Long.class);
SearchBuilder<AsyncJobVO> asyncJobSearch = _baseJobDao.createSearchBuilder();
asyncJobSearch.and("status", asyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ);
sb.join("asyncJobSearch", asyncJobSearch, sb.entity().getId(), asyncJobSearch.entity().getId(), JoinBuilder.JoinType.INNER);
sb.and("removed", sb.entity().getRemoved(), Op.NULL);
sb.selectFields(sb.entity().getVmInstanceId());
SearchCriteria<Long> sc = sb.create();
sc.setJoinParameters("asyncJobSearch", "status", JobInfo.Status.IN_PROGRESS);
return customSearch(sc, null);
}
}