changes in vm powerstate sync

Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-10-07 16:25:14 +05:30
parent a9661f4587
commit c859f8ba80
3 changed files with 193 additions and 106 deletions

View File

@ -16,23 +16,23 @@
// under the License.
package com.cloud.vm;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.commons.collections.MapUtils;
import org.apache.log4j.Logger;
import com.cloud.agent.api.HostVmStateReportEntry;
import com.cloud.configuration.ManagementServiceConfiguration;
import com.cloud.utils.DateUtil;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.dao.VMInstanceDao;
public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStateSync {
@ -68,106 +68,108 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
processReport(hostId, translatedInfo, force);
}
private void processReport(long hostId, Map<Long, VirtualMachine.PowerState> translatedInfo, boolean force) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Process VM state report. host: " + hostId + ", number of records in report: " + translatedInfo.size());
private void updateAndPublishVmPowerStates(long hostId, Map<Long, VirtualMachine.PowerState> instancePowerStates,
Date updateTime) {
if (instancePowerStates.isEmpty()) {
return;
}
for (Map.Entry<Long, VirtualMachine.PowerState> entry : translatedInfo.entrySet()) {
if (s_logger.isDebugEnabled())
s_logger.debug("VM state report. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
if (_instanceDao.updatePowerState(entry.getKey(), hostId, entry.getValue(), DateUtil.currentGMTTime())) {
if (s_logger.isInfoEnabled()) {
s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
Set<Long> vmIds = instancePowerStates.keySet();
Map<Long, VirtualMachine.PowerState> notUpdated = _instanceDao.updatePowerState(instancePowerStates, hostId,
updateTime);
if (notUpdated.size() <= vmIds.size()) {
for (Long vmId : vmIds) {
if (!notUpdated.isEmpty() && !notUpdated.containsKey(vmId)) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("VM state report is updated. host: %d, vm id: %d, power state: %s",
hostId, vmId, instancePowerStates.get(vmId)));
}
_messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE,
PublishScope.GLOBAL, vmId);
continue;
}
_messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey());
} else {
if (s_logger.isTraceEnabled()) {
s_logger.trace("VM power state does not change, skip DB writing. vm id: " + entry.getKey());
s_logger.trace(String.format("VM power state does not change, skip DB writing. vm id: %d", vmId));
}
}
}
}
private void processMissingVmReport(long hostId, Set<Long> vmIds, boolean force) {
// any state outdates should be checked against the time before this list was retrieved
Date startTime = DateUtil.currentGMTTime();
// for all running/stopping VMs, we provide monitoring of missing report
// FIXME: CPU & DB hotspot findByHostInStatesExcluding
List<VMInstanceVO> vmsThatAreMissingReport = _instanceDao.findByHostInStatesExcluding(hostId, new ArrayList<>(translatedInfo.keySet()),
List<VMInstanceVO> vmsThatAreMissingReport = _instanceDao.findByHostInStatesExcluding(hostId, vmIds,
VirtualMachine.State.Running, VirtualMachine.State.Stopping, VirtualMachine.State.Starting);
// here we need to be wary of out of band migration as opposed to other, more unexpected state changes
if (vmsThatAreMissingReport.size() > 0) {
Date currentTime = DateUtil.currentGMTTime();
if (s_logger.isDebugEnabled()) {
s_logger.debug("Run missing VM report. current time: " + currentTime.getTime());
}
if (vmsThatAreMissingReport.isEmpty()) {
return;
}
Date currentTime = DateUtil.currentGMTTime();
if (s_logger.isDebugEnabled()) {
s_logger.debug("Run missing VM report. current time: " + currentTime.getTime());
}
if (!force) {
List<Long> outdatedVms = vmsThatAreMissingReport.stream()
.filter(v -> !_instanceDao.isPowerStateUpToDate(v))
.map(VMInstanceVO::getId)
.collect(Collectors.toList());
_instanceDao.resetVmPowerStateTracking(outdatedVms);
vmsThatAreMissingReport = vmsThatAreMissingReport.stream()
.filter(v -> !outdatedVms.contains(v.getId()))
.collect(Collectors.toList());
}
// 2 times of sync-update interval for graceful period
long milliSecondsGracefullPeriod = mgmtServiceConf.getPingInterval() * 2000L;
for (VMInstanceVO instance : vmsThatAreMissingReport) {
// Make sure powerState is up to date for missing VMs
try {
if (!force && !_instanceDao.isPowerStateUpToDate(instance.getId())) {
s_logger.warn("Detected missing VM but power state is outdated, wait for another process report run for VM id: " + instance.getId());
_instanceDao.resetVmPowerStateTracking(instance.getId());
continue;
}
} catch (CloudRuntimeException e) {
s_logger.warn("Checked for missing powerstate of a none existing vm", e);
continue;
}
Date vmStateUpdateTime = instance.getPowerStateUpdateTime();
// 2 times of sync-update interval for graceful period
long milliSecondsGracefulPeriod = mgmtServiceConf.getPingInterval() * 2000L;
Map<Long, VirtualMachine.PowerState> instancePowerStates = new HashMap<>();
for (VMInstanceVO instance : vmsThatAreMissingReport) {
Date vmStateUpdateTime = instance.getPowerStateUpdateTime();
if (vmStateUpdateTime == null) {
s_logger.warn("VM power state update time is null, falling back to update time for vm id: " + instance.getId());
vmStateUpdateTime = instance.getUpdateTime();
if (vmStateUpdateTime == null) {
s_logger.warn("VM power state update time is null, falling back to update time for vm id: " + instance.getId());
vmStateUpdateTime = instance.getUpdateTime();
if (vmStateUpdateTime == null) {
s_logger.warn("VM update time is null, falling back to creation time for vm id: " + instance.getId());
vmStateUpdateTime = instance.getCreated();
}
s_logger.warn("VM update time is null, falling back to creation time for vm id: " + instance.getId());
vmStateUpdateTime = instance.getCreated();
}
if (s_logger.isInfoEnabled()) {
String lastTime = new SimpleDateFormat("yyyy/MM/dd'T'HH:mm:ss.SSS'Z'").format(vmStateUpdateTime);
s_logger.debug(
String.format("Detected missing VM. host: %d, vm id: %d(%s), power state: %s, last state update: %s"
, hostId
, instance.getId()
, instance.getUuid()
, VirtualMachine.PowerState.PowerReportMissing
, lastTime));
}
long milliSecondsSinceLastStateUpdate = currentTime.getTime() - vmStateUpdateTime.getTime();
if (force || milliSecondsSinceLastStateUpdate > milliSecondsGracefullPeriod) {
s_logger.debug("vm id: " + instance.getId() + " - time since last state update(" + milliSecondsSinceLastStateUpdate + "ms) has passed graceful period");
// this is were a race condition might have happened if we don't re-fetch the instance;
// between the startime of this job and the currentTime of this missing-branch
// an update might have occurred that we should not override in case of out of band migration
if (_instanceDao.updatePowerState(instance.getId(), hostId, VirtualMachine.PowerState.PowerReportMissing, startTime)) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: " + instance.getId() + ", power state: PowerReportMissing ");
}
_messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, instance.getId());
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug("VM power state does not change, skip DB writing. vm id: " + instance.getId());
}
}
} else {
s_logger.debug("vm id: " + instance.getId() + " - time since last state update(" + milliSecondsSinceLastStateUpdate + "ms) has not passed graceful period yet");
}
if (s_logger.isDebugEnabled()) {
s_logger.debug(
String.format("Detected missing VM. host: %d, vm id: %d(%s), power state: %s, last state update: %s"
, hostId
, instance.getId()
, instance.getUuid()
, VirtualMachine.PowerState.PowerReportMissing
, DateUtil.getOutputString(vmStateUpdateTime)));
}
long milliSecondsSinceLastStateUpdate = currentTime.getTime() - vmStateUpdateTime.getTime();
if (force || (milliSecondsSinceLastStateUpdate > milliSecondsGracefulPeriod)) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("vm id: %d - time since last state update(%d ms) has passed graceful period",
instance.getId(), milliSecondsSinceLastStateUpdate));
}
// this is where a race condition might have happened if we don't re-fetch the instance;
// between the startime of this job and the currentTime of this missing-branch
// an update might have occurred that we should not override in case of out of band migration
instancePowerStates.put(instance.getId(), VirtualMachine.PowerState.PowerReportMissing);
} else {
s_logger.debug(String.format("vm id: %d - time since last state update(%d ms) has not passed graceful period yet",
instance.getId(), milliSecondsSinceLastStateUpdate));
}
}
updateAndPublishVmPowerStates(hostId, instancePowerStates, startTime);
}
private void processReport(long hostId, Map<Long, VirtualMachine.PowerState> translatedInfo, boolean force) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("Process VM state report. Host: %d, number of records in report: %d. VMs: [%s]",
hostId,
translatedInfo.size(),
translatedInfo.entrySet().stream().map(entry -> entry.getKey() + ":" + entry.getValue())
.collect(Collectors.joining(", ")) + "]"));
}
updateAndPublishVmPowerStates(hostId, translatedInfo, DateUtil.currentGMTTime());
processMissingVmReport(hostId, translatedInfo.keySet(), force);
if (s_logger.isDebugEnabled())
s_logger.debug("Done with process of VM state report. host: " + hostId);
@ -175,24 +177,19 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
@Override
public Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states) {
final HashMap<Long, VirtualMachine.PowerState> map = new HashMap<Long, VirtualMachine.PowerState>();
if (states == null) {
final HashMap<Long, VirtualMachine.PowerState> map = new HashMap<>();
if (MapUtils.isEmpty(states)) {
return map;
}
Map<String, Long> nameIdMap = _instanceDao.getNameIdMapForVmInstanceNames(states.keySet());
for (Map.Entry<String, HostVmStateReportEntry> entry : states.entrySet()) {
VMInstanceVO vm = findVM(entry.getKey());
if (vm != null) {
map.put(vm.getId(), entry.getValue().getState());
Long id = nameIdMap.get(entry.getKey());
if (id != null) {
map.put(id, entry.getValue().getState());
} else {
s_logger.debug("Unable to find matched VM in CloudStack DB. name: " + entry.getKey());
}
}
return map;
}
private VMInstanceVO findVM(String vmName) {
return _instanceDao.findVMByInstanceName(vmName);
}
}

View File

@ -21,7 +21,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.utils.Pair;
@ -146,7 +145,7 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>, StateDao<
*/
List<String> listDistinctHostNames(long networkId, VirtualMachine.Type... types);
List<VMInstanceVO> findByHostInStatesExcluding(Long hostId, Set<Long> excludingIds, State... states);
List<VMInstanceVO> findByHostInStatesExcluding(Long hostId, Collection<Long> excludingIds, State... states);
List<VMInstanceVO> findByHostInStates(Long hostId, State... states);
@ -154,15 +153,20 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>, StateDao<
boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState powerState, Date wisdomEra);
Map<Long, VirtualMachine.PowerState> updatePowerState(Map<Long, VirtualMachine.PowerState> instancePowerStates,
long powerHostId, Date wisdomEra);
void resetVmPowerStateTracking(long instanceId);
void resetVmPowerStateTracking(List<Long> instanceId);
void resetHostPowerStateTracking(long hostId);
HashMap<String, Long> countVgpuVMs(Long dcId, Long podId, Long clusterId);
VMInstanceVO findVMByHostNameInZone(String hostName, long zoneId);
boolean isPowerStateUpToDate(long instanceId);
boolean isPowerStateUpToDate(VMInstanceVO instance);
List<VMInstanceVO> listNonMigratingVmsByHostEqualsLastHost(long hostId);

View File

@ -25,7 +25,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@ -103,6 +102,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
protected SearchBuilder<VMInstanceVO> NotMigratingSearch;
protected SearchBuilder<VMInstanceVO> BackupSearch;
protected SearchBuilder<VMInstanceVO> LastHostAndStatesSearch;
protected SearchBuilder<VMInstanceVO> IdsPowerStateSelectSearch;
@Inject
ResourceTagDao _tagsDao;
@ -322,6 +322,15 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
LastHostAndStatesSearch.and("states", LastHostAndStatesSearch.entity().getState(), Op.IN);
LastHostAndStatesSearch.done();
IdsPowerStateSelectSearch = createSearchBuilder();
IdsPowerStateSelectSearch.and("id", IdsPowerStateSelectSearch.entity().getId(), Op.IN);
IdsPowerStateSelectSearch.selectFields(IdsPowerStateSelectSearch.entity().getId(),
IdsPowerStateSelectSearch.entity().getPowerHostId(),
IdsPowerStateSelectSearch.entity().getPowerState(),
IdsPowerStateSelectSearch.entity().getPowerStateUpdateCount(),
IdsPowerStateSelectSearch.entity().getPowerStateUpdateTime());
IdsPowerStateSelectSearch.done();
}
@Override
@ -896,7 +905,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
}
@Override
public List<VMInstanceVO> findByHostInStatesExcluding(Long hostId, Set<Long> excludingIds, State... states) {
public List<VMInstanceVO> findByHostInStatesExcluding(Long hostId, Collection<Long> excludingIds, State... states) {
SearchCriteria<VMInstanceVO> sc = HostAndStateSearch.create();
sc.setParameters("host", hostId);
if (excludingIds != null && !excludingIds.isEmpty()) {
@ -921,6 +930,28 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
return listBy(sc);
}
/**
 * Loads the power state tracking data of the given VM instances.
 * <p>
 * The lookup goes through {@code IdsPowerStateSelectSearch}, which selects only the id,
 * power host id, power state, power state update count and power state update time columns.
 *
 * @param ids ids of the VM instances to load; may be {@code null} or empty
 * @return the instances found for the given ids, or an empty list when no ids are given
 */
protected List<VMInstanceVO> listSelectPowerStateByIds(final List<Long> ids) {
    // Nothing to look up: avoid running a search whose IN condition has no values.
    if (ids == null || ids.isEmpty()) {
        return new ArrayList<>();
    }
    SearchCriteria<VMInstanceVO> sc = IdsPowerStateSelectSearch.create();
    sc.setParameters("id", ids.toArray());
    return customSearch(sc, null);
}
/**
 * Works out the power state update count to store for an instance when a power state report
 * for it is processed.
 *
 * @param instance    instance carrying the currently stored power state tracking values
 * @param powerHostId id of the host the report came from
 * @param powerState  power state reported for the instance
 * @param wisdomEra   reference time of the report; an instance whose stored power state update
 *                    time is not before this time is left untouched
 * @return {@code 1} when the report differs from the stored state, the stored count plus one
 *         while that count is below {@code MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT}, or
 *         {@code null} when nothing should be written for this instance
 */
protected Integer getPowerUpdateCount(final VMInstanceVO instance, final long powerHostId, final VirtualMachine.PowerState powerState, Date wisdomEra) {
    final Date lastUpdateTime = instance.getPowerStateUpdateTime();
    // A stored update that is not older than the reference time wins over this report.
    if (lastUpdateTime != null && !lastUpdateTime.before(wisdomEra)) {
        return null;
    }

    final Long storedHostId = instance.getPowerHostId();
    final boolean matchesStoredState = instance.getPowerState() == powerState
            && storedHostId != null
            && storedHostId.equals(powerHostId)
            && isPowerStateInSyncWithInstanceState(powerState, powerHostId, instance);
    if (!matchesStoredState) {
        // Something changed: start counting again from the first report.
        return 1;
    }

    // Same state reported again: count it until the cap is reached, then stop writing.
    if (instance.getPowerStateUpdateCount() >= MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
        return null;
    }
    return instance.getPowerStateUpdateCount() + 1;
}
@Override
public boolean updatePowerState(final long instanceId, final long powerHostId, final VirtualMachine.PowerState powerState, Date wisdomEra) {
return Transaction.execute((TransactionCallback<Boolean>) status -> {
@ -954,6 +985,49 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
});
}
/**
 * Updates the power state tracking columns of several VM instances with a single UPDATE.
 * <p>
 * For every requested instance {@code getPowerUpdateCount} decides whether a write is needed and
 * which update count to store. Instances that need a write get the given power host, their
 * reported power state, the computed update count and a fresh power state update time.
 *
 * @param instancePowerStates reported power state per VM instance id; may be {@code null} or empty
 * @param powerHostId         id of the host the report came from
 * @param wisdomEra           reference time of the report, passed on to {@code getPowerUpdateCount}
 * @return the entries that were NOT updated: instances for which no write was needed, ids for
 *         which no instance was found, or the complete input map when the UPDATE failed
 */
@Override
public Map<Long, VirtualMachine.PowerState> updatePowerState(Map<Long, VirtualMachine.PowerState> instancePowerStates, long powerHostId, Date wisdomEra) {
    if (instancePowerStates == null || instancePowerStates.isEmpty()) {
        return new HashMap<>();
    }
    List<VMInstanceVO> instances = listSelectPowerStateByIds(new ArrayList<>(instancePowerStates.keySet()));
    // Start from "nothing updated" and remove what gets scheduled for update, so that ids
    // without a matching row are reported back as not updated instead of silently dropped.
    Map<Long, VirtualMachine.PowerState> notUpdated = new HashMap<>(instancePowerStates);
    Map<Long, Integer> updateCounts = new HashMap<>();
    for (VMInstanceVO instance : instances) {
        Integer count = getPowerUpdateCount(instance, powerHostId, instancePowerStates.get(instance.getId()), wisdomEra);
        if (count != null) {
            updateCounts.put(instance.getId(), count);
            notUpdated.remove(instance.getId());
        }
    }
    if (updateCounts.isEmpty()) {
        return notUpdated;
    }

    // Build both CASE expressions and the id list in one pass over the instances to update.
    // Only Long ids, Integer counts and PowerState enum constants are concatenated here.
    StringBuilder powerStateCases = new StringBuilder();
    StringBuilder updateCountCases = new StringBuilder();
    StringBuilder idList = new StringBuilder();
    for (Map.Entry<Long, Integer> entry : updateCounts.entrySet()) {
        Long id = entry.getKey();
        powerStateCases.append("WHEN id = ").append(id).append(" THEN '").append(instancePowerStates.get(id)).append("' ");
        updateCountCases.append("WHEN id = ").append(id).append(" THEN ").append(entry.getValue()).append(" ");
        idList.append(id).append(",");
    }
    idList.setLength(idList.length() - 1); // drop the trailing comma

    // NOTE(review): now() is evaluated by the database, while resetVmPowerStateTracking stores
    // DateUtil.currentGMTTime() in the same column - confirm both use the same time zone.
    String sql = "UPDATE vm_instance SET " +
            "power_host = ?, power_state_update_time = now(), power_state = CASE " +
            powerStateCases +
            "END, power_state_update_count = CASE " +
            updateCountCases +
            "END WHERE id IN (" + idList + ")";

    try (TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB)) {
        try (PreparedStatement pstmt = txn.prepareAutoCloseStatement(sql)) {
            pstmt.setLong(1, powerHostId);
            pstmt.executeUpdate();
        } catch (SQLException e) {
            s_logger.error(String.format("Unable to execute update power states SQL from VMs %s due to: %s",
                    idList, e.getMessage()), e);
            // Nothing was written, so every requested instance counts as not updated.
            return instancePowerStates;
        }
    }
    return notUpdated;
}
private boolean isPowerStateInSyncWithInstanceState(final VirtualMachine.PowerState powerState, final long powerHostId, final VMInstanceVO instance) {
State instanceState = instance.getState();
if ((powerState == VirtualMachine.PowerState.PowerOff && instanceState == State.Running)
@ -966,11 +1040,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
}
@Override
public boolean isPowerStateUpToDate(final long instanceId) {
VMInstanceVO instance = findById(instanceId);
if(instance == null) {
throw new CloudRuntimeException("checking power state update count on non existing instance " + instanceId);
}
public boolean isPowerStateUpToDate(final VMInstanceVO instance) {
return instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT;
}
@ -989,6 +1059,22 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
});
}
/**
 * Resets the power state tracking of several VM instances with one update: the power state
 * update count is set to 0 and the power state update time to the current GMT time.
 *
 * @param instanceIds ids of the VM instances to reset; nothing is done when {@code null} or empty
 */
@Override
public void resetVmPowerStateTracking(List<Long> instanceIds) {
    // Callers hand over the result of a filter, which can be empty; in that case there is
    // nothing to reset and no update with an empty id condition must be issued.
    if (instanceIds == null || instanceIds.isEmpty()) {
        return;
    }
    Transaction.execute(new TransactionCallbackNoReturn() {
        @Override
        public void doInTransactionWithoutResult(TransactionStatus status) {
            // Only the "id" condition of this search is used to pick the rows to update.
            SearchCriteria<VMInstanceVO> sc = IdsPowerStateSelectSearch.create();
            sc.setParameters("id", instanceIds.toArray());

            VMInstanceVO vm = createForUpdate();
            vm.setPowerStateUpdateCount(0);
            vm.setPowerStateUpdateTime(DateUtil.currentGMTTime());

            UpdateBuilder ub = getUpdateBuilder(vm);
            update(ub, sc, null);
        }
    });
}
@Override @DB
public void resetHostPowerStateTracking(final long hostId) {
Transaction.execute(new TransactionCallbackNoReturn() {