From b81c380f86bc6d449576db51267e6f037c52357e Mon Sep 17 00:00:00 2001 From: Abhinandan Prateek Date: Fri, 28 Oct 2011 11:45:15 +0530 Subject: [PATCH] bug 11701: cluster sync only for Xen, old way of syncing for other Hypervisor --- .../cloud/vm/VirtualMachineManagerImpl.java | 210 +++++++++++++++++- 1 file changed, 200 insertions(+), 10 deletions(-) diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java index 10e07da8308..2506606e8c8 100755 --- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -1516,7 +1516,90 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene public Command cleanup(String vmName) { return new StopCommand(vmName); } + + public Commands fullHostSync(final long hostId, StartupRoutingCommand startup) { + Commands commands = new Commands(OnError.Continue); + + Map infos = convertToInfos(startup); + + final List vms = _vmDao.listByHostId(hostId); + s_logger.debug("Found " + vms.size() + " VMs for host " + hostId); + for (VMInstanceVO vm : vms) { + AgentVmInfo info = infos.remove(vm.getId()); + VMInstanceVO castedVm = null; + if (info == null) { + info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped); + castedVm = info.guru.findById(vm.getId()); + } else { + castedVm = info.vm; + } + + HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType()); + + Command command = compareState(hostId, castedVm, info, true, hvGuru.trackVmHostChange()); + if (command != null) { + commands.addCommand(command); + } + } + + for (final AgentVmInfo left : infos.values()) { + boolean found = false; + for (VirtualMachineGuru vmGuru : _vmGurus.values()) { + VMInstanceVO vm = vmGuru.findByName(left.name); + if (vm != null) { + found = true; + HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType()); + if(hvGuru.trackVmHostChange()) { + Command command = compareState(hostId, vm, left, true, true); + if (command != null) { + commands.addCommand(command); + } + } else { + s_logger.warn("Stopping a VM, VM " + left.name + " migrate from Host " + vm.getHostId() + " to Host " + hostId ); + commands.addCommand(cleanup(left.name)); + } + break; + } + } + if ( ! found ) { + s_logger.warn("Stopping a VM that we have no record of: " + left.name); + commands.addCommand(cleanup(left.name)); + } + } + + return commands; + } + public Commands deltaHostSync(long hostId, Map newStates) { + Map states = convertDeltaToInfos(newStates); + Commands commands = new Commands(OnError.Continue); + + for (Map.Entry entry : states.entrySet()) { + AgentVmInfo info = entry.getValue(); + + VMInstanceVO vm = info.vm; + + Command command = null; + if (vm != null) { + HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType()); + command = compareState(hostId, vm, info, false, hvGuru.trackVmHostChange()); + } else { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Cleaning up a VM that is no longer found: " + info.name); + } + command = cleanup(info.name); + } + + if (command != null) { + commands.addCommand(command); + } + } + + return commands; + } + + + public Commands deltaSync(Map> newStates) { Map states = convertToInfos(newStates); Commands commands = new Commands(OnError.Continue); @@ -1548,6 +1631,8 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene return commands; } + + public Commands fullSync(final long clusterId, Map> newStates) { Commands commands = new Commands(OnError.Continue); @@ -1586,11 +1671,9 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene protected Map convertToInfos(final Map> newStates) { final HashMap map = new HashMap(); - if (newStates == null) { return map; } - Collection> vmGurus = _vmGurus.values(); for (Map.Entry> entry : newStates.entrySet()) { @@ -1610,6 +1693,66 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene } return map; } + + protected Map convertToInfos(StartupRoutingCommand cmd) { + final Map states = cmd.getVmStates(); + final HashMap map = new HashMap(); + if (states == null) { + return map; + } + Collection> vmGurus = _vmGurus.values(); + + for (Map.Entry entry : states.entrySet()) { + for (VirtualMachineGuru vmGuru : vmGurus) { + String name = entry.getKey(); + VMInstanceVO vm = vmGuru.findByName(name); + if (vm != null) { + map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue().getState(), entry.getValue().getHost() )); + break; + } + Long id = vmGuru.convertToId(name); + if (id != null) { + map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue().getState(), entry.getValue().getHost() )); + break; + } + } + } + + return map; + } + + protected Map convertDeltaToInfos(final Map states) { + final HashMap map = new HashMap(); + + if (states == null) { + return map; + } + + Collection> vmGurus = _vmGurus.values(); + + for (Map.Entry entry : states.entrySet()) { + for (VirtualMachineGuru vmGuru : vmGurus) { + String name = entry.getKey(); + + VMInstanceVO vm = vmGuru.findByName(name); + + if (vm != null) { + map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue())); + break; + } + + Long id = vmGuru.convertToId(name); + if (id != null) { + map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue())); + break; + } + } + } + + return map; + } + + /** @@ -1863,7 +2006,24 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene @Override public boolean processCommands(long agentId, long seq, Command[] cmds) { - return false; + boolean processed = false; + for (Command cmd : cmds) { + if (cmd instanceof PingRoutingCommand) { + PingRoutingCommand ping = (PingRoutingCommand) cmd; + if (ping.getNewStates() != null && ping.getNewStates().size() > 0) { + Commands commands = deltaHostSync(agentId, ping.getNewStates()); + if (commands.size() > 0) { + try { + _agentMgr.send(agentId, commands, this); + } catch (final AgentUnavailableException e) { + s_logger.warn("Agent is now unavailable", e); + } + } + } + processed = true; + } + } + return processed; } @Override @@ -1889,13 +2049,43 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene long agentId = agent.getId(); Long clusterId = agent.getClusterId(); - ClusterSyncCommand syncCmd = new ClusterSyncCommand(Integer.parseInt(Config.ClusterDeltaSyncInterval.getDefaultValue()), - Integer.parseInt(Config.ClusterFullSyncSkipSteps.getDefaultValue()), clusterId); - try { - long seq_no = _agentMgr.send(agentId, new Commands(syncCmd), this); - s_logger.debug("Cluster VM sync started with jobid " + seq_no); - } catch (AgentUnavailableException e) { - s_logger.fatal("The Cluster VM sync process failed for cluster id " + clusterId + " with ", e); + if (agent.getHypervisorType() == HypervisorType.XenServer || agent.getHypervisorType() == HypervisorType.Xen){ // only fro Xen + ClusterSyncCommand syncCmd = new ClusterSyncCommand(Integer.parseInt(Config.ClusterDeltaSyncInterval.getDefaultValue()), + Integer.parseInt(Config.ClusterFullSyncSkipSteps.getDefaultValue()), clusterId); + try { + long seq_no = _agentMgr.send(agentId, new Commands(syncCmd), this); + s_logger.debug("Cluster VM sync started with jobid " + seq_no); + } catch (AgentUnavailableException e) { + s_logger.fatal("The Cluster VM sync process failed for cluster id " + clusterId + " with ", e); + } + } + else { // for others KVM and VMWare + StartupRoutingCommand startup = (StartupRoutingCommand) cmd; + Commands commands = fullHostSync(agentId, startup); + + if (commands.size() > 0) { + s_logger.debug("Sending clean commands to the agent"); + + try { + boolean error = false; + Answer[] answers = _agentMgr.send(agentId, commands); + for (Answer answer : answers) { + if (!answer.getResult()) { + s_logger.warn("Unable to stop a VM due to " + answer.getDetails()); + error = true; + } + } + if (error) { + throw new ConnectionException(true, "Unable to stop VMs"); + } + } catch (final AgentUnavailableException e) { + s_logger.warn("Agent is unavailable now", e); + throw new ConnectionException(true, "Unable to sync", e); + } catch (final OperationTimedoutException e) { + s_logger.warn("Agent is unavailable now", e); + throw new ConnectionException(true, "Unable to sync", e); + } + } } }