Merge VirtualMachineManger

This commit is contained in:
Kelven Yang 2013-10-16 17:00:57 -07:00
parent 534345e8aa
commit cf94cfb3f2
8 changed files with 635 additions and 11 deletions

View File

@ -111,6 +111,16 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I
s_fsm.addTransition(State.Expunging, VirtualMachine.Event.ExpungeOperation, State.Expunging);
s_fsm.addTransition(State.Error, VirtualMachine.Event.DestroyRequested, State.Expunging);
s_fsm.addTransition(State.Error, VirtualMachine.Event.ExpungeOperation, State.Expunging);
s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
s_fsm.addTransition(State.Stopped, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
s_fsm.addTransition(State.Starting, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
}
public static boolean isVmStarted(State oldState, Event e, State newState) {
@ -179,9 +189,14 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I
AgentReportMigrated,
RevertRequested,
SnapshotRequested,
// added for new VMSync logic
FollowAgentPowerOnReport,
FollowAgentPowerOffReport,
};
public enum Type {
Instance(false),
User(false),
DomainRouter(true),
ConsoleProxy(true),

View File

@ -50,6 +50,7 @@ public interface AlertManager extends Manager {
public static final short ALERT_TYPE_DIRECT_ATTACHED_PUBLIC_IP = 24;
public static final short ALERT_TYPE_LOCAL_STORAGE = 25;
public static final short ALERT_TYPE_RESOURCE_LIMIT_EXCEEDED = 26; // Generated when the resource limit exceeds the limit. Currently used for recurring snapshots only
public static final short ALERT_TYPE_SYNC = 27;
static final ConfigKey<Double> StorageCapacityThreshold = new ConfigKey<Double>(Double.class, "cluster.storage.capacity.notificationthreshold", "Alert", "0.75",
"Percentage (as a value between 0 and 1) of storage utilization above which alerts will be sent about low storage available.", true, ConfigKey.Scope.Cluster, null);

View File

@ -18,6 +18,10 @@
package com.cloud.vm;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -26,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -47,12 +52,17 @@ import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
@ -78,7 +88,6 @@ import com.cloud.agent.api.StartAnswer;
import com.cloud.agent.api.StartCommand;
import com.cloud.agent.api.StartupCommand;
import com.cloud.agent.api.StartupRoutingCommand;
import com.cloud.agent.api.StartupRoutingCommand.VmState;
import com.cloud.agent.api.StopAnswer;
import com.cloud.agent.api.StopCommand;
import com.cloud.agent.api.UnPlugNicAnswer;
@ -155,6 +164,7 @@ import com.cloud.storage.dao.VolumeDao;
import com.cloud.template.VirtualMachineTemplate;
import com.cloud.user.Account;
import com.cloud.user.User;
import com.cloud.utils.DateUtil;
import com.cloud.utils.Journal;
import com.cloud.utils.Pair;
import com.cloud.utils.StringUtils;
@ -171,6 +181,7 @@ import com.cloud.utils.fsm.NoTransitionException;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.ItWorkVO.Step;
import com.cloud.vm.VirtualMachine.Event;
import com.cloud.vm.VirtualMachine.PowerState;
import com.cloud.vm.VirtualMachine.State;
import com.cloud.vm.dao.NicDao;
import com.cloud.vm.dao.UserVmDao;
@ -185,6 +196,8 @@ import com.cloud.vm.snapshot.dao.VMSnapshotDao;
public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMachineManager, Listener, Configurable {
private static final Logger s_logger = Logger.getLogger(VirtualMachineManagerImpl.class);
private static final String VM_SYNC_ALERT_SUBJECT = "VM state sync alert";
@Inject
DataStoreManager dataStoreMgr;
@Inject
@ -278,6 +291,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Inject
DeploymentPlanningManager _dpMgr;
@Inject protected MessageBus _messageBus;
@Inject protected VirtualMachinePowerStateSync _syncMgr;
@Inject protected VmWorkJobDao _workJobDao;
@Inject protected AsyncJobManager _jobMgr;
Map<VirtualMachine.Type, VirtualMachineGuru> _vmGurus = new HashMap<VirtualMachine.Type, VirtualMachineGuru>();
protected StateMachine2<State, VirtualMachine.Event, VirtualMachine> _stateMachine;
@ -297,6 +315,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
"On destroy, force-stop takes this value ", true);
static final ConfigKey<Integer> ClusterDeltaSyncInterval = new ConfigKey<Integer>("Advanced", Integer.class, "sync.interval", "60", "Cluster Delta sync interval in seconds",
false);
protected static final ConfigKey<Long> PingInterval = new ConfigKey<Long>("Advanced",
Long.class, "ping.interval", "60", "Ping interval in seconds", false);
ScheduledExecutorService _executor = null;
@ -1325,7 +1346,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
protected boolean checkVmOnHost(VirtualMachine vm, long hostId) throws AgentUnavailableException, OperationTimedoutException {
CheckVirtualMachineAnswer answer = (CheckVirtualMachineAnswer)_agentMgr.send(hostId, new CheckVirtualMachineCommand(vm.getInstanceName()));
if (!answer.getResult() || answer.getState() == State.Stopped) {
if (!answer.getResult() || answer.getState() == PowerState.PowerOff) {
return false;
}
@ -1910,6 +1931,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return new StopCommand(vmName, getExecuteInSequence());
}
/*
public Commands fullHostSync(final long hostId, StartupRoutingCommand startup) {
Commands commands = new Commands(Command.OnError.Continue);
@ -1968,7 +1990,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return commands;
}
*/
/*
public Commands deltaHostSync(long hostId, Map<String, State> newStates) {
Map<Long, AgentVmInfo> states = convertDeltaToInfos(newStates);
Commands commands = new Commands(Command.OnError.Continue);
@ -1996,7 +2020,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return commands;
}
*/
public void deltaSync(Map<String, Pair<String, State>> newStates) {
Map<Long, AgentVmInfo> states = convertToInfos(newStates);
@ -2185,7 +2209,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
return map;
}
/*
protected Map<Long, AgentVmInfo> convertToInfos(StartupRoutingCommand cmd) {
final Map<String, VmState> states = cmd.getVmStates();
final HashMap<Long, AgentVmInfo> map = new HashMap<Long, AgentVmInfo>();
@ -2203,7 +2227,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return map;
}
*/
protected Map<Long, AgentVmInfo> convertDeltaToInfos(final Map<String, State> states) {
final HashMap<Long, AgentVmInfo> map = new HashMap<Long, AgentVmInfo>();
@ -2229,7 +2253,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
*
*/
protected Command compareState(long hostId, VMInstanceVO vm, final AgentVmInfo info, final boolean fullSync, boolean trackExternalChange) {
State agentState = info.state;
return null;
/*
State agentState = info.state;
final State serverState = vm.getState();
final String serverName = vm.getInstanceName();
@ -2401,10 +2427,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
}
return command;
*/
}
private void ensureVmRunningContext(long hostId, VMInstanceVO vm, Event cause) throws OperationTimedoutException, ResourceUnavailableException, NoTransitionException,
InsufficientAddressCapacityException {
/*
VirtualMachineGuru vmGuru = getVmGuru(vm);
s_logger.debug("VM state is starting on full sync so updating it to running");
@ -2461,6 +2489,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
work.setStep(Step.Done);
_workDao.update(work.getId(), work);
}
*/
}
@Override
@ -2474,7 +2503,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (answer instanceof ClusterSyncAnswer) {
ClusterSyncAnswer hs = (ClusterSyncAnswer)answer;
if (!hs.isExceuted()) {
/*
deltaSync(hs.getNewStates());
*/
hs.setExecuted();
}
}
@ -2492,6 +2523,25 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return -1;
}
@Override
public boolean processCommands(long agentId, long seq, Command[] cmds) {
boolean processed = false;
for (Command cmd : cmds) {
if (cmd instanceof PingRoutingCommand) {
PingRoutingCommand ping = (PingRoutingCommand) cmd;
if (ping.getNewStates() != null && ping.getNewStates().size() > 0) {
_syncMgr.processHostVmStatePingReport(agentId, ping.getNewStates());
}
// take the chance to scan VMs that are stuck in transitional states and are missing from the report
scanStalledVMInTransitionStateOnUpHost(agentId);
processed = true;
}
}
return processed;
}
/*
@Override
public boolean processCommands(long agentId, long seq, Command[] cmds) {
boolean processed = false;
@ -2513,7 +2563,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
return processed;
}
*/
@Override
public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
return null;
@ -2523,7 +2574,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
public boolean processDisconnect(long agentId, Status state) {
return true;
}
@Override
public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance) throws ConnectionException {
if (!(cmd instanceof StartupRoutingCommand)) {
return;
}
if(s_logger.isDebugEnabled())
s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId());
_syncMgr.resetHostSyncState(agent.getId());
}
/*
@Override
public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance) throws ConnectionException {
if (!(cmd instanceof StartupRoutingCommand)) {
@ -2588,7 +2651,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
}
*/
protected class TransitionTask extends ManagedContextRunnable {
@Override
protected void runInContext() {
@ -2603,6 +2666,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return;
}
try {
/*
lock.addRef();
List<VMInstanceVO> instances = _vmDao.findVMInTransition(new Date(new Date().getTime() - (AgentManager.Wait.value() * 1000)), State.Starting, State.Stopping);
for (VMInstanceVO instance : instances) {
@ -2613,6 +2677,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_haMgr.scheduleRestart(instance, true);
}
}
*/
scanStalledVMInTransitionStateOnDisconnectedHosts();
} catch (Exception e) {
s_logger.warn("Caught the following exception on transition checking", e);
} finally {
@ -3280,4 +3348,283 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
this._storagePoolAllocators = storagePoolAllocators;
}
//
// PowerState report handling for out-of-band changes and handling of left-over transitional VM states
//
@MessageHandler(topic = Topics.VM_POWER_STATE)
private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) {
assert(args != null);
Long vmId = (Long)args;
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vmId);
if(pendingWorkJobs.size() == 0) {
// there is no pending operation job
VMInstanceVO vm = _vmDao.findById(vmId);
if(vm != null) {
switch(vm.getPowerState()) {
case PowerOn :
handlePowerOnReportWithNoPendingJobsOnVM(vm);
break;
case PowerOff :
handlePowerOffReportWithNoPendingJobsOnVM(vm);
break;
// PowerUnknown shouldn't be reported, it is a derived
// VM power state from host state (host un-reachable
case PowerUnknown :
default :
assert(false);
break;
}
} else {
s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
}
} else {
// TODO, do job wake-up signalling, since currently async job wake-up is not in use
// we will skip it for nows
}
}
private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
//
// 1) handle left-over transitional VM states
// 2) handle out of band VM live migration
// 3) handle out of sync stationary states, marking VM from Stopped to Running with
// alert messages
//
switch(vm.getState()) {
case Starting :
try {
stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
} catch(NoTransitionException e) {
s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
}
// we need to alert admin or user about this risky state transition
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
break;
case Running :
try {
if(vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue())
s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId());
stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
} catch(NoTransitionException e) {
s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
}
break;
case Stopping :
case Stopped :
try {
stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
} catch(NoTransitionException e) {
s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
}
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset");
break;
case Destroyed :
case Expunging :
s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: "
+ vm.getId() + ", state: " + vm.getState());
break;
case Migrating :
try {
stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
} catch(NoTransitionException e) {
s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
}
break;
case Error :
default :
s_logger.info("Receive power on report when VM is in error or unexpected state. vm: "
+ vm.getId() + ", state: " + vm.getState());
break;
}
}
private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
// 1) handle left-over transitional VM states
// 2) handle out of sync stationary states, schedule force-stop to release resources
//
switch(vm.getState()) {
case Starting :
case Stopping :
case Stopped :
case Migrating :
try {
stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId());
} catch(NoTransitionException e) {
s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
}
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition.");
// TODO: we need to forcely release all resource allocation
break;
case Running :
case Destroyed :
case Expunging :
break;
case Error :
default :
break;
}
}
private void scanStalledVMInTransitionStateOnUpHost(long hostId) {
//
// Check VM that is stuck in Starting, Stopping, Migrating states, we won't check
// VMs in expunging state (this need to be handled specially)
//
// checking condition
// 1) no pending VmWork job
// 2) on hostId host and host is UP
//
// When host is UP, soon or later we will get a report from the host about the VM,
// however, if VM is missing from the host report (it may happen in out of band changes
// or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic
//
// Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
// and a VM stalls for status update, we will consider them to be powered off
// (which is relatively safe to do so)
long stallThresholdInMs = PingInterval.value() + (PingInterval.value() >> 1);
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs);
List<Long> mostlikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime);
for(Long vmId : mostlikelyStoppedVMs) {
VMInstanceVO vm = _vmDao.findById(vmId);
assert(vm != null);
handlePowerOffReportWithNoPendingJobsOnVM(vm);
}
List<Long> vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime);
for(Long vmId : vmsWithRecentReport) {
VMInstanceVO vm = _vmDao.findById(vmId);
assert(vm != null);
if(vm.getPowerState() == PowerState.PowerOn)
handlePowerOnReportWithNoPendingJobsOnVM(vm);
else
handlePowerOffReportWithNoPendingJobsOnVM(vm);
}
}
private void scanStalledVMInTransitionStateOnDisconnectedHosts() {
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - VmOpWaitInterval.value()*1000);
List<Long> stuckAndUncontrollableVMs = listStalledVMInTransitionStateOnDisconnectedHosts(cutTime);
for(Long vmId : stuckAndUncontrollableVMs) {
VMInstanceVO vm = _vmDao.findById(vmId);
// We now only alert administrator about this situation
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + " state and its host is unreachable for too long");
}
}
// VMs that in transitional state without recent power state report
private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) {
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
"AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
List<Long> l = new ArrayList<Long>();
Transaction txn = null;
try {
txn = Transaction.open(Transaction.CLOUD_DB);
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setLong(1, hostId);
pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
} catch (Throwable e) {
}
} finally {
if(txn != null)
txn.close();
}
return l;
}
// VMs that in transitional state and recently have power state update
private List<Long> listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) {
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
"AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
List<Long> l = new ArrayList<Long>();
Transaction txn = null;
try {
txn = Transaction.open(Transaction.CLOUD_DB);
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setLong(1, hostId);
pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
} catch (Throwable e) {
}
return l;
} finally {
if(txn != null)
txn.close();
}
}
private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) {
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " +
"AND i.power_state_update_time < ? AND i.host_id = h.id " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
List<Long> l = new ArrayList<Long>();
Transaction txn = null;
try {
txn = Transaction.open(Transaction.CLOUD_DB);
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal());
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
l.add(rs.getLong(1));
}
} catch (SQLException e) {
} catch (Throwable e) {
}
return l;
} finally {
if(txn != null)
txn.close();
}
}
}

View File

@ -98,6 +98,7 @@ import com.cloud.utils.Pair;
import com.cloud.utils.db.EntityManager;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.VirtualMachine.Event;
import com.cloud.vm.VirtualMachine.PowerState;
import com.cloud.vm.VirtualMachine.State;
import com.cloud.vm.dao.UserVmDao;
import com.cloud.vm.dao.UserVmDetailsDao;
@ -397,7 +398,7 @@ public class VirtualMachineManagerImplTest {
CheckVirtualMachineAnswer checkVmAnswerMock = mock(CheckVirtualMachineAnswer.class);
when(checkVmAnswerMock.getResult()).thenReturn(true);
when(checkVmAnswerMock.getState()).thenReturn(State.Running);
when(checkVmAnswerMock.getState()).thenReturn(PowerState.PowerOn);
when(_agentMgr.send(anyLong(), isA(CheckVirtualMachineCommand.class))).thenReturn(checkVmAnswerMock);
// Mock the state transitions of vm.

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs.dao;
import java.util.Date;
import java.util.List;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO.Step;
import com.cloud.utils.db.GenericDao;
import com.cloud.vm.VirtualMachine;
public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, Long> {
VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId);
List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId);
List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd);
void updateStep(long workJobId, Step step);
void expungeCompletedWorkJobs(Date cutDate);
}

View File

@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs.dao;
import java.util.Date;
import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO.Step;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.vm.VirtualMachine;
public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implements VmWorkJobDao {
protected SearchBuilder<VmWorkJobVO> PendingWorkJobSearch;
protected SearchBuilder<VmWorkJobVO> PendingWorkJobByCommandSearch;
protected SearchBuilder<VmWorkJobVO> ExpungeWorkJobSearch;
public VmWorkJobDaoImpl() {
}
@PostConstruct
public void init() {
PendingWorkJobSearch = createSearchBuilder();
PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ);
PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ);
PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ);
PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ);
PendingWorkJobSearch.done();
PendingWorkJobByCommandSearch = createSearchBuilder();
PendingWorkJobByCommandSearch.and("jobStatus", PendingWorkJobByCommandSearch.entity().getStatus(), Op.EQ);
PendingWorkJobByCommandSearch.and("vmType", PendingWorkJobByCommandSearch.entity().getVmType(), Op.EQ);
PendingWorkJobByCommandSearch.and("vmInstanceId", PendingWorkJobByCommandSearch.entity().getVmInstanceId(), Op.EQ);
PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ);
PendingWorkJobByCommandSearch.done();
ExpungeWorkJobSearch = createSearchBuilder();
ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT);
ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
ExpungeWorkJobSearch.done();
}
@Override
public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId) {
SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
sc.setParameters("jobStatus", JobInfo. Status.IN_PROGRESS);
sc.setParameters("vmType", type);
sc.setParameters("vmInstanceId", instanceId);
Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
List<VmWorkJobVO> result = this.listBy(sc, filter);
if(result != null && result.size() > 0)
return result.get(0);
return null;
}
@Override
public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId) {
SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
sc.setParameters("vmType", type);
sc.setParameters("vmInstanceId", instanceId);
Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
return this.listBy(sc, filter);
}
@Override
public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd) {
SearchCriteria<VmWorkJobVO> sc = PendingWorkJobByCommandSearch.create();
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
sc.setParameters("vmType", type);
sc.setParameters("vmInstanceId", instanceId);
sc.setParameters("cmd", jobCmd);
Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
return this.listBy(sc, filter);
}
@Override
public void updateStep(long workJobId, Step step) {
VmWorkJobVO jobVo = findById(workJobId);
jobVo.setStep(step);
jobVo.setLastUpdated(DateUtil.currentGMTTime());
update(workJobId, jobVo);
}
@Override
public void expungeCompletedWorkJobs(Date cutDate) {
SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
sc.setParameters("lastUpdated",cutDate);
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
expunge(sc);
}
}

View File

@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs.impl;
import javax.persistence.Column;
import javax.persistence.DiscriminatorValue;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.PrimaryKeyJoinColumn;
import javax.persistence.Table;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import com.cloud.vm.VirtualMachine;
@Entity
@Table(name="vm_work_job")
@DiscriminatorValue(value="VmWork")
@PrimaryKeyJoinColumn(name="id")
public class VmWorkJobVO extends AsyncJobVO {
// These steps are rather arbitrary. What's recorded depends on the
// the operation being performed.
public enum Step {
Filed(false),
Prepare(false),
Starting(true),
Started(false),
Release(false),
Done(false),
Migrating(true),
Reconfiguring(false),
Error(false);
boolean updateState; // Should the VM State be updated after this step?
private Step(boolean updateState) {
this.updateState = updateState;
}
boolean updateState() {
return updateState;
}
}
@Column(name="step")
Step step;
@Column(name="vm_type")
@Enumerated(value=EnumType.STRING)
VirtualMachine.Type vmType;
@Column(name="vm_instance_id")
long vmInstanceId;
protected VmWorkJobVO() {
}
public VmWorkJobVO(String related) {
step = Step.Filed;
setRelated(related);
}
public Step getStep() {
return step;
}
public void setStep(Step step) {
this.step = step;
}
public VirtualMachine.Type getVmType() {
return vmType;
}
public void setVmType(VirtualMachine.Type vmType) {
this.vmType = vmType;
}
public long getVmInstanceId() {
return vmInstanceId;
}
public void setVmInstanceId(long vmInstanceId) {
this.vmInstanceId = vmInstanceId;
}
}

View File

@ -31,7 +31,6 @@ import com.cloud.host.Status;
import com.cloud.utils.component.AdapterBase;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachine.PowerState;
import com.cloud.vm.VirtualMachine.State;
@Local(value=Investigator.class)
public class CheckOnAgentInvestigator extends AdapterBase implements Investigator {