Handle transitional states across management server restart

This commit is contained in:
Kelven Yang 2013-06-19 17:47:14 -07:00
parent e2edae1711
commit 0bfc817bc6
4 changed files with 124 additions and 99 deletions

View File

@ -108,8 +108,12 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I
s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
s_fsm.addTransition(State.Stopped, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
s_fsm.addTransition(State.Starting, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
}
public static boolean isVmStarted(State oldState, Event e, State newState) {

View File

@ -457,12 +457,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_vmDao.remove(vm.getId());
}
@Override
public boolean start() {
_executor.scheduleAtFixedRate(new TransitionTask(), _pingInterval.value(), _pingInterval.value(), TimeUnit.SECONDS);
_executor.scheduleAtFixedRate(new CleanupTask(), _pingInterval.value()*2, _pingInterval.value()*2, TimeUnit.SECONDS);
// cancel jobs left-over from last run
cancelWorkItems(_nodeId);
return true;
@ -1801,42 +1801,42 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
protected void cancelWorkItems(long nodeId) {
/*
GlobalLock scanLock = GlobalLock.getInternLock("vmmgr.cancel.workitem");
GlobalLock scanLock = GlobalLock.getInternLock("vmmgr.cancel.workitem");
try {
if (scanLock.lock(3)) {
try {
List<VmWorkJobVO> works = _workDao.listWorkInProgressFor(nodeId);
for (VmWorkJobVO work : works) {
s_logger.info("Handling unfinished work item: " + work);
try {
VMInstanceVO vm = _vmDao.findById(work.getInstanceId());
if (vm != null) {
if (work.getType() == State.Starting) {
_haMgr.scheduleRestart(vm, true);
work.setManagementServerId(_nodeId);
_workDao.update(work.getId(), work);
} else if (work.getType() == State.Stopping) {
_haMgr.scheduleStop(vm, vm.getHostId(), WorkType.CheckStop);
work.setManagementServerId(_nodeId);
_workDao.update(work.getId(), work);
} else if (work.getType() == State.Migrating) {
_haMgr.scheduleMigration(vm);
work.setStep(Step.Done);
_workDao.update(work.getId(), work);
}
try {
if (scanLock.lock(3)) {
try {
List<VmWorkJobVO> works = _workDao.listWorkInProgressFor(nodeId);
for (VmWorkJobVO work : works) {
s_logger.info("Handling unfinished work item: " + work);
try {
VMInstanceVO vm = _vmDao.findById(work.getInstanceId());
if (vm != null) {
if (work.getType() == State.Starting) {
_haMgr.scheduleRestart(vm, true);
work.setManagementServerId(_nodeId);
_workDao.update(work.getId(), work);
} else if (work.getType() == State.Stopping) {
_haMgr.scheduleStop(vm, vm.getHostId(), WorkType.CheckStop);
work.setManagementServerId(_nodeId);
_workDao.update(work.getId(), work);
} else if (work.getType() == State.Migrating) {
_haMgr.scheduleMigration(vm);
work.setStep(Step.Done);
_workDao.update(work.getId(), work);
}
} catch (Exception e) {
s_logger.error("Error while handling " + work, e);
}
} catch (Exception e) {
s_logger.error("Error while handling " + work, e);
}
} finally {
scanLock.unlock();
}
} finally {
scanLock.unlock();
}
} finally {
scanLock.releaseRef();
}
} finally {
scanLock.releaseRef();
}
*/
}
@ -3475,7 +3475,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset");
break;
break;
case Destroyed :
case Expunging :
@ -3501,30 +3501,29 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
// TODO :
// 1) handle left-over transitional VM states
// 2) handle out of sync stationary states, schedule force-stop to release resources
//
switch(vm.getState()) {
case Starting :
break;
case Stopping :
case Stopped :
case Migrating :
try {
stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId());
} catch(NoTransitionException e) {
s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
}
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition.");
// TODO: we need to forcely release all resource allocation
break;
case Running :
break;
case Stopping :
break;
case Stopped :
break;
case Destroyed :
case Expunging :
break;
case Migrating :
break;
case Error :
default :
break;
@ -3582,79 +3581,99 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// VMs that in transitional state without recent power state report
@DB
private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) {
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
"AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)";
"AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
List<Long> l = new ArrayList<Long>();
Transaction txn = Transaction.currentTxn();;
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setLong(1, hostId);
pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
} catch (Throwable e) {
}
Transaction txn = null;
try {
txn = Transaction.open(Transaction.CLOUD_DB);
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setLong(1, hostId);
pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
} catch (Throwable e) {
}
} finally {
if(txn != null)
txn.close();
}
return l;
}
// VMs that in transitional state and recently have power state update
@DB
private List<Long> listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) {
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
"AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)";
"AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
List<Long> l = new ArrayList<Long>();
Transaction txn = Transaction.currentTxn();;
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setLong(1, hostId);
pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
} catch (Throwable e) {
}
return l;
Transaction txn = null;
try {
txn = Transaction.open(Transaction.CLOUD_DB);
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setLong(1, hostId);
pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
} catch (Throwable e) {
}
return l;
} finally {
if(txn != null)
txn.close();
}
}
@DB
private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) {
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " +
"AND i.power_state_update_time < ? AND i.host_id = h.id " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)";
"AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
List<Long> l = new ArrayList<Long>();
Transaction txn = Transaction.currentTxn();;
PreparedStatement pstmt = null;
Transaction txn = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
} catch (Throwable e) {
txn = Transaction.open(Transaction.CLOUD_DB);
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal());
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
} catch (Throwable e) {
}
return l;
} finally {
if(txn != null)
txn.close();
}
return l;
}
@Override

View File

@ -44,12 +44,14 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
@PostConstruct
public void init() {
PendingWorkJobSearch = createSearchBuilder();
PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ);
PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ);
PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ);
PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ);
PendingWorkJobSearch.done();
PendingWorkJobByCommandSearch = createSearchBuilder();
PendingWorkJobByCommandSearch.and("jobStatus", PendingWorkJobByCommandSearch.entity().getStatus(), Op.EQ);
PendingWorkJobByCommandSearch.and("vmType", PendingWorkJobByCommandSearch.entity().getVmType(), Op.EQ);
PendingWorkJobByCommandSearch.and("vmInstanceId", PendingWorkJobByCommandSearch.entity().getVmInstanceId(), Op.EQ);
PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
@ -58,7 +60,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
ExpungeWorkJobSearch = createSearchBuilder();
ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT);
ExpungeWorkJobSearch.and("status", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
ExpungeWorkJobSearch.done();
}
@ -66,9 +68,9 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId) {
SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
sc.setParameters("vmType", type);
sc.setParameters("vmInstanceId", instanceId);
sc.setParameters("step", Step.Done);
Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
List<VmWorkJobVO> result = this.listBy(sc, filter);
@ -82,9 +84,9 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId) {
SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
sc.setParameters("vmType", type);
sc.setParameters("vmInstanceId", instanceId);
sc.setParameters("step", Step.Done);
Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
return this.listBy(sc, filter);
@ -94,9 +96,9 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd) {
SearchCriteria<VmWorkJobVO> sc = PendingWorkJobByCommandSearch.create();
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
sc.setParameters("vmType", type);
sc.setParameters("vmInstanceId", instanceId);
sc.setParameters("step", Step.Done);
sc.setParameters("cmd", jobCmd);
Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
@ -115,7 +117,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
public void expungeCompletedWorkJobs(Date cutDate) {
SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
sc.setParameters("lastUpdated",cutDate);
sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
expunge(sc);
}

View File

@ -180,7 +180,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) {
String sql = "UPDATE async_job SET job_status=" + JobInfo.Status.FAILED.ordinal() + ", job_result_code=" + jobResultCode
+ ", job_result='" + jobResultMessage + "' where job_status=" + JobInfo.Status.IN_PROGRESS.ordinal()
+ " AND (job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?))";
+ " AND (job_executing_msid=? OR (job_executing_msid IS NULL AND job_init_msid=?))";
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;