bug 12530: introduce initial full sync; cleanup syncronization;

This commit is contained in:
Abhinandan Prateek 2011-12-27 11:13:38 +05:30
parent ac0275b3e8
commit 5e4acc2a55
6 changed files with 148 additions and 80 deletions

View File

@ -31,14 +31,8 @@ public class ClusterSyncAnswer extends Answer {
public static final int FULL_SYNC=0;
public static final int DELTA_SYNC=1;
public ClusterSyncAnswer(long clusterId) {
_clusterId = clusterId;
result = false;
this.details = "Ignore sync as this is not a pool master";
_type = -1;
}
public static final int INITIAL_FULL_SYNC=2;
// this is here because a cron command answer is being sent twice
// AgentAttache.processAnswers
// AgentManagerImpl.notifyAnswersToMonitors
@ -67,6 +61,14 @@ public class ClusterSyncAnswer extends Answer {
result = true;
}
public ClusterSyncAnswer(long clusterId, HashMap<String, Pair<String, State>> newStates, HashMap<String, Pair<String, State>> allStates, boolean init){
_clusterId = clusterId;
_newStates = newStates;
_allStates = allStates;
_type = INITIAL_FULL_SYNC;
result = true;
}
public long getClusterId() {
return _clusterId;
}

View File

@ -22,17 +22,15 @@ public class ClusterSyncCommand extends Command implements CronCommand {
private int _interval;
private int _skipSteps; // skip this many steps for full sync
private int _steps;
private boolean _init;
private long _clusterId;
public ClusterSyncCommand() {
}
public ClusterSyncCommand(int interval, int skipSteps, long clusterId){
_interval = interval;
_skipSteps = skipSteps;
_clusterId = clusterId;
_steps=0;
_init=true;
}
@Override
@ -50,7 +48,7 @@ public class ClusterSyncCommand extends Command implements CronCommand {
}
public boolean isRightStep(){
return (_steps==_skipSteps);
return (_steps==0);
}
public long getClusterId() {
@ -62,4 +60,12 @@ public class ClusterSyncCommand extends Command implements CronCommand {
return false;
}
public boolean isInit(){
return _init;
}
public void unsetInit(){
_init = false;
}
}

View File

@ -1081,8 +1081,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
}
}
s_logger.debug("1. The VM " + vmName + " is in Starting state.");
s_vms.put(_cluster, _name, vmName, State.Starting);
synchronized (s_vms) {
s_logger.debug("1. The VM " + vmName + " is in Starting state.");
s_vms.put(_cluster, _name, vmName, State.Starting);
}
Host host = Host.getByUuid(conn, _host.uuid);
vm = createVmFromTemplate(conn, vmSpec, host);
@ -1155,7 +1157,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
}
state = State.Starting;
state = State.Running;
return new StartAnswer(cmd);
} catch (Exception e) {
s_logger.warn("Catch Exception: " + e.getClass().toString() + " due to " + e.toString(), e);
@ -2191,8 +2193,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
for (NicTO nic : nics) {
getNetwork(conn, nic);
}
s_logger.debug("4. The VM " + vm.getName() + " is in " + State.Migrating + " state");
s_vms.put(_cluster, _name, vm.getName(), State.Migrating);
synchronized (s_vms) {
s_logger.debug("4. The VM " + vm.getName() + " is in " + State.Migrating + " state");
s_vms.put(_cluster, _name, vm.getName(), State.Migrating);
}
return new PrepareForMigrationAnswer(cmd);
} catch (Exception e) {
@ -2428,8 +2432,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
state = s_vms.getState(_cluster, vmName);
s_logger.debug("5. The VM " + vmName + " is in " + State.Stopping + " state");
s_vms.put(_cluster, _name, vmName, State.Stopping);
synchronized (s_vms) {
s_logger.debug("5. The VM " + vmName + " is in " + State.Stopping + " state");
s_vms.put(_cluster, _name, vmName, State.Stopping);
}
try {
Set<VM> vms = VM.getByNameLabel(conn, vmName);
@ -2495,8 +2501,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
s_logger.warn(msg, e);
return new MigrateAnswer(cmd, false, msg, null);
} finally {
s_logger.debug("6. The VM " + vmName + " is in " + state + " state");
s_vms.put(_cluster, _name, vmName, state);
synchronized (s_vms) {
s_logger.debug("6. The VM " + vmName + " is in " + state + " state");
s_vms.put(_cluster, _name, vmName, state);
}
}
}
@ -2618,8 +2626,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
@Override
public RebootAnswer execute(RebootCommand cmd) {
Connection conn = getConnection();
s_logger.debug("7. The VM " + cmd.getVmName() + " is in " + State.Starting + " state");
s_vms.put(_cluster, _name, cmd.getVmName(), State.Starting);
synchronized (s_vms) {
s_logger.debug("7. The VM " + cmd.getVmName() + " is in " + State.Starting + " state");
s_vms.put(_cluster, _name, cmd.getVmName(), State.Starting);
}
try {
Set<VM> vms = null;
try {
@ -2642,8 +2652,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
return new RebootAnswer(cmd, "reboot succeeded", null, null);
} finally {
s_logger.debug("8. The VM " + cmd.getVmName() + " is in " + State.Running + " state");
s_vms.put(_cluster, _name, cmd.getVmName(), State.Running);
synchronized (s_vms) {
s_logger.debug("8. The VM " + cmd.getVmName() + " is in " + State.Running + " state");
s_vms.put(_cluster, _name, cmd.getVmName(), State.Running);
}
}
}
@ -3113,8 +3125,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
if (vms.size() == 0) {
s_logger.info("VM does not exist on XenServer" + _host.uuid);
s_vms.remove(_cluster, _name, vmName);
synchronized (s_vms) {
s_logger.info("VM does not exist on XenServer" + _host.uuid);
s_vms.remove(_cluster, _name, vmName);
}
return new StopAnswer(cmd, "VM does not exist", 0 , 0L, 0L);
}
Long bytesSent = 0L;
@ -3136,8 +3150,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
State state = s_vms.getState(_cluster, vmName);
s_logger.debug("9. The VM " + vmName + " is in " + State.Stopping + " state");
s_vms.put(_cluster, _name, vmName, State.Stopping);
synchronized (s_vms) {
s_logger.debug("9. The VM " + vmName + " is in " + State.Stopping + " state");
s_vms.put(_cluster, _name, vmName, State.Stopping);
}
try {
if (vmr.powerState == VmPowerState.RUNNING) {
@ -3198,8 +3214,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
String msg = "VM destroy failed in Stop " + vmName + " Command due to " + e.getMessage();
s_logger.warn(msg, e);
} finally {
s_logger.debug("10. The VM " + vmName + " is in " + state + " state");
s_vms.put(_cluster, _name, vmName, state);
synchronized (s_vms) {
s_logger.debug("10. The VM " + vmName + " is in " + state + " state");
s_vms.put(_cluster, _name, vmName, state);
}
}
}
}
@ -6563,20 +6581,27 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
Host.Record hostr = poolr.master.getRecord(conn);
if (!_host.uuid.equals(hostr.uuid)) {
return new ClusterSyncAnswer(cmd.getClusterId());
return new Answer(cmd);
}
} catch (Throwable e) {
s_logger.warn("Check for master failed, failing the Cluster sync command");
return new ClusterSyncAnswer(cmd.getClusterId());
return new Answer(cmd);
}
HashMap<String, Pair<String, State>> newStates = deltaClusterSync(conn);
cmd.incrStep();
if (cmd.isRightStep()){
cmd.incrStep();
// do full sync
HashMap<String, Pair<String, State>> allStates=fullClusterSync(conn);
return new ClusterSyncAnswer(cmd.getClusterId(), newStates, allStates);
if (cmd.isInit()){
cmd.unsetInit();
return new ClusterSyncAnswer(cmd.getClusterId(), newStates, allStates, true);
}
else {
return new ClusterSyncAnswer(cmd.getClusterId(), newStates, allStates);
}
}
else {
cmd.incrStep();
return new ClusterSyncAnswer(cmd.getClusterId(), newStates);
}
}
@ -6613,19 +6638,16 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
protected HashMap<String, Pair<String, State>> deltaClusterSync(Connection conn) {
HashMap<String, Pair<String, State>> newStates;
HashMap<String, Pair<String, State>> oldStates = null;
final HashMap<String, Pair<String, State>> changes = new HashMap<String, Pair<String, State>>();
newStates = getAllVms(conn);
if (newStates == null) {
s_logger.warn("Unable to get the vm states so no state sync at this point.");
return null;
}
synchronized (s_vms) {
oldStates = new HashMap<String, Pair<String, State>>(s_vms.size(_cluster));
HashMap<String, Pair<String, State>> newStates = getAllVms(conn);
if (newStates == null) {
s_logger.warn("Unable to get the vm states so no state sync at this point.");
return null;
}
HashMap<String, Pair<String, State>> oldStates = new HashMap<String, Pair<String, State>>(s_vms.size(_cluster));
oldStates.putAll(s_vms.getClusterVmState(_cluster));
for (final Map.Entry<String, Pair<String, State>> entry : newStates.entrySet()) {

View File

@ -1,9 +1,6 @@
package com.cloud.hypervisor.xen.resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.apache.log4j.Logger;
@ -15,7 +12,6 @@ public class XenServerPoolVms {
private static final Logger s_logger = Logger.getLogger(XenServerPoolVms.class);
private HashMap<String/* clusterId */, HashMap<String/* vm name */, Pair<String/* host uuid */, State/* vm state */>>> _cluster_vms =
new HashMap<String, HashMap<String, Pair<String, State>>>();
private long _last_sync_time=0;
public HashMap<String, Pair<String, State>> getClusterVmState(String clusterId){
HashMap<String, Pair<String, State>> _vms= _cluster_vms.get(clusterId);
@ -29,9 +25,7 @@ public class XenServerPoolVms {
public void clear(String clusterId){
HashMap<String, Pair<String, State>> _vms= getClusterVmState(clusterId);
synchronized (_vms) {
_vms.clear();
}
_vms.clear();
}
public State getState(String clusterId, String name){
@ -42,23 +36,17 @@ public class XenServerPoolVms {
public void put(String clusterId, String hostUuid, String name, State state){
HashMap<String, Pair<String, State>> vms= getClusterVmState(clusterId);
synchronized (vms) {
vms.put(name, new Pair<String, State>(hostUuid, state));
}
vms.put(name, new Pair<String, State>(hostUuid, state));
}
public void remove(String clusterId, String hostUuid, String name){
HashMap<String, Pair<String, State>> vms= getClusterVmState(clusterId);
synchronized (vms) {
vms.remove(name);
}
vms.remove(name);
}
public void putAll(String clusterId, HashMap<String, Pair<String, State>> new_vms){
HashMap<String, Pair<String, State>> vms= getClusterVmState(clusterId);
synchronized (vms) {
vms.putAll(new_vms);
}
vms.putAll(new_vms);
}
public int size(String clusterId){
@ -66,10 +54,6 @@ public class XenServerPoolVms {
return vms.size();
}
public void initSyncTime(){
_last_sync_time = System.currentTimeMillis();
}
@Override
public String toString(){
StringBuilder sbuf = new StringBuilder("PoolVms=");

View File

@ -48,8 +48,8 @@ import com.cloud.vm.VirtualMachine.State;
@Table(name="vm_instance")
@Inheritance(strategy=InheritanceType.JOINED)
@DiscriminatorColumn(name="type", discriminatorType=DiscriminatorType.STRING, length=32)
public class VMInstanceVO implements VirtualMachine, FiniteStateObject<State, VirtualMachine.Event> {
@Id
public class VMInstanceVO implements VirtualMachine, FiniteStateObject<State, VirtualMachine.Event> {
@Id
@TableGenerator(name="vm_instance_sq", table="sequence", pkColumnName="name", valueColumnName="value", pkColumnValue="vm_instance_seq", allocationSize=1)
@Column(name="id", updatable=false, nullable = false)
protected long id;
@ -435,5 +435,29 @@ public class VMInstanceVO implements VirtualMachine, FiniteStateObject<State, Vi
toString = new StringBuilder("VM[").append(type.toString()).append("|").append(instanceName).append("]").toString();
}
return toString;
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (id ^ (id >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
VMInstanceVO other = (VMInstanceVO) obj;
if (id != other.id)
return false;
return true;
}
}

View File

@ -19,11 +19,14 @@ package com.cloud.vm;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -1704,19 +1707,43 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
}
public void fullSync(final long clusterId, Map<String, Pair<String, State>> newStates) {
public void fullSync(final long clusterId, Map<String, Pair<String, State>> newStates, boolean init) {
Map<Long, AgentVmInfo> infos = convertToInfos(newStates);
List<VMInstanceVO> vms = _vmDao.listByClusterId(clusterId);
for (VMInstanceVO vm : vms) {
Set<VMInstanceVO> set_vms = Collections.synchronizedSet(new HashSet<VMInstanceVO>());
set_vms.addAll(_vmDao.listByClusterId(clusterId));
set_vms.addAll(_vmDao.listStartingByClusterId(clusterId));
for (VMInstanceVO vm : set_vms) {
if (vm.isRemoved() || vm.getState() == State.Destroyed || vm.getState() == State.Expunging) continue;
infos.remove(vm.getId());
}
// some VMs may be starting and will have last host id null
vms = _vmDao.listStartingByClusterId(clusterId);
for (VMInstanceVO vm : vms) {
if (vm.isRemoved() || vm.getState() == State.Destroyed || vm.getState() == State.Expunging) continue;
infos.remove(vm.getId());
AgentVmInfo info = infos.remove(vm.getId());
if (init){ // mark the VMs real state on initial sync
VMInstanceVO castedVm = null;
if (info == null) {
info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped);
castedVm = info.guru.findById(vm.getId());
} else {
castedVm = info.vm;
}
try {
Host host = _resourceMgr.findHostByGuid(info.getHostUuid());
long hostId = host == null ? (vm.getHostId() == null ? vm.getLastHostId() : vm.getHostId()) : host.getId();
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType());
Command command = compareState(hostId, castedVm, info, true, hvGuru.trackVmHostChange());
if (command != null){
Answer answer = _agentMgr.send(hostId, command);
if (!answer.getResult()) {
s_logger.warn("Failed to update state of the VM due to " + answer.getDetails());
}
}
} catch (Exception e) {
s_logger.warn("Unable to update state of the VM due to exception " + e.getMessage());
e.printStackTrace();
}
}
}
for (final AgentVmInfo left : infos.values()) {
try {
Host host = _resourceMgr.findHostByGuid(left.getHostUuid());
@ -1834,7 +1861,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
final String serverName = vm.getInstanceName();
Command command = null;
s_logger.debug("VM " + serverName + ": cs state = " + serverState + " and realState = " + agentState);
if (s_logger.isDebugEnabled()) {
s_logger.debug("VM " + serverName + ": cs state = " + serverState + " and realState = " + agentState);
}
@ -2064,10 +2091,13 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
if (!hs.isExceuted()){
if (hs.isFull()) {
deltaSync(hs.getNewStates());
fullSync(hs.getClusterId(), hs.getAllStates());
fullSync(hs.getClusterId(), hs.getAllStates(), false);
} else if (hs.isDelta()){
deltaSync(hs.getNewStates());
}
else {
fullSync(hs.getClusterId(), hs.getAllStates(), true);
}
hs.setExecuted();
}
} else if (!answer.getResult()) {