HA done? Needs testing.

This commit is contained in:
Alex Huang 2011-02-07 09:06:36 -08:00
parent f948926a2f
commit bee6953e09
7 changed files with 213 additions and 185 deletions

View File

@ -92,7 +92,7 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, StateObject
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.MigrationFailedOnDest, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.AgentReportRunning, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.AgentReportStopped, State.Stopped);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.AgentReportShutdowned, State.Stopped);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.AgentReportShutdowned, State.Stopped);
s_fsm.addTransition(State.Stopping, VirtualMachine.Event.OperationSucceeded, State.Stopped);
s_fsm.addTransition(State.Stopping, VirtualMachine.Event.OperationFailed, State.Running);
s_fsm.addTransition(State.Stopping, VirtualMachine.Event.AgentReportRunning, State.Running);

View File

@ -31,6 +31,7 @@ import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import com.cloud.ha.HighAvailabilityManager.Step;
import com.cloud.ha.HighAvailabilityManager.WorkType;
import com.cloud.utils.db.GenericDao;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachine.State;
@ -38,14 +39,6 @@ import com.cloud.vm.VirtualMachine.State;
@Entity
@Table(name="op_ha_work")
public class HaWorkVO {
public enum WorkType {
Migration,
Stop,
CheckStop,
Destroy,
HA;
}
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@Column(name="id")

View File

@ -28,6 +28,14 @@ import com.cloud.vm.VMInstanceVO;
* HighAvailabilityManager checks to make sure the VMs are running fine.
*/
public interface HighAvailabilityManager extends Manager {
public enum WorkType {
Migration, // Migrating VMs off of a host.
Stop, // Stops a VM for storage pool migration purposes. This should be obsolete now.
CheckStop, // Checks if a VM has been stopped.
ForceStop, // Force a VM to stop even if the states don't allow it. Use this only if you know the VM is stopped on the physical hypervisor.
Destroy, // Destroy a VM.
HA; // Restart a VM.
}
enum Step {
Scheduled,
@ -35,9 +43,7 @@ public interface HighAvailabilityManager extends Manager {
Fencing,
Stopping,
Restarting,
Preparing,
Migrating,
Checking,
Cancelled,
Done,
Error,
@ -63,7 +69,7 @@ public interface HighAvailabilityManager extends Manager {
* @param vm the vm that has gone away.
* @param investigate must be investigated before we do anything with this vm.
*/
void scheduleRestart(final VMInstanceVO vm, boolean investigate);
void scheduleRestart(VMInstanceVO vm, boolean investigate);
void cancelDestroy(VMInstanceVO vm, Long hostId);
@ -73,7 +79,7 @@ public interface HighAvailabilityManager extends Manager {
* Schedule restarts for all vms running on the host.
* @param host host.
*/
void scheduleRestartForVmsOnHost(final HostVO host);
void scheduleRestartForVmsOnHost(HostVO host);
/**
* Schedule the vm for migration.
@ -81,18 +87,23 @@ public interface HighAvailabilityManager extends Manager {
* @param vm
* @return true if schedule worked.
*/
boolean scheduleMigration(final VMInstanceVO vm);
boolean scheduleMigration(VMInstanceVO vm);
List<VMInstanceVO> findTakenMigrationWork();
/**
* Stops a VM.
* Schedules a work item to stop a VM. This method schedules a work
* item to do one of three things.
*
* 1. Perform a regular stop of a VM: WorkType.Stop
* 2. Perform a force stop of a VM: WorkType.ForceStop
* 3. Check if a VM has been stopped: WorkType.CheckStop
*
* @param vm virtual machine to stop.
* @param host host the virtual machine is on.
* @param verifyHost make sure it is the same host as the schedule time.
* @param type which type of stop is requested.
*/
void scheduleStop(final VMInstanceVO vm, long hostId, boolean verifyHost);
void scheduleStop(VMInstanceVO vm, long hostId, WorkType type);
void cancelScheduledMigrations(HostVO host);
}

View File

@ -33,9 +33,9 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.alert.AlertManager;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHostVO;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.DataCenterVO;
@ -48,7 +48,6 @@ import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.InsufficientServerCapacityException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.exception.ResourceUnavailableException;
import com.cloud.ha.HaWorkVO.WorkType;
import com.cloud.ha.dao.HighAvailabilityDao;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
@ -65,6 +64,7 @@ import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.component.Inject;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachine.Event;
@ -110,7 +110,7 @@ import com.cloud.vm.dao.VMInstanceDao;
* }
**/
@Local(value={HighAvailabilityManager.class})
public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
public class HighAvailabilityManagerImpl implements HighAvailabilityManager, ClusterManagerListener {
protected static final Logger s_logger = Logger.getLogger(HighAvailabilityManagerImpl.class);
String _name;
WorkerThread[] _workers;
@ -222,17 +222,18 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
}
@Override
public void scheduleStop(final VMInstanceVO vm, long hostId, boolean verifyHost) {
public void scheduleStop(VMInstanceVO vm, long hostId, WorkType type) {
assert (type == WorkType.CheckStop || type == WorkType.ForceStop || type == WorkType.Stop);
if (_haDao.hasBeenScheduled(vm.getId(), verifyHost ? WorkType.CheckStop : WorkType.Stop)) {
s_logger.info("There's already a job scheduled to stop " + vm.toString());
if (_haDao.hasBeenScheduled(vm.getId(), type)) {
s_logger.info("There's already a job scheduled to stop " + vm);
return;
}
final HaWorkVO work = new HaWorkVO(vm.getId(), vm.getType(), verifyHost ? WorkType.CheckStop : WorkType.Stop, Step.Scheduled, hostId, vm.getState(), 0, vm.getUpdated());
HaWorkVO work = new HaWorkVO(vm.getId(), vm.getType(), type, Step.Scheduled, hostId, vm.getState(), 0, vm.getUpdated());
_haDao.persist(work);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Scheduled " + work.toString() + " verifyHost = " + verifyHost);
s_logger.debug("Scheduled " + work);
}
wakeupWorkers();
}
@ -323,14 +324,13 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
return null;
}
s_logger.info("HA on " + vm.toString());
s_logger.info("HA on " + vm);
if (vm.getState() != work.getPreviousState() || vm.getUpdated() != work.getUpdateTime()) {
s_logger.info("VM " + vm.toString() + " has been changed. Current State = " + vm.getState() + " Previous State = " + work.getPreviousState() + " last updated = " + vm.getUpdated() + " previous updated = " + work.getUpdateTime());
s_logger.info("VM " + vm + " has been changed. Current State = " + vm.getState() + " Previous State = " + work.getPreviousState() + " last updated = " + vm.getUpdated() + " previous updated = " + work.getUpdateTime());
return null;
}
final HostVO host = _hostDao.findById(work.getHostId());
boolean nativeHA = _agentMgr.isHostNativeHAEnabled(work.getHostId());
HostVO host = _hostDao.findById(work.getHostId());
DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId());
HostPodVO podVO = _podDao.findById(host.getPodId());
@ -347,9 +347,6 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
if (work.getStep() == Step.Investigating) {
if (vm.getHostId() == null || vm.getHostId() != work.getHostId()) {
s_logger.info("VM " + vm.toString() + " is now no longer on host " + work.getHostId());
if (vm.getState() == State.Starting && vm.getUpdated() == work.getUpdateTime()) {
_itMgr.stateTransitTo(vm, Event.AgentReportStopped, null);
}
return null;
}
@ -366,7 +363,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
if (alive != null && alive) {
s_logger.debug("VM " + vm.getName() + " is found to be alive by " + investigator.getName());
if (host.getStatus() == Status.Up) {
//FIXME compareState(vm, new AgentVmInfo(vm.getInstanceName(), null, State.Running), false, nativeHA);
s_logger.info(vm + " is alive and host is up. No need to restart it.");
return null;
} else {
s_logger.debug("Rescheduling because the host is not up but the vm is alive");
@ -397,11 +394,14 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
try {
_itMgr.advanceStop(vm, true, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount());
} catch (ResourceUnavailableException e) {
// FIXME
assert false : "How do we hit this when force is true?";
throw new CloudRuntimeException("Caught exception even though it should be handled.", e);
} catch (OperationTimedoutException e) {
// FIXME
assert false : "How do we hit this when force is true?";
throw new CloudRuntimeException("Caught exception even though it should be handled.", e);
} catch (ConcurrentOperationException e) {
// FIXME
assert false : "How do we hit this when force is true?";
throw new CloudRuntimeException("Caught exception even though it should be handled.", e);
}
work.setStep(Step.Scheduled);
@ -462,7 +462,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc, "The Storage is unavailable for trying to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
return null;
} catch (OperationTimedoutException e) {
// FIXME
s_logger.warn("Unable to restart " + vm.toString() + " due to " + e.getMessage());
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc, "The Storage is unavailable for trying to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
return null;
}
}
@ -470,8 +471,12 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
public Long migrate(final HaWorkVO work) {
long vmId = work.getInstanceId();
long srcHostId = work.getHostId();
try {
work.setStep(Step.Migrating);
_haDao.update(work.getId(), work);
if (!_itMgr.migrateAway(work.getType(), vmId, srcHostId)) {
s_logger.warn("Unable to migrate vm from " + srcHostId);
_agentMgr.maintenanceFailed(srcHostId);
@ -532,40 +537,44 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
return (System.currentTimeMillis() >> 10) + _stopRetryInterval;
}
protected Long stopVM(final HaWorkVO work) {
final VMInstanceVO vm = _itMgr.findById(work.getType(), work.getInstanceId());
s_logger.info("Stopping " + vm.toString());
protected Long stopVM(final HaWorkVO work) throws ConcurrentOperationException {
VMInstanceVO vm = _itMgr.findById(work.getType(), work.getInstanceId());
if (vm == null) {
s_logger.info("No longer can find VM " + work.getInstanceId() + ". Throwing away " + work);
work.setStep(Step.Done);
return null;
}
s_logger.info("Stopping " + vm);
try {
if (work.getWorkType() == WorkType.Stop) {
if (vm.getHostId() != null) {
// FIXME if (_itMgr.advanceStop(vm, false, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount())) {
// s_logger.info("Successfully stopped " + vm.toString());
// return null;
// }
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug(vm.toString() + " has already been stopped");
}
return null;
if (vm.getHostId() == null) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(vm.toString() + " has already been stopped");
}
return null;
}
if (_itMgr.advanceStop(vm, false, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount())) {
s_logger.info("Successfully stopped " + vm);
return null;
}
} else if (work.getWorkType() == WorkType.CheckStop) {
if ((vm.getState() != State.Stopping) || vm.getHostId() == null || vm.getHostId().longValue() != work.getHostId()) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(vm.toString() + " is different now. Scheduled Host: " + work.getHostId() + " Current Host: " + (vm.getHostId() != null ? vm.getHostId() : "none") + " State: " + vm.getState());
}
if ((vm.getState() != work.getPreviousState()) || vm.getUpdated() != work.getUpdateTime() || vm.getHostId() == null || vm.getHostId().longValue() != work.getHostId()) {
s_logger.info(vm + " is different now. Scheduled Host: " + work.getHostId() + " Current Host: " + (vm.getHostId() != null ? vm.getHostId() : "none") + " State: " + vm.getState());
return null;
} else {
Command cmd = null;
//FIXME Command cmd = _itMgr.cleanup(vm.getInstanceName());
Answer ans = _agentMgr.send(work.getHostId(), cmd);
if (ans.getResult()) {
_itMgr.stateTransitTo(vm, Event.AgentReportStopped, null);
//FIXME mgr.finalizeStop(new VirtualMachineProfileImpl<VMInstanceVO>(vm), (StopAnswer)ans);
s_logger.info("Successfully stopped " + vm.toString());
return null;
}
s_logger.debug("Stop for " + vm.toString() + " was unsuccessful. Detail: " + ans.getDetails());
}
if (_itMgr.advanceStop(vm, false, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount())) {
s_logger.info("Stop for " + vm + " was successful");
return null;
}
} else if (work.getWorkType() == WorkType.ForceStop){
if ((vm.getState() != work.getPreviousState()) || vm.getUpdated() != work.getUpdateTime() || vm.getHostId() == null || vm.getHostId().longValue() != work.getHostId()) {
s_logger.info(vm + " is different now. Scheduled Host: " + work.getHostId() + " Current Host: " + (vm.getHostId() != null ? vm.getHostId() : "none") + " State: " + vm.getState());
return null;
}
if (_itMgr.advanceStop(vm, true, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount())) {
s_logger.info("Stop for " + vm + " was successful");
return null;
}
} else {
assert false : "Who decided there's other steps but didn't modify the guy who does the work?";
}
@ -576,6 +585,9 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
}
work.setTimesTried(work.getTimesTried() + 1);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Stop was unsuccessful. Rescheduling");
}
return (System.currentTimeMillis() >> 10) + _stopRetryInterval;
}
@ -650,6 +662,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
if (_instance == null) {
_instance = "VMOPS";
}
_haDao.releaseWorkItems(_serverId);
_stopped = true;
@ -726,7 +740,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
}
}
s_logger.info("Working on " + work.toString());
s_logger.info("Processing " + work);
try {
final WorkType wt = work.getWorkType();
@ -735,7 +749,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
nextTime = migrate(work);
} else if (wt == WorkType.HA) {
nextTime = restart(work);
} else if (wt == WorkType.Stop || wt == WorkType.CheckStop) {
} else if (wt == WorkType.Stop || wt == WorkType.CheckStop || wt == WorkType.ForceStop) {
nextTime = stopVM(work);
} else if (wt == WorkType.Destroy) {
nextTime = destroyVM(work);
@ -745,20 +759,16 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
}
if (nextTime == null) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(work.toString() + " is complete");
}
s_logger.info("Completed " + work);
work.setStep(Step.Done);
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Rescheduling " + work.toString() + " for instance " + work.getInstanceId() + " to try again at " + new Date(nextTime << 10));
}
s_logger.info("Rescheduling " + work + " to try again at " + new Date(nextTime << 10));
work.setTimeToTry(nextTime);
work.setServerId(null);
work.setDateTaken(null);
}
} catch (Exception e) {
s_logger.error("Caught this exception while processing the work queue.", e);
s_logger.error("Terminating " + work, e);
work.setStep(Step.Error);
}
_haDao.update(work.getId(), work);
@ -775,5 +785,16 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
notifyAll();
}
}
@Override
public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
}
@Override
public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
for (ManagementServerHostVO node : nodeList) {
_haDao.releaseWorkItems(node.getMsid());
}
}
}

View File

@ -20,7 +20,7 @@ package com.cloud.ha.dao;
import java.util.List;
import com.cloud.ha.HaWorkVO;
import com.cloud.ha.HaWorkVO.WorkType;
import com.cloud.ha.HighAvailabilityManager.WorkType;
import com.cloud.utils.db.GenericDao;
public interface HighAvailabilityDao extends GenericDao<HaWorkVO, Long> {
@ -63,4 +63,6 @@ public interface HighAvailabilityDao extends GenericDao<HaWorkVO, Long> {
* @return true if it has been scheduled and false if it hasn't.
*/
boolean hasBeenScheduled(long instanceId, WorkType type);
int releaseWorkItems(long nodeId);
}

View File

@ -24,14 +24,15 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.ha.HighAvailabilityManager;
import com.cloud.ha.HaWorkVO;
import com.cloud.ha.HighAvailabilityManager;
import com.cloud.ha.HighAvailabilityManager.Step;
import com.cloud.ha.HaWorkVO.WorkType;
import com.cloud.ha.HighAvailabilityManager.WorkType;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
@ -45,44 +46,51 @@ public class HighAvailabilityDaoImpl extends GenericDaoBase<HaWorkVO, Long> impl
private final SearchBuilder<HaWorkVO> CleanupSearch;
private final SearchBuilder<HaWorkVO> PreviousWorkSearch;
private final SearchBuilder<HaWorkVO> TakenWorkSearch;
private final SearchBuilder<HaWorkVO> ReleaseSearch;
protected HighAvailabilityDaoImpl() {
super();
CleanupSearch = createSearchBuilder();
CleanupSearch.and("time", CleanupSearch.entity().getTimeToTry(), SearchCriteria.Op.LTEQ);
CleanupSearch.and("step", CleanupSearch.entity().getStep(), SearchCriteria.Op.IN);
CleanupSearch.and("time", CleanupSearch.entity().getTimeToTry(), Op.LTEQ);
CleanupSearch.and("step", CleanupSearch.entity().getStep(), Op.IN);
CleanupSearch.done();
TBASearch = createSearchBuilder();
TBASearch.and("server", TBASearch.entity().getServerId(), SearchCriteria.Op.NULL);
TBASearch.and("taken", TBASearch.entity().getDateTaken(), SearchCriteria.Op.NULL);
TBASearch.and("time", TBASearch.entity().getTimeToTry(), SearchCriteria.Op.LTEQ);
TBASearch.and("server", TBASearch.entity().getServerId(), Op.NULL);
TBASearch.and("taken", TBASearch.entity().getDateTaken(), Op.NULL);
TBASearch.and("time", TBASearch.entity().getTimeToTry(), Op.LTEQ);
TBASearch.done();
PreviousInstanceSearch = createSearchBuilder();
PreviousInstanceSearch.and("instance", PreviousInstanceSearch.entity().getInstanceId(), SearchCriteria.Op.EQ);
PreviousInstanceSearch.and("instance", PreviousInstanceSearch.entity().getInstanceId(), Op.EQ);
PreviousInstanceSearch.done();
UntakenMigrationSearch = createSearchBuilder();
UntakenMigrationSearch.and("host", UntakenMigrationSearch.entity().getHostId(), SearchCriteria.Op.EQ);
UntakenMigrationSearch.and("type", UntakenMigrationSearch.entity().getWorkType(), SearchCriteria.Op.EQ);
UntakenMigrationSearch.and("server", UntakenMigrationSearch.entity().getServerId(), SearchCriteria.Op.NULL);
UntakenMigrationSearch.and("taken", UntakenMigrationSearch.entity().getDateTaken(), SearchCriteria.Op.NULL);
UntakenMigrationSearch.and("host", UntakenMigrationSearch.entity().getHostId(), Op.EQ);
UntakenMigrationSearch.and("type", UntakenMigrationSearch.entity().getWorkType(), Op.EQ);
UntakenMigrationSearch.and("server", UntakenMigrationSearch.entity().getServerId(), Op.NULL);
UntakenMigrationSearch.and("taken", UntakenMigrationSearch.entity().getDateTaken(), Op.NULL);
UntakenMigrationSearch.done();
TakenWorkSearch = createSearchBuilder();
TakenWorkSearch.and("type", TakenWorkSearch.entity().getWorkType(), SearchCriteria.Op.EQ);
TakenWorkSearch.and("server", TakenWorkSearch.entity().getServerId(), SearchCriteria.Op.NNULL);
TakenWorkSearch.and("taken", TakenWorkSearch.entity().getDateTaken(), SearchCriteria.Op.NNULL);
TakenWorkSearch.and("step", TakenWorkSearch.entity().getStep(), SearchCriteria.Op.NIN);
TakenWorkSearch.and("type", TakenWorkSearch.entity().getWorkType(), Op.EQ);
TakenWorkSearch.and("server", TakenWorkSearch.entity().getServerId(), Op.NNULL);
TakenWorkSearch.and("taken", TakenWorkSearch.entity().getDateTaken(), Op.NNULL);
TakenWorkSearch.and("step", TakenWorkSearch.entity().getStep(), Op.NIN);
TakenWorkSearch.done();
PreviousWorkSearch = createSearchBuilder();
PreviousWorkSearch.and("instance", PreviousWorkSearch.entity().getInstanceId(), SearchCriteria.Op.EQ);
PreviousWorkSearch.and("type", PreviousWorkSearch.entity().getWorkType(), SearchCriteria.Op.EQ);
PreviousWorkSearch.and("taken", PreviousWorkSearch.entity().getDateTaken(), SearchCriteria.Op.NULL);
PreviousWorkSearch.and("instance", PreviousWorkSearch.entity().getInstanceId(), Op.EQ);
PreviousWorkSearch.and("type", PreviousWorkSearch.entity().getWorkType(), Op.EQ);
PreviousWorkSearch.and("taken", PreviousWorkSearch.entity().getDateTaken(), Op.NULL);
PreviousWorkSearch.done();
ReleaseSearch = createSearchBuilder();
ReleaseSearch.and("server", ReleaseSearch.entity().getServerId(), Op.EQ);
ReleaseSearch.and("step", ReleaseSearch.entity().getStep(), Op.NIN);
ReleaseSearch.and("taken", ReleaseSearch.entity().getDateTaken(), Op.NNULL);
ReleaseSearch.done();
}
@Override
@ -171,4 +179,17 @@ public class HighAvailabilityDaoImpl extends GenericDaoBase<HaWorkVO, Long> impl
sc.setParameters("type", type);
return listBy(sc, null).size() > 0;
}
@Override
public int releaseWorkItems(long nodeId) {
SearchCriteria<HaWorkVO> sc = ReleaseSearch.create();
sc.setParameters("server", nodeId);
sc.setParameters("step", Step.Done, Step.Cancelled, Step.Error);
HaWorkVO vo = createForUpdate();
vo.setDateTaken(null);
vo.setServerId(null);
return update(vo, sc);
}
}

View File

@ -86,6 +86,7 @@ import com.cloud.exception.InsufficientServerCapacityException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.exception.ResourceUnavailableException;
import com.cloud.ha.HighAvailabilityManager;
import com.cloud.ha.HighAvailabilityManager.WorkType;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
@ -696,7 +697,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
} finally {
if (startedVm == null) {
_workDao.updateStep(work, Step.Release);
cleanup(vmGuru, vmProfile, work, false, caller, account);
cleanup(vmGuru, vmProfile, work, Event.OperationFailed, false, caller, account);
}
}
}
@ -744,7 +745,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
return true;
}
protected <T extends VMInstanceVO> boolean cleanup(VirtualMachineGuru<T> guru, VirtualMachineProfile<T> profile, ItWorkVO work, boolean force, User user, Account account) {
protected <T extends VMInstanceVO> boolean cleanup(VirtualMachineGuru<T> guru, VirtualMachineProfile<T> profile, ItWorkVO work, Event event, boolean force, User user, Account account) {
T vm = profile.getVirtualMachine();
State state = vm.getState();
s_logger.debug("Cleaning up resources for the vm " + vm + " in " + state + " state");
@ -838,7 +839,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
if ((vm.getState() == State.Starting || vm.getState() == State.Stopping || vm.getState() == State.Migrating) && forced) {
ItWorkVO work = _workDao.findByOutstandingWork(vm.getId(), vm.getState());
if (work != null) {
if (cleanup(vmGuru, new VirtualMachineProfileImpl<T>(vm), work, forced, user, account)) {
if (cleanup(vmGuru, new VirtualMachineProfileImpl<T>(vm), work, Event.StopRequested, forced, user, account)) {
return stateTransitTo(vm, Event.AgentReportStopped, null);
}
}
@ -1231,12 +1232,12 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
}
public Commands deltaSync(long hostId, Map<String, State> newStates) {
Map<Long, AgentVmInfo> states = convertToIds(newStates);
Map<Long, AgentVmInfo> states = convertToInfos(newStates);
Commands commands = new Commands(OnError.Continue);
boolean nativeHA = _agentMgr.isHostNativeHAEnabled(hostId);
for (final Map.Entry<Long, AgentVmInfo> entry : states.entrySet()) {
for (Map.Entry<Long, AgentVmInfo> entry : states.entrySet()) {
AgentVmInfo info = entry.getValue();
VMInstanceVO vm = info.vm;
@ -1245,6 +1246,9 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
if (vm != null) {
command = compareState(vm, info, false, nativeHA);
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Cleaning up a VM that is no longer found: " + info.name);
}
command = cleanup(info.name);
}
@ -1256,7 +1260,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
return commands;
}
protected Map<Long, AgentVmInfo> convertToIds(final Map<String, State> states) {
protected Map<Long, AgentVmInfo> convertToInfos(final Map<String, State> states) {
final HashMap<Long, AgentVmInfo> map = new HashMap<Long, AgentVmInfo>();
if (states == null) {
@ -1272,13 +1276,13 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
VMInstanceVO vm = vmGuru.findByName(name);
if (vm != null) {
map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vm, entry.getValue()));
map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue()));
break;
}
Long id = vmGuru.convertToId(name);
if (id != null) {
map.put(id, new AgentVmInfo(entry.getKey(), null, entry.getValue()));
map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null, entry.getValue()));
}
}
}
@ -1324,61 +1328,59 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "VM (name: " + vm.getName() + ", id: " + vm.getId() + ") stopped on host " + hostDesc + " due to storage failure", "Virtual Machine " + vm.getName() + " (id: " + vm.getId() + ") running on host [" + vm.getHostId() + "] stopped due to storage failure.");
}
if (serverState == State.Migrating) {
s_logger.debug("Skipping vm in migrating state: " + vm.toString());
return null;
}
// if (serverState == State.Migrating) {
// s_logger.debug("Skipping vm in migrating state: " + vm.toString());
// return null;
// }
if (agentState == serverState) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Both states are " + agentState.toString() + " for " + serverName);
s_logger.debug("Both states are " + agentState + " for " + vm);
}
assert (agentState == State.Stopped || agentState == State.Running) : "If the states we send up is changed, this must be changed.";
stateTransitTo(vm, agentState == State.Stopped ? VirtualMachine.Event.AgentReportStopped : VirtualMachine.Event.AgentReportRunning, vm.getHostId());
if (agentState == State.Stopped) {
s_logger.debug("State matches but the agent said stopped so let's send a cleanup anyways.");
return cleanup(agentName);
if (agentState == State.Running) {
stateTransitTo(vm, VirtualMachine.Event.AgentReportRunning, vm.getHostId());
// FIXME: What if someone comes in and sets it to stopping? Then what?
return null;
}
return null;
}
if (agentState == State.Shutdowned ) {
if ( serverState == State.Running || serverState == State.Starting || serverState == State.Stopping ) {
stateTransitTo(vm, VirtualMachine.Event.AgentReportShutdowned, null);
s_logger.debug("State matches but the agent said stopped so let's send a cleanup command anyways.");
return cleanup(agentName);
}
if (agentState == State.Shutdowned) {
if (serverState == State.Running || serverState == State.Starting || serverState == State.Stopping) {
try {
advanceStop(vm, true, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount());
} catch (AgentUnavailableException e) {
assert(false) : "How do we hit this with forced on?";
return null;
} catch (OperationTimedoutException e) {
assert(false) : "How do we hit this with forced on?";
return null;
} catch (ConcurrentOperationException e) {
assert(false) : "How do we hit this with forced on?";
return null;
}
} else {
s_logger.debug("Sending cleanup to a shutdowned vm: " + agentName);
command = cleanup(agentName);
}
s_logger.debug("Sending cleanup to a shutdowned vm: " + agentName);
command = cleanup(agentName);
} else if (agentState == State.Stopped) {
// This state means the VM on the agent was detected previously
// and now is gone. This is slightly different than if the VM
// was never completed but we still send down a Stop Command
// to ensure there's cleanup.
if (serverState == State.Running ) {
if(!nativeHA) {
// Our records showed that it should be running so let's restart it.
vm = findById(vm.getType(), vm.getId());
_haMgr.scheduleRestart(vm, false);
command = cleanup(agentName);
} else {
s_logger.info("VM is in runnting state, agent reported as stopped and native HA is enabled => skip sync action");
stateTransitTo(vm, Event.AgentReportStopped, null);
}
if (serverState == State.Running) {
// Our records showed that it should be running so let's restart it.
_haMgr.scheduleRestart(vm, false);
} else if (serverState == State.Stopping) {
if (fullSync) {
s_logger.debug("VM is in stopping state on full sync. Updating the status to stopped");
vm = findById(vm.getType(), vm.getId());
// advanceStop(vm, true, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount());
command = cleanup(agentName);
} else {
s_logger.debug("Ignoring VM in stopping mode: " + vm.getName());
}
_haMgr.scheduleStop(vm, vm.getHostId(), WorkType.ForceStop);
s_logger.debug("Scheduling a check stop for VM in stopping mode: " + vm);
} else if (serverState == State.Starting) {
s_logger.debug("Ignoring VM in starting mode: " + vm.getName());
stateTransitTo(vm, VirtualMachine.Event.AgentReportStopped, null);
} else {
s_logger.debug("Sending cleanup to a stopped vm: " + agentName);
stateTransitTo(vm, VirtualMachine.Event.AgentReportStopped, null);
command = cleanup(agentName);
}
_haMgr.scheduleRestart(vm, false);
}
command = cleanup(agentName);
} else if (agentState == State.Running) {
if (serverState == State.Starting) {
if (fullSync) {
@ -1414,39 +1416,13 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
}
}
} else if (serverState == State.Stopping) {
if (fullSync) {
s_logger.debug("VM state is in stopping on fullsync so resend stop.");
vm = findById(vm.getType(), vm.getId());
stateTransitTo(vm, Event.AgentReportStopped, null);
//finalizeStop(new VirtualMachineProfileImpl<VMInstanceVO>(vm), null);
command = cleanup(agentName);
} else {
s_logger.debug("VM is in stopping state so no action.");
}
} else if (serverState == State.Destroyed || serverState == State.Stopped || serverState == State.Expunging) {
s_logger.debug("Scheduling a stop command for " + vm);
_haMgr.scheduleStop(vm, vm.getHostId(), WorkType.Stop);
} else {
s_logger.debug("VM state is in stopped so stopping it on the agent");
command = cleanup(agentName);
} else {
stateTransitTo(vm, VirtualMachine.Event.AgentReportRunning, vm.getHostId());
}
} /*else if (agentState == State.Unknown) {
if (serverState == State.Running) {
if (fullSync) {
vm = info.vmGuru.get(vm.getId());
}
scheduleRestart(vm, false);
} else if (serverState == State.Starting) {
if (fullSync) {
vm = info.vmGuru.get(vm.getId());
}
scheduleRestart(vm, false);
} else if (serverState == State.Stopping) {
if (fullSync) {
s_logger.debug("VM state is stopping in full sync. Resending stop");
command = info.vmGuru.getCleanupCommand(vm, agentName);
}
}
}*/
}
}
return command;
}
@ -1455,26 +1431,28 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
final List<? extends VMInstanceVO> vms = _vmDao.listByHostId(hostId);
s_logger.debug("Found " + vms.size() + " VMs for host " + hostId);
Map<Long, AgentVmInfo> states = convertToIds(newStates);
Map<Long, AgentVmInfo> infos = convertToInfos(newStates);
boolean nativeHA = _agentMgr.isHostNativeHAEnabled(hostId);
for (final VMInstanceVO vm : vms) {
AgentVmInfo info = states.remove(vm.getId());
for (VMInstanceVO vm : vms) {
AgentVmInfo info = infos.remove(vm.getId());
VMInstanceVO castedVm = null;
if (info == null) {
info = new AgentVmInfo(vm.getInstanceName(), null, State.Stopped);
info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped);
castedVm = info.guru.findById(vm.getId());
} else {
castedVm = info.vm;
}
VirtualMachineGuru<? extends VMInstanceVO> vmGuru = getVmGuru(vm);
VMInstanceVO castedVm = vmGuru.findById(vm.getId());
final Command command = compareState(castedVm, info, true, nativeHA);
Command command = compareState(castedVm, info, true, nativeHA);
if (command != null) {
commands.addCommand(command);
}
}
for (final AgentVmInfo left : states.values()) {
for (final AgentVmInfo left : infos.values()) {
if (nativeHA) {
for (VirtualMachineGuru<? extends VMInstanceVO> vmGuru : _vmGurus.values()) {
VMInstanceVO vm = vmGuru.findByName(left.name);
@ -1615,7 +1593,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
for (VMInstanceVO instance : instances) {
State state = instance.getState();
if (state == State.Stopping) {
_haMgr.scheduleStop(instance, instance.getHostId(), true);
_haMgr.scheduleStop(instance, instance.getHostId(), WorkType.CheckStop);
} else if (state == State.Starting) {
_haMgr.scheduleRestart(instance, true);
}
@ -1632,13 +1610,15 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, StateLi
protected class AgentVmInfo {
public String name;
public State state;
public State action;
public VMInstanceVO vm;
public VirtualMachineGuru<VMInstanceVO> guru;
public AgentVmInfo(String name, VMInstanceVO vm, State state) {
@SuppressWarnings("unchecked")
public AgentVmInfo(String name, VirtualMachineGuru<? extends VMInstanceVO> guru, VMInstanceVO vm, State state) {
this.name = name;
this.state = state;
this.vm = vm;
this.guru = (VirtualMachineGuru<VMInstanceVO>)guru;
}
}
}