bug 11701: cluster sync only for Xen, old way of syncing for other Hypervisor

This commit is contained in:
Abhinandan Prateek 2011-10-28 11:45:15 +05:30
parent cc6bf73760
commit b81c380f86
1 changed files with 200 additions and 10 deletions

View File

@ -1516,7 +1516,90 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
public Command cleanup(String vmName) {
return new StopCommand(vmName);
}
public Commands fullHostSync(final long hostId, StartupRoutingCommand startup) {
Commands commands = new Commands(OnError.Continue);
Map<Long, AgentVmInfo> infos = convertToInfos(startup);
final List<? extends VMInstanceVO> vms = _vmDao.listByHostId(hostId);
s_logger.debug("Found " + vms.size() + " VMs for host " + hostId);
for (VMInstanceVO vm : vms) {
AgentVmInfo info = infos.remove(vm.getId());
VMInstanceVO castedVm = null;
if (info == null) {
info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped);
castedVm = info.guru.findById(vm.getId());
} else {
castedVm = info.vm;
}
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType());
Command command = compareState(hostId, castedVm, info, true, hvGuru.trackVmHostChange());
if (command != null) {
commands.addCommand(command);
}
}
for (final AgentVmInfo left : infos.values()) {
boolean found = false;
for (VirtualMachineGuru<? extends VMInstanceVO> vmGuru : _vmGurus.values()) {
VMInstanceVO vm = vmGuru.findByName(left.name);
if (vm != null) {
found = true;
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType());
if(hvGuru.trackVmHostChange()) {
Command command = compareState(hostId, vm, left, true, true);
if (command != null) {
commands.addCommand(command);
}
} else {
s_logger.warn("Stopping a VM, VM " + left.name + " migrate from Host " + vm.getHostId() + " to Host " + hostId );
commands.addCommand(cleanup(left.name));
}
break;
}
}
if ( ! found ) {
s_logger.warn("Stopping a VM that we have no record of: " + left.name);
commands.addCommand(cleanup(left.name));
}
}
return commands;
}
public Commands deltaHostSync(long hostId, Map<String, State> newStates) {
Map<Long, AgentVmInfo> states = convertDeltaToInfos(newStates);
Commands commands = new Commands(OnError.Continue);
for (Map.Entry<Long, AgentVmInfo> entry : states.entrySet()) {
AgentVmInfo info = entry.getValue();
VMInstanceVO vm = info.vm;
Command command = null;
if (vm != null) {
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType());
command = compareState(hostId, vm, info, false, hvGuru.trackVmHostChange());
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Cleaning up a VM that is no longer found: " + info.name);
}
command = cleanup(info.name);
}
if (command != null) {
commands.addCommand(command);
}
}
return commands;
}
public Commands deltaSync(Map<String, Pair<String, State>> newStates) {
Map<Long, AgentVmInfo> states = convertToInfos(newStates);
Commands commands = new Commands(OnError.Continue);
@ -1548,6 +1631,8 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
return commands;
}
public Commands fullSync(final long clusterId, Map<String, Pair<String, State>> newStates) {
Commands commands = new Commands(OnError.Continue);
@ -1586,11 +1671,9 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
protected Map<Long, AgentVmInfo> convertToInfos(final Map<String, Pair<String, State>> newStates) {
final HashMap<Long, AgentVmInfo> map = new HashMap<Long, AgentVmInfo>();
if (newStates == null) {
return map;
}
Collection<VirtualMachineGuru<? extends VMInstanceVO>> vmGurus = _vmGurus.values();
for (Map.Entry<String, Pair<String, State>> entry : newStates.entrySet()) {
@ -1610,6 +1693,66 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
}
return map;
}
protected Map<Long, AgentVmInfo> convertToInfos(StartupRoutingCommand cmd) {
final Map<String, VmState> states = cmd.getVmStates();
final HashMap<Long, AgentVmInfo> map = new HashMap<Long, AgentVmInfo>();
if (states == null) {
return map;
}
Collection<VirtualMachineGuru<? extends VMInstanceVO>> vmGurus = _vmGurus.values();
for (Map.Entry<String, VmState> entry : states.entrySet()) {
for (VirtualMachineGuru<? extends VMInstanceVO> vmGuru : vmGurus) {
String name = entry.getKey();
VMInstanceVO vm = vmGuru.findByName(name);
if (vm != null) {
map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue().getState(), entry.getValue().getHost() ));
break;
}
Long id = vmGuru.convertToId(name);
if (id != null) {
map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue().getState(), entry.getValue().getHost() ));
break;
}
}
}
return map;
}
protected Map<Long, AgentVmInfo> convertDeltaToInfos(final Map<String, State> states) {
final HashMap<Long, AgentVmInfo> map = new HashMap<Long, AgentVmInfo>();
if (states == null) {
return map;
}
Collection<VirtualMachineGuru<? extends VMInstanceVO>> vmGurus = _vmGurus.values();
for (Map.Entry<String, State> entry : states.entrySet()) {
for (VirtualMachineGuru<? extends VMInstanceVO> vmGuru : vmGurus) {
String name = entry.getKey();
VMInstanceVO vm = vmGuru.findByName(name);
if (vm != null) {
map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue()));
break;
}
Long id = vmGuru.convertToId(name);
if (id != null) {
map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue()));
break;
}
}
}
return map;
}
/**
@ -1863,7 +2006,24 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
@Override
public boolean processCommands(long agentId, long seq, Command[] cmds) {
return false;
boolean processed = false;
for (Command cmd : cmds) {
if (cmd instanceof PingRoutingCommand) {
PingRoutingCommand ping = (PingRoutingCommand) cmd;
if (ping.getNewStates() != null && ping.getNewStates().size() > 0) {
Commands commands = deltaHostSync(agentId, ping.getNewStates());
if (commands.size() > 0) {
try {
_agentMgr.send(agentId, commands, this);
} catch (final AgentUnavailableException e) {
s_logger.warn("Agent is now unavailable", e);
}
}
}
processed = true;
}
}
return processed;
}
@Override
@ -1889,13 +2049,43 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
long agentId = agent.getId();
Long clusterId = agent.getClusterId();
ClusterSyncCommand syncCmd = new ClusterSyncCommand(Integer.parseInt(Config.ClusterDeltaSyncInterval.getDefaultValue()),
Integer.parseInt(Config.ClusterFullSyncSkipSteps.getDefaultValue()), clusterId);
try {
long seq_no = _agentMgr.send(agentId, new Commands(syncCmd), this);
s_logger.debug("Cluster VM sync started with jobid " + seq_no);
} catch (AgentUnavailableException e) {
s_logger.fatal("The Cluster VM sync process failed for cluster id " + clusterId + " with ", e);
if (agent.getHypervisorType() == HypervisorType.XenServer || agent.getHypervisorType() == HypervisorType.Xen){ // only fro Xen
ClusterSyncCommand syncCmd = new ClusterSyncCommand(Integer.parseInt(Config.ClusterDeltaSyncInterval.getDefaultValue()),
Integer.parseInt(Config.ClusterFullSyncSkipSteps.getDefaultValue()), clusterId);
try {
long seq_no = _agentMgr.send(agentId, new Commands(syncCmd), this);
s_logger.debug("Cluster VM sync started with jobid " + seq_no);
} catch (AgentUnavailableException e) {
s_logger.fatal("The Cluster VM sync process failed for cluster id " + clusterId + " with ", e);
}
}
else { // for others KVM and VMWare
StartupRoutingCommand startup = (StartupRoutingCommand) cmd;
Commands commands = fullHostSync(agentId, startup);
if (commands.size() > 0) {
s_logger.debug("Sending clean commands to the agent");
try {
boolean error = false;
Answer[] answers = _agentMgr.send(agentId, commands);
for (Answer answer : answers) {
if (!answer.getResult()) {
s_logger.warn("Unable to stop a VM due to " + answer.getDetails());
error = true;
}
}
if (error) {
throw new ConnectionException(true, "Unable to stop VMs");
}
} catch (final AgentUnavailableException e) {
s_logger.warn("Agent is unavailable now", e);
throw new ConnectionException(true, "Unable to sync", e);
} catch (final OperationTimedoutException e) {
s_logger.warn("Agent is unavailable now", e);
throw new ConnectionException(true, "Unable to sync", e);
}
}
}
}