CLOUDSTACK-5696: Fix sync issue with out-of-band changes

This commit is contained in:
Kelven Yang 2014-01-14 17:32:52 -08:00
parent 9aaea28d0d
commit 7164fc6e73
6 changed files with 150 additions and 70 deletions

View File

@ -61,6 +61,7 @@ import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
@ -578,6 +579,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_agentMgr.registerForHostEvents(this, true, true, true);
if (VmJobEnabled.value()) {
_messageBus.subscribe(VirtualMachineManager.Topics.VM_POWER_STATE, MessageDispatcher.getDispatcher(this));
}
return true;
}
@ -3816,7 +3821,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
//
@MessageHandler(topic = Topics.VM_POWER_STATE)
private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) {
private void HandlePowerStateReport(String subject, String senderAddress, Object args) {
assert (args != null);
Long vmId = (Long)args;
@ -3836,7 +3841,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
break;
// PowerUnknown shouldn't be reported, it is a derived
// VM power state from host state (host un-reachable
// VM power state from host state (host un-reachable)
case PowerUnknown:
default:
assert (false);
@ -3846,8 +3851,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
}
} else {
// TODO, do job wake-up signalling, since currently async job wake-up is not in use
// we will skip it for nows
// reset VM power state tracking so that we won't lost signal when VM has
// been translated to
_vmDao.resetVmPowerStateTracking(vmId);
}
}
@ -3924,6 +3930,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
switch (vm.getState()) {
case Starting:
case Stopping:
case Running:
case Stopped:
case Migrating:
try {
@ -3937,7 +3944,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// TODO: we need to forcely release all resource allocation
break;
case Running:
case Destroyed:
case Expunging:
break;

View File

@ -28,4 +28,6 @@ public interface VirtualMachinePowerStateSync {
// to adapt legacy ping report
void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry> report);
Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states);
}

View File

@ -32,12 +32,9 @@ import com.cloud.vm.dao.VMInstanceDao;
public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStateSync {
private static final Logger s_logger = Logger.getLogger(VirtualMachinePowerStateSyncImpl.class);
@Inject
MessageBus _messageBus;
@Inject
VMInstanceDao _instanceDao;
@Inject
VirtualMachineManager _vmMgr;
@Inject MessageBus _messageBus;
@Inject VMInstanceDao _instanceDao;
@Inject VirtualMachineManager _vmMgr;
public VirtualMachinePowerStateSyncImpl() {
}
@ -53,7 +50,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
if (s_logger.isDebugEnabled())
s_logger.debug("Process host VM state report from ping process. host: " + hostId);
Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
processReport(hostId, translatedInfo);
}
@ -62,7 +59,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
if (s_logger.isDebugEnabled())
s_logger.debug("Process host VM state report from ping process. host: " + hostId);
Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
processReport(hostId, translatedInfo);
}
@ -74,16 +71,19 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
s_logger.debug("VM state report. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
if (_instanceDao.updatePowerState(entry.getKey(), hostId, entry.getValue())) {
if (s_logger.isDebugEnabled())
s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue());
_messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey());
} else {
if (s_logger.isDebugEnabled())
s_logger.debug("VM power state does not change, skip DB writing. vm id: " + entry.getKey());
}
}
}
private Map<Long, VirtualMachine.PowerState> convertToInfos(Map<String, HostVmStateReportEntry> states) {
@Override
public Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states) {
final HashMap<Long, VirtualMachine.PowerState> map = new HashMap<Long, VirtualMachine.PowerState>();
if (states == null) {
return map;
@ -93,7 +93,6 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
VMInstanceVO vm = findVM(entry.getKey());
if (vm != null) {
map.put(vm.getId(), entry.getValue().getState());
break;
} else {
s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey());
}

View File

@ -69,6 +69,8 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>, StateDao<
List<VMInstanceVO> findVMInTransition(Date time, State... states);
List<VMInstanceVO> listByHostAndState(long hostId, State... states);
List<VMInstanceVO> listByTypes(VirtualMachine.Type... types);
VMInstanceVO findByIdTypes(long id, VirtualMachine.Type... types);

View File

@ -48,7 +48,11 @@ import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Func;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallback;
import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.db.UpdateBuilder;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.NicVO;
@ -76,6 +80,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
protected SearchBuilder<VMInstanceVO> TypesSearch;
protected SearchBuilder<VMInstanceVO> IdTypesSearch;
protected SearchBuilder<VMInstanceVO> HostIdTypesSearch;
protected SearchBuilder<VMInstanceVO> HostIdStatesSearch;
protected SearchBuilder<VMInstanceVO> HostIdUpTypesSearch;
protected SearchBuilder<VMInstanceVO> HostUpSearch;
protected SearchBuilder<VMInstanceVO> InstanceNameSearch;
@ -182,6 +187,11 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
HostIdTypesSearch.and("types", HostIdTypesSearch.entity().getType(), Op.IN);
HostIdTypesSearch.done();
HostIdStatesSearch = createSearchBuilder();
HostIdStatesSearch.and("hostId", HostIdStatesSearch.entity().getHostId(), Op.EQ);
HostIdStatesSearch.and("states", HostIdStatesSearch.entity().getState(), Op.IN);
HostIdStatesSearch.done();
HostIdUpTypesSearch = createSearchBuilder();
HostIdUpTypesSearch.and("hostid", HostIdUpTypesSearch.entity().getHostId(), Op.EQ);
HostIdUpTypesSearch.and("types", HostIdUpTypesSearch.entity().getType(), Op.IN);
@ -334,6 +344,15 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
return listBy(sc);
}
@Override
public List<VMInstanceVO> listByHostAndState(long hostId, State... states) {
SearchCriteria<VMInstanceVO> sc = HostIdStatesSearch.create();
sc.setParameters("hostId", hostId);
sc.setParameters("states", (Object[])states);
return listBy(sc);
}
@Override
public List<VMInstanceVO> listUpByHostIdTypes(long hostid, Type... types) {
SearchCriteria<VMInstanceVO> sc = HostIdUpTypesSearch.create();
@ -702,60 +721,66 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
}
@Override
public boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState powerState) {
boolean needToUpdate = false;
TransactionLegacy txn = TransactionLegacy.currentTxn();
txn.start();
public boolean updatePowerState(final long instanceId, final long powerHostId, final VirtualMachine.PowerState powerState) {
return Transaction.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
boolean needToUpdate = false;
VMInstanceVO instance = findById(instanceId);
if (instance != null) {
Long savedPowerHostId = instance.getPowerHostId();
if (instance.getPowerState() != powerState || savedPowerHostId == null
|| savedPowerHostId.longValue() != powerHostId) {
instance.setPowerState(powerState);
instance.setPowerHostId(powerHostId);
instance.setPowerStateUpdateCount(1);
instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
needToUpdate = true;
update(instanceId, instance);
} else {
// to reduce DB updates, consecutive same state update for more than 3 times
if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
needToUpdate = true;
update(instanceId, instance);
}
}
}
return needToUpdate;
}
});
}
VMInstanceVO instance = findById(instanceId);
if (instance != null) {
Long savedPowerHostId = instance.getPowerHostId();
if (instance.getPowerState() != powerState || savedPowerHostId == null || savedPowerHostId.longValue() != powerHostId) {
instance.setPowerState(powerState);
instance.setPowerHostId(powerHostId);
instance.setPowerStateUpdateCount(1);
instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
needToUpdate = true;
update(instanceId, instance);
} else {
// to reduce DB updates, consecutive same state update for more than 3 times
if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) {
instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
@Override
public void resetVmPowerStateTracking(final long instanceId) {
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
VMInstanceVO instance = findById(instanceId);
if (instance != null) {
instance.setPowerStateUpdateCount(0);
instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
needToUpdate = true;
update(instanceId, instance);
}
}
}
txn.commit();
return needToUpdate;
});
}
@Override
public void resetVmPowerStateTracking(long instanceId) {
TransactionLegacy txn = TransactionLegacy.currentTxn();
txn.start();
VMInstanceVO instance = findById(instanceId);
if (instance != null) {
instance.setPowerStateUpdateCount(0);
instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
update(instanceId, instance);
}
@Override @DB
public void resetHostPowerStateTracking(final long hostId) {
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
txn.commit();
}
VMInstanceVO instance = createForUpdate();
instance.setPowerStateUpdateCount(0);
instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
@Override
@DB
public void resetHostPowerStateTracking(long hostId) {
SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
VMInstanceVO instance = this.createForUpdate();
instance.setPowerStateUpdateCount(0);
instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
this.update(instance, sc);
update(instance, sc);
}
});
}
}

View File

@ -20,17 +20,24 @@ package org.apache.cloudstack.framework.messagebus;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
public class MessageDispatcher implements MessageSubscriber {
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
private static final Logger s_logger = Logger.getLogger(MessageDispatcher.class);
private static Map<Class<?>, List<Method>> s_handlerCache = new HashMap<Class<?>, List<Method>>();
private static Map<Object, MessageDispatcher> s_targetMap = new HashMap<Object, MessageDispatcher>();
private Object _targetObject;
public MessageDispatcher(Object targetObject) {
_targetObject = targetObject;
buildHandlerMethodCache(targetObject.getClass());
}
@Override
@ -67,10 +74,13 @@ public class MessageDispatcher implements MessageSubscriber {
try {
handler.invoke(target, subject, senderAddress, args);
} catch (IllegalArgumentException e) {
s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject);
} catch (IllegalAccessException e) {
s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject);
} catch (InvocationTargetException e) {
s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e);
throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject);
}
@ -79,18 +89,18 @@ public class MessageDispatcher implements MessageSubscriber {
public static Method resolveHandler(Class<?> handlerClz, String subject) {
synchronized (s_handlerCache) {
Method handler = s_handlerCache.get(handlerClz);
if (handler != null)
return handler;
List<Method> handlerList = s_handlerCache.get(handlerClz);
if (handlerList != null) {
for (Method method : handlerList) {
MessageHandler annotation = method.getAnnotation(MessageHandler.class);
assert (annotation != null);
for (Method method : handlerClz.getMethods()) {
MessageHandler annotation = method.getAnnotation(MessageHandler.class);
if (annotation != null) {
if (match(annotation.topic(), subject)) {
s_handlerCache.put(handlerClz, method);
return method;
}
}
} else {
s_logger.error("Handler class " + handlerClz.getName() + " is not registered");
}
}
@ -100,4 +110,40 @@ public class MessageDispatcher implements MessageSubscriber {
private static boolean match(String expression, String param) {
return param.matches(expression);
}
private void buildHandlerMethodCache(Class<?> handlerClz) {
if (s_logger.isInfoEnabled())
s_logger.info("Build message handler cache for " + handlerClz.getName());
synchronized (s_handlerCache) {
List<Method> handlerList = s_handlerCache.get(handlerClz);
if (handlerList == null) {
handlerList = new ArrayList<Method>();
s_handlerCache.put(handlerClz, handlerList);
Class<?> clz = handlerClz;
while (clz != null && clz != Object.class) {
for (Method method : clz.getDeclaredMethods()) {
MessageHandler annotation = method.getAnnotation(MessageHandler.class);
if (annotation != null) {
// allow private member access via reflection
method.setAccessible(true);
handlerList.add(method);
if (s_logger.isInfoEnabled())
s_logger.info("Add message handler " + handlerClz.getName() + "." + method.getName() + " to cache");
}
}
clz = clz.getSuperclass();
}
} else {
if (s_logger.isInfoEnabled())
s_logger.info("Message handler for class " + handlerClz.getName() + " is already in cache");
}
}
if (s_logger.isInfoEnabled())
s_logger.info("Done building message handler cache for " + handlerClz.getName());
}
}