diff --git a/core/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java b/core/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java index cc929fa1358..042f4a95f5e 100755 --- a/core/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java +++ b/core/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java @@ -71,6 +71,8 @@ import com.cloud.agent.api.CheckRouterCommand; import com.cloud.agent.api.CheckVirtualMachineAnswer; import com.cloud.agent.api.CheckVirtualMachineCommand; import com.cloud.agent.api.CleanupNetworkRulesCmd; +import com.cloud.agent.api.ClusterSyncAnswer; +import com.cloud.agent.api.ClusterSyncCommand; import com.cloud.agent.api.Command; import com.cloud.agent.api.CreatePrivateTemplateFromSnapshotCommand; import com.cloud.agent.api.CreatePrivateTemplateFromVolumeCommand; @@ -253,7 +255,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe protected long _dcId; protected String _pod; protected String _cluster; - protected HashMap _vms = new HashMap(71); + protected static final XenServerPoolVms s_vms = new XenServerPoolVms(); protected String _privateNetworkName; protected String _linkLocalPrivateNetworkName; protected String _publicNetworkName; @@ -501,6 +503,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe return execute((CheckRouterCommand)cmd); } else if (cmd instanceof SetFirewallRulesCommand) { return execute((SetFirewallRulesCommand)cmd); + } else if (cmd instanceof ClusterSyncCommand) { + return execute((ClusterSyncCommand)cmd); } else if (cmd instanceof BumpUpPriorityCommand) { return execute((BumpUpPriorityCommand)cmd); } else { @@ -1073,9 +1077,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe } } - synchronized (_vms) { - _vms.put(vmName, State.Starting); - } + s_vms.put(_cluster, _name, vmName, State.Starting); + Host host = Host.getByUuid(conn, _host.uuid); vm = createVmFromTemplate(conn, vmSpec, host); @@ -1154,11 +1157,11 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe String msg = handleVmStartFailure(conn, vmName, vm, "", e); return new StartAnswer(cmd, msg); } finally { - synchronized (_vms) { + synchronized (s_vms) { if (state != State.Stopped) { - _vms.put(vmName, state); + s_vms.put(_cluster, _name, vmName, state); } else { - _vms.remove(vmName); + s_vms.remove(_cluster, _name, vmName); } } } @@ -2034,13 +2037,12 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe return state == null ? State.Unknown : state; } - protected HashMap getAllVms(Connection conn) { - final HashMap vmStates = new HashMap(); - Set vms = null; + protected HashMap> getAllVms(Connection conn) { + final HashMap> vmStates = new HashMap>(); + Map vm_map = null; for (int i = 0; i < 2; i++) { try { - Host host = Host.getByUuid(conn, _host.uuid); - vms = host.getResidentVMs(conn); + vm_map = VM.getAllRecords(conn); //USE THIS TO GET ALL VMS FROM A CLUSTER break; } catch (final Throwable e) { s_logger.warn("Unable to get vms", e); @@ -2051,29 +2053,11 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe } } - if (vms == null) { + + if (vm_map == null) { return null; } - for (VM vm : vms) { - VM.Record record = null; - for (int i = 0; i < 2; i++) { - try { - record = vm.getRecord(conn); - break; - } catch (XenAPIException e1) { - s_logger.debug("VM.getRecord failed on host:" + _host.uuid + " due to " + e1.toString()); - } catch (XmlRpcException e1) { - s_logger.debug("VM.getRecord failed on host:" + _host.uuid + " due to " + e1.getMessage()); - } - try { - Thread.sleep(1000); - } catch (final InterruptedException ex) { - - } - } - if (record == null) { - continue; - } + for (VM.Record record: vm_map.values()) { if (record.isControlDomain || record.isASnapshot || record.isATemplate) { continue; // Skip DOM0 } @@ -2083,7 +2067,23 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe if (s_logger.isTraceEnabled()) { s_logger.trace("VM " + record.nameLabel + ": powerstate = " + ps + "; vm state=" + state.toString()); } - vmStates.put(record.nameLabel, state); + Host host = record.residentOn; + String host_uuid = null; + if( ! isRefNull(host) ) { + try { + host_uuid = host.getUuid(conn); + } catch (BadServerResponse e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (XenAPIException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (XmlRpcException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + vmStates.put(record.nameLabel, new Pair(host_uuid, state)); + } } return vmStates; @@ -2135,8 +2135,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe final State state = getVmState(conn, vmName); Integer vncPort = null; if (state == State.Running) { - synchronized (_vms) { - _vms.put(vmName, State.Running); + synchronized (s_vms) { + s_vms.put(_cluster, _name, vmName, State.Running); } } @@ -2158,9 +2158,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe for (NicTO nic : nics) { getNetwork(conn, nic); } - synchronized (_vms) { - _vms.put(vm.getName(), State.Migrating); - } + s_vms.put(_cluster, _name, vm.getName(), State.Migrating); return new PrepareForMigrationAnswer(cmd); } catch (Exception e) { @@ -2394,10 +2392,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe final String vmName = cmd.getVmName(); State state = null; - synchronized (_vms) { - state = _vms.get(vmName); - _vms.put(vmName, State.Stopping); - } + state = s_vms.getState(_cluster, vmName); + s_vms.put(_cluster, _name, vmName, State.Stopping); try { Set vms = VM.getByNameLabel(conn, vmName); @@ -2463,9 +2459,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe s_logger.warn(msg, e); return new MigrateAnswer(cmd, false, msg, null); } finally { - synchronized (_vms) { - _vms.put(vmName, state); - } + s_vms.put(_cluster, _name, vmName, state); } } @@ -2514,106 +2508,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe throw new CloudRuntimeException("Com'on no control domain? What the crap?!#@!##$@"); } - - protected HashMap deltaSync(Connection conn) { - HashMap newStates; - HashMap oldStates = null; - - final HashMap changes = new HashMap(); - - newStates = getAllVms(conn); - if (newStates == null) { - s_logger.debug("Unable to get the vm states so no state sync at this point."); - return null; - } - - synchronized (_vms) { - oldStates = new HashMap(_vms.size()); - oldStates.putAll(_vms); - - for (final Map.Entry entry : newStates.entrySet()) { - final String vm = entry.getKey(); - - State newState = entry.getValue(); - final State oldState = oldStates.remove(vm); - - if (newState == State.Stopped && oldState != State.Stopping && oldState != null && oldState != State.Stopped) { - newState = getRealPowerState(conn, vm); - } - - if (s_logger.isTraceEnabled()) { - s_logger.trace("VM " + vm + ": xen has state " + newState + " and we have state " + (oldState != null ? oldState.toString() : "null")); - } - - if (vm.startsWith("migrating")) { - s_logger.debug("Migrating from xen detected. Skipping"); - continue; - } - if (oldState == null) { - _vms.put(vm, newState); - s_logger.debug("Detecting a new state but couldn't find a old state so adding it to the changes: " + vm); - changes.put(vm, newState); - } else if (oldState == State.Starting) { - if (newState == State.Running) { - _vms.put(vm, newState); - } else if (newState == State.Stopped) { - s_logger.debug("Ignoring vm " + vm + " because of a lag in starting the vm."); - } - } else if (oldState == State.Migrating) { - if (newState == State.Running) { - s_logger.debug("Detected that an migrating VM is now running: " + vm); - _vms.put(vm, newState); - } - } else if (oldState == State.Stopping) { - if (newState == State.Stopped) { - _vms.put(vm, newState); - } else if (newState == State.Running) { - s_logger.debug("Ignoring vm " + vm + " because of a lag in stopping the vm. "); - } - } else if (oldState != newState) { - _vms.put(vm, newState); - if (newState == State.Stopped) { - /* - * if (_vmsKilled.remove(vm)) { s_logger.debug("VM " + vm + " has been killed for storage. "); - * newState = State.Error; } - */ - } - changes.put(vm, newState); - } - } - - for (final Map.Entry entry : oldStates.entrySet()) { - final String vm = entry.getKey(); - final State oldState = entry.getValue(); - - if (s_logger.isTraceEnabled()) { - s_logger.trace("VM " + vm + " is now missing from xen so reporting stopped"); - } - - if (oldState == State.Stopping) { - s_logger.debug("Ignoring VM " + vm + " in transition state stopping."); - _vms.remove(vm); - } else if (oldState == State.Starting) { - s_logger.debug("Ignoring VM " + vm + " in transition state starting."); - } else if (oldState == State.Stopped) { - _vms.remove(vm); - } else if (oldState == State.Migrating) { - s_logger.debug("Ignoring VM " + vm + " in migrating state."); - } else { - State newState = State.Stopped; - changes.put(entry.getKey(), newState); - } - } - } - - return changes; - } - - - protected void fullSync(StartupRoutingCommand cmd, Connection conn) { - synchronized (_vms) { - _vms.clear(); - } + + protected void fullHostSync(StartupRoutingCommand cmd, Connection conn) { try { final HashMap vmStates = new HashMap(); Host lhost = Host.getByUuid(conn, _host.uuid); @@ -2629,18 +2525,14 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe Host host = record.residentOn; String host_uuid = null; if( ! isRefNull(host) ) { - host_uuid = host.getUuid(conn); - if( host_uuid.equals(_host.uuid)) { - synchronized (_vms) { - _vms.put(vm_name, state); - } - } + host_uuid = host.getUuid(conn); + VmState vm_state = new StartupRoutingCommand.VmState(state, host_uuid); + vmStates.put(vm_name, vm_state); + s_vms.put(_cluster, host_uuid, vm_name, state); } if (s_logger.isTraceEnabled()) { s_logger.trace("VM " + vm_name + ": powerstate = " + ps + "; vm state=" + state.toString()); - } - VmState vm_state = new StartupRoutingCommand.VmState(state, host_uuid); - vmStates.put(vm_name, vm_state); + } } cmd.setChanges(vmStates); } catch (final Throwable e) { @@ -2720,9 +2612,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe @Override public RebootAnswer execute(RebootCommand cmd) { Connection conn = getConnection(); - synchronized (_vms) { - _vms.put(cmd.getVmName(), State.Starting); - } + s_vms.put(_cluster, _name, cmd.getVmName(), State.Starting); try { Set vms = null; try { @@ -2745,9 +2635,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe } return new RebootAnswer(cmd, "reboot succeeded", null, null); } finally { - synchronized (_vms) { - _vms.put(cmd.getVmName(), State.Running); - } + s_vms.put(_cluster, _name, cmd.getVmName(), State.Running); } } @@ -3209,9 +3097,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe if (vms.size() == 0) { s_logger.warn("VM does not exist on XenServer" + _host.uuid); - synchronized (_vms) { - _vms.remove(vmName); - } + s_vms.remove(_cluster, _name, vmName); return new StopAnswer(cmd, "VM does not exist", 0 , 0L, 0L); } Long bytesSent = 0L; @@ -3231,11 +3117,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe return new StopAnswer(cmd, msg); } - State state = null; - synchronized (_vms) { - state = _vms.get(vmName); - _vms.put(vmName, State.Stopping); - } + State state = s_vms.getState(_cluster, vmName); + s_vms.put(_cluster, _name, vmName, State.Stopping); try { if (vmr.powerState == VmPowerState.RUNNING) { @@ -3296,9 +3179,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe String msg = "VM destroy failed in Stop " + vmName + " Command due to " + e.getMessage(); s_logger.warn(msg, e); } finally { - synchronized (_vms) { - _vms.put(vmName, state); - } + s_vms.put(_cluster, _name, vmName, state); } } } @@ -3807,19 +3688,20 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe } } Connection conn = getConnection(); - HashMap newStates = deltaSync(conn); + /**HashMap newStates = deltaSync(conn); if (newStates == null) { s_logger.warn("Unable to get current status from sync"); return null; } if (!_canBridgeFirewall && !_isOvs) { return new PingRoutingCommand(getType(), id, newStates); - } else if (_isOvs) { + } else **/ + if (_isOvs) { List>ovsStates = ovsFullSyncStates(); - return new PingRoutingWithOvsCommand(getType(), id, newStates, ovsStates); + return new PingRoutingWithOvsCommand(getType(), id, null, ovsStates); }else { HashMap> nwGrpStates = syncNetworkGroups(conn, id); - return new PingRoutingWithNwGroupsCommand(getType(), id, newStates, nwGrpStates); + return new PingRoutingWithNwGroupsCommand(getType(), id, null, nwGrpStates); } } catch (Exception e) { s_logger.warn("Unable to get current status", e); @@ -4047,7 +3929,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe StartupRoutingCommand cmd = new StartupRoutingCommand(); fillHostInfo(conn, cmd); - fullSync(cmd, conn); + fullHostSync(cmd, conn); cmd.setHypervisorType(HypervisorType.XenServer); cmd.setCluster(_cluster); cmd.setPoolSync(false); @@ -6602,4 +6484,172 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe } return new SetFirewallRulesAnswer(cmd, true, results); } + + + protected Answer execute(final ClusterSyncCommand cmd) { + Connection conn = getConnection(); + //check if this is master + Pool pool; + try { + pool = Pool.getByUuid(conn, _host.pool); + Pool.Record poolr = pool.getRecord(conn); + + Host.Record hostr = poolr.master.getRecord(conn); + if (!_host.uuid.equals(hostr.uuid)) { + s_logger.debug("Not the master node so just return ok: " + _host.ip); + return new Answer(cmd, false, "Not a pool master"); + } + } catch (Exception e) { + s_logger.warn("Check for master failed, failing the Cluster sync command"); + return new Answer(cmd, false, "Not a pool master"); + } + HashMap> newStates; + int sync_type; + if (cmd.isRightStep()){ + // do full sync + newStates=fullClusterSync(conn); + sync_type = ClusterSyncAnswer.FULL_SYNC; + } + else { + // do delta sync + newStates = deltaClusterSync(conn); + if (newStates == null) { + s_logger.warn("Unable to get current status from sync"); + } + sync_type = ClusterSyncAnswer.DELTA_SYNC; + } + cmd.incrStep(); + return new ClusterSyncAnswer(cmd.getClusterId(), newStates, sync_type); + } + + + protected HashMap> fullClusterSync(Connection conn) { + s_vms.clear(_cluster); + try { + Host lhost = Host.getByUuid(conn, _host.uuid); + Map vm_map = VM.getAllRecords(conn); //USE THIS TO GET ALL VMS FROM A CLUSTER + for (VM.Record record: vm_map.values()) { + if (record.isControlDomain || record.isASnapshot || record.isATemplate) { + continue; // Skip DOM0 + } + String vm_name = record.nameLabel; + VmPowerState ps = record.powerState; + final State state = convertToState(ps); + Host host = record.residentOn; + String host_uuid = null; + if( ! isRefNull(host) ) { + host_uuid = host.getUuid(conn); + s_vms.put(_cluster, host_uuid, vm_name, state); + } + if (s_logger.isTraceEnabled()) { + s_logger.trace("VM " + vm_name + ": powerstate = " + ps + "; vm state=" + state.toString()); + } + } + } catch (final Throwable e) { + String msg = "Unable to get vms through host " + _host.uuid + " due to to " + e.toString(); + s_logger.warn(msg, e); + throw new CloudRuntimeException(msg); + } + return s_vms.getClusterVmState(_cluster); + } + + + protected HashMap> deltaClusterSync(Connection conn) { + HashMap> newStates; + HashMap> oldStates = null; + + final HashMap> changes = new HashMap>(); + + newStates = getAllVms(conn); + if (newStates == null) { + s_logger.warn("Unable to get the vm states so no state sync at this point."); + return null; + } + + synchronized (s_vms) { + oldStates = new HashMap>(s_vms.size(_cluster)); + oldStates.putAll(s_vms.getClusterVmState(_cluster)); + + for (final Map.Entry> entry : newStates.entrySet()) { + final String vm = entry.getKey(); + + State newState = entry.getValue().second(); + String host_uuid = entry.getValue().first(); + final Pair oldState = oldStates.remove(vm); + + if (newState == State.Stopped && oldState != null && oldState.second() != State.Stopping && oldState.second() != State.Stopped) { + newState = getRealPowerState(conn, vm); + } + + if (s_logger.isTraceEnabled()) { + s_logger.trace("VM " + vm + ": xen has state " + newState + " and we have state " + (oldState != null ? oldState.toString() : "null")); + } + + if (vm.startsWith("migrating")) { + s_logger.warn("Migrating from xen detected. Skipping"); + continue; + } + if (oldState == null) { + s_vms.put(_cluster, host_uuid, vm, newState); + s_logger.warn("Detecting a new state but couldn't find a old state so adding it to the changes: " + vm); + changes.put(vm, new Pair(host_uuid, newState)); + } else if (oldState.second() == State.Starting) { + if (newState == State.Running) { + s_vms.put(_cluster, host_uuid, vm, newState); + } else if (newState == State.Stopped) { + s_logger.warn("Ignoring vm " + vm + " because of a lag in starting the vm."); + } + } else if (oldState.second() == State.Migrating) { + if (newState == State.Running) { + s_logger.debug("Detected that an migrating VM is now running: " + vm); + s_vms.put(_cluster, host_uuid, vm, newState); + } + } else if (oldState.second() == State.Stopping) { + if (newState == State.Stopped) { + s_vms.put(_cluster, host_uuid, vm, newState); + } else if (newState == State.Running) { + s_logger.warn("Ignoring vm " + vm + " because of a lag in stopping the vm. "); + } + } else if (oldState.second() != newState) { + s_vms.put(_cluster, host_uuid, vm, newState); + if (newState == State.Stopped) { + /* + * if (s_vmsKilled.remove(vm)) { s_logger.debug("VM " + vm + " has been killed for storage. "); + * newState = State.Error; } + */ + } + changes.put(vm, new Pair(host_uuid, newState)); + } + } + + for (final Map.Entry> entry : oldStates.entrySet()) { + final String vm = entry.getKey(); + final State oldState = entry.getValue().second(); + String host_uuid = entry.getValue().first(); + + if (s_logger.isTraceEnabled()) { + s_logger.trace("VM " + vm + " is now missing from xen so reporting stopped"); + } + + if (oldState == State.Stopping) { + s_logger.warn("Ignoring VM " + vm + " in transition state stopping."); + s_vms.remove(_cluster, host_uuid, vm); + } else if (oldState == State.Starting) { + s_logger.warn("Ignoring VM " + vm + " in transition state starting."); + } else if (oldState == State.Stopped) { + s_vms.remove(_cluster, host_uuid, vm); + } else if (oldState == State.Migrating) { + s_logger.warn("Ignoring VM " + vm + " in migrating state."); + } else { + State newState = State.Stopped; + changes.put(vm, new Pair(host_uuid, newState)); + } + } + } + + return changes; + } + + + } diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java index 50cf3a1ed17..7155c915cb4 100755 --- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -42,6 +42,8 @@ import com.cloud.agent.api.AgentControlCommand; import com.cloud.agent.api.Answer; import com.cloud.agent.api.CheckVirtualMachineAnswer; import com.cloud.agent.api.CheckVirtualMachineCommand; +import com.cloud.agent.api.ClusterSyncAnswer; +import com.cloud.agent.api.ClusterSyncCommand; import com.cloud.agent.api.Command; import com.cloud.agent.api.MigrateAnswer; import com.cloud.agent.api.MigrateCommand; @@ -1515,7 +1517,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene return new StopCommand(vmName); } - public Commands deltaSync(long hostId, Map newStates) { + public Commands deltaSync(Map> newStates) { Map states = convertDeltaToInfos(newStates); Commands commands = new Commands(OnError.Continue); @@ -1527,7 +1529,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene Command command = null; if (vm != null) { HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType()); - command = compareState(hostId, vm, info, false, hvGuru.trackVmHostChange()); + command = compareState(vm.hostId, vm, info, false, hvGuru.trackVmHostChange()); } else { if (s_logger.isDebugEnabled()) { s_logger.debug("Cleaning up a VM that is no longer found: " + info.name); @@ -1543,8 +1545,43 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene return commands; } + public Commands fullSync(final long clusterId, Map> newStates) { + Commands commands = new Commands(OnError.Continue); + Map infos = convertToInfos(newStates); + long hId = 0; + final List vms = _vmDao.listByClusterId(clusterId); + for (VMInstanceVO vm : vms) { + AgentVmInfo info = infos.remove(vm.getId()); + VMInstanceVO castedVm = null; + if (info == null) { + info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped); + castedVm = info.guru.findById(vm.getId()); + hId = vm.getHostId() == null ? vm.getLastHostId() : vm.getHostId(); + } else { + castedVm = info.vm; + String host_guid = info.getHostUuid(); + Host host = _hostDao.findByGuid(host_guid); + if (host == null) { + infos.put(vm.getId(), info); + continue; + } + hId = host.getId(); + } + HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType()); + Command command = compareState(hId, castedVm, info, true, hvGuru.trackVmHostChange()); + if (command != null) { + commands.addCommand(command); + } + } + for (final AgentVmInfo left : infos.values()) { + s_logger.warn("Stopping a VM that we have no record of: " + left.name); + commands.addCommand(cleanup(left.name)); + } + return commands; + } - protected Map convertDeltaToInfos(final Map states) { + + protected Map convertDeltaToInfos(final Map> states) { final HashMap map = new HashMap(); if (states == null) { @@ -1553,20 +1590,20 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene Collection> vmGurus = _vmGurus.values(); - for (Map.Entry entry : states.entrySet()) { + for (Map.Entry> entry : states.entrySet()) { for (VirtualMachineGuru vmGuru : vmGurus) { String name = entry.getKey(); VMInstanceVO vm = vmGuru.findByName(name); if (vm != null) { - map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue())); + map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue().second())); break; } Long id = vmGuru.convertToId(name); if (id != null) { - map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue())); + map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null, entry.getValue().second())); break; } } @@ -1575,37 +1612,34 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene return map; } - protected Map convertToInfos(final Map states) { + protected Map convertToInfos(final Map> newStates) { final HashMap map = new HashMap(); - if (states == null) { + if (newStates == null) { return map; } Collection> vmGurus = _vmGurus.values(); - for (Map.Entry entry : states.entrySet()) { + for (Map.Entry> entry : newStates.entrySet()) { for (VirtualMachineGuru vmGuru : vmGurus) { String name = entry.getKey(); - VMInstanceVO vm = vmGuru.findByName(name); - if (vm != null) { - map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue().getState(), entry.getValue().getHost() )); + map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue().second(), entry.getValue().first())); break; } - Long id = vmGuru.convertToId(name); if (id != null) { - map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue().getState(), entry.getValue().getHost() )); + map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null, entry.getValue().second(), entry.getValue().first())); break; } } } - return map; } + /** * compareState does as its name suggests and compares the states between management server and agent. It returns whether * something should be cleaned up @@ -1622,7 +1656,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene if (s_logger.isDebugEnabled()) { s_logger.debug("VM " + serverName + ": server state = " + serverState + " and agent state = " + agentState); } - + if (agentState == State.Error) { agentState = State.Stopped; @@ -1817,95 +1851,6 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene } } - public Commands fullSync(final long hostId, StartupRoutingCommand startup) { - - Commands commands = new Commands(OnError.Continue); - Map infos = convertToInfos(startup.getVmStates()); - if( startup.isPoolSync()) { - long hId = 0; - Host host = _hostDao.findById(hostId); - long clusterId= host.getClusterId(); - final List vms = _vmDao.listByClusterId(clusterId); - s_logger.debug("Found " + vms.size() + " VMs for cluster " + clusterId); - for (VMInstanceVO vm : vms) { - AgentVmInfo info = infos.remove(vm.getId()); - VMInstanceVO castedVm = null; - if (info == null) { - info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped); - hId = 0; - castedVm = info.guru.findById(vm.getId()); - } else { - castedVm = info.vm; - String host_guid = info.getHost(); - host = _hostDao.findByGuid(host_guid); - if ( host == null ) { - infos.put(vm.getId(), info); - continue; - } - hId = host.getId(); - HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType()); - - Command command = compareState(hId, castedVm, info, true, hvGuru.trackVmHostChange()); - if (command != null) { - commands.addCommand(command); - } - } - } - for (final AgentVmInfo left : infos.values()) { - s_logger.warn("Stopping a VM that we have no record of: " + left.name); - commands.addCommand(cleanup(left.name)); - } - - } else { - final List vms = _vmDao.listByHostId(hostId); - s_logger.debug("Found " + vms.size() + " VMs for host " + hostId); - for (VMInstanceVO vm : vms) { - AgentVmInfo info = infos.remove(vm.getId()); - - VMInstanceVO castedVm = null; - if (info == null) { - info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped); - castedVm = info.guru.findById(vm.getId()); - } else { - castedVm = info.vm; - } - - HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType()); - - Command command = compareState(hostId, castedVm, info, true, hvGuru.trackVmHostChange()); - if (command != null) { - commands.addCommand(command); - } - } - - for (final AgentVmInfo left : infos.values()) { - boolean found = false; - for (VirtualMachineGuru vmGuru : _vmGurus.values()) { - VMInstanceVO vm = vmGuru.findByName(left.name); - if (vm != null) { - found = true; - HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType()); - if(hvGuru.trackVmHostChange()) { - Command command = compareState(hostId, vm, left, true, true); - if (command != null) { - commands.addCommand(command); - } - } else { - s_logger.warn("Stopping a VM, VM " + left.name + " migrate from Host " + vm.getHostId() + " to Host " + hostId ); - commands.addCommand(cleanup(left.name)); - } - break; - } - } - if ( ! found ) { - s_logger.warn("Stopping a VM that we have no record of: " + left.name); - commands.addCommand(cleanup(left.name)); - } - } - } - return commands; - } - @Override public boolean isRecurring() { return false; @@ -1914,7 +1859,14 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene @Override public boolean processAnswers(long agentId, long seq, Answer[] answers) { for (final Answer answer : answers) { - if (!answer.getResult()) { + if (answer instanceof ClusterSyncAnswer) { + ClusterSyncAnswer hs = (ClusterSyncAnswer) answer; + if (hs.isFull()) { + fullSync(hs.getClusterId(), hs.getNewStates()); + } else { + deltaSync(hs.getNewStates()); + } + } else if (!answer.getResult()) { s_logger.warn("Cleanup failed due to " + answer.getDetails()); } else { if (s_logger.isDebugEnabled()) { @@ -1937,24 +1889,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene @Override public boolean processCommands(long agentId, long seq, Command[] cmds) { - boolean processed = false; - for (Command cmd : cmds) { - if (cmd instanceof PingRoutingCommand) { - PingRoutingCommand ping = (PingRoutingCommand) cmd; - if (ping.getNewStates().size() > 0) { - Commands commands = deltaSync(agentId, ping.getNewStates()); - if (commands.size() > 0) { - try { - _agentMgr.send(agentId, commands, this); - } catch (final AgentUnavailableException e) { - s_logger.warn("Agent is now unavailable", e); - } - } - } - processed = true; - } - } - return processed; + return false; } @Override @@ -1979,33 +1914,13 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene } long agentId = agent.getId(); - - StartupRoutingCommand startup = (StartupRoutingCommand) cmd; - - Commands commands = fullSync(agentId, startup); - - if (commands.size() > 0) { - s_logger.debug("Sending clean commands to the agent"); - - try { - boolean error = false; - Answer[] answers = _agentMgr.send(agentId, commands); - for (Answer answer : answers) { - if (!answer.getResult()) { - s_logger.warn("Unable to stop a VM due to " + answer.getDetails()); - error = true; - } - } - if (error) { - throw new ConnectionException(true, "Unable to stop VMs"); - } - } catch (final AgentUnavailableException e) { - s_logger.warn("Agent is unavailable now", e); - throw new ConnectionException(true, "Unable to sync", e); - } catch (final OperationTimedoutException e) { - s_logger.warn("Agent is unavailable now", e); - throw new ConnectionException(true, "Unable to sync", e); - } + Long clusterId = agent.getClusterId(); + ClusterSyncCommand syncCmd = new ClusterSyncCommand(60, 20, clusterId); + try { + long seq_no = _agentMgr.send(agentId, new Commands(syncCmd), this); + s_logger.debug("Cluster VM sync started with jobid " + seq_no); + } catch (AgentUnavailableException e) { + s_logger.fatal("The Cluster VM sync process failed for cluster id " + clusterId + " with ", e); } } @@ -2045,7 +1960,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene protected class AgentVmInfo { public String name; public State state; - public String host; + public String hostUuid; public VMInstanceVO vm; public VirtualMachineGuru guru; @@ -2055,15 +1970,15 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene this.state = state; this.vm = vm; this.guru = (VirtualMachineGuru) guru; - this.host = host; + this.hostUuid = host; } public AgentVmInfo(String name, VirtualMachineGuru guru, VMInstanceVO vm, State state) { this(name, guru, vm, state, null); } - public String getHost() { - return host; + public String getHostUuid() { + return hostUuid; } }