mirror of https://github.com/apache/cloudstack.git
bug 9127: intermediate checkin for agent load balancer
This commit is contained in:
parent
f03273f34e
commit
635a1075de
|
|
@ -33,7 +33,7 @@ public enum Status {
|
|||
Maintenance(false, false, false),
|
||||
Alert(true, true, true),
|
||||
Removed(true, false, true),
|
||||
Rebalance(false, false, false);
|
||||
Rebalancing(false, false, false);
|
||||
|
||||
private final boolean updateManagementServer;
|
||||
private final boolean checkManagementServer;
|
||||
|
|
@ -137,7 +137,7 @@ public enum Status {
|
|||
s_fsm.addTransition(Status.Up, Event.Ping, Status.Up);
|
||||
s_fsm.addTransition(Status.Up, Event.AgentConnected, Status.Connecting);
|
||||
s_fsm.addTransition(Status.Up, Event.ManagementServerDown, Status.Disconnected);
|
||||
s_fsm.addTransition(Status.Up, Event.StartAgentRebalance, Status.Rebalance);
|
||||
s_fsm.addTransition(Status.Up, Event.StartAgentRebalance, Status.Rebalancing);
|
||||
s_fsm.addTransition(Status.Updating, Event.PingTimeout, Status.Alert);
|
||||
s_fsm.addTransition(Status.Updating, Event.Ping, Status.Updating);
|
||||
s_fsm.addTransition(Status.Updating, Event.AgentConnected, Status.Connecting);
|
||||
|
|
@ -183,8 +183,8 @@ public enum Status {
|
|||
s_fsm.addTransition(Status.Alert, Event.Ping, Status.Up);
|
||||
s_fsm.addTransition(Status.Alert, Event.Remove, Status.Removed);
|
||||
s_fsm.addTransition(Status.Alert, Event.ManagementServerDown, Status.Alert);
|
||||
s_fsm.addTransition(Status.Rebalance, Event.RebalanceFailed, Status.Alert);
|
||||
s_fsm.addTransition(Status.Rebalance, Event.RebalanceCompleted, Status.Connecting);
|
||||
s_fsm.addTransition(Status.Rebalancing, Event.RebalanceFailed, Status.Alert);
|
||||
s_fsm.addTransition(Status.Rebalancing, Event.RebalanceCompleted, Status.Connecting);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
|
|
|||
|
|
@ -93,7 +93,7 @@
|
|||
<adapters key="com.cloud.acl.SecurityChecker">
|
||||
<adapter name="DomainChecker" class="com.cloud.acl.DomainChecker"/>
|
||||
</adapters>
|
||||
<adapters key="com.cloud.cluster.agentlb">
|
||||
<adapters key="com.cloud.cluster.agentlb.AgentLoadBalancerPlanner">
|
||||
<adapter name="ClusterBasedAgentLbPlanner" class="com.cloud.cluster.agentlb.ClusterBasedAgentLoadBalancerPlanner"/>
|
||||
</adapters>
|
||||
|
||||
|
|
|
|||
|
|
@ -17,11 +17,13 @@
|
|||
*/
|
||||
package com.cloud.agent.manager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
|
@ -238,8 +240,25 @@ public abstract class AgentAttache {
|
|||
return _requests.size();
|
||||
}
|
||||
|
||||
public int getListenersSize() {
|
||||
return _waitForList.size();
|
||||
public int getNonRecurringListenersSize() {
|
||||
List<Listener> nonRecurringListenersList = new ArrayList<Listener>();
|
||||
if (_waitForList.isEmpty()) {
|
||||
return 0;
|
||||
} else {
|
||||
final Set<Map.Entry<Long, Listener>> entries = _waitForList.entrySet();
|
||||
final Iterator<Map.Entry<Long, Listener>> it = entries.iterator();
|
||||
while (it.hasNext()) {
|
||||
final Map.Entry<Long, Listener> entry = it.next();
|
||||
final Listener monitor = entry.getValue();
|
||||
if (!monitor.isRecurring()) {
|
||||
//TODO - remove this debug statement later
|
||||
s_logger.debug("Listener is " + entry.getValue() + " waiting on " + entry.getKey());
|
||||
nonRecurringListenersList.add(monitor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nonRecurringListenersList.size();
|
||||
}
|
||||
|
||||
public boolean processAnswers(final long seq, final Response resp) {
|
||||
|
|
|
|||
|
|
@ -129,7 +129,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
// schedule transfer scan executor - if agent LB is enabled
|
||||
if (_clusterMgr.isAgentRebalanceEnabled()) {
|
||||
s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL,
|
||||
s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
|
@ -524,7 +524,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
}
|
||||
if (agent == null) {
|
||||
throw new AgentUnavailableException("Host is not in the right state", hostId);
|
||||
throw new AgentUnavailableException("Host is not in the right state: " + host.getStatus() , hostId);
|
||||
}
|
||||
|
||||
return agent;
|
||||
|
|
@ -698,23 +698,26 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
@Override
|
||||
public void startRebalanceAgents() {
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Management server " + _nodeId + " was asked to do agent rebalancing; checking how many hosts can be taken from this server");
|
||||
}
|
||||
|
||||
List<ManagementServerHostVO> allMS = _mshostDao.listBy(ManagementServerHost.State.Up, ManagementServerHost.State.Starting);
|
||||
List<HostVO> allManagedAgents = _hostDao.listManagedAgents();
|
||||
List<HostVO> allManagedAgents = _hostDao.listManagedRoutingAgents();
|
||||
|
||||
long avLoad = 0L;
|
||||
int avLoad = 0;
|
||||
|
||||
if (!allManagedAgents.isEmpty() && !allMS.isEmpty()) {
|
||||
avLoad = allManagedAgents.size() / allMS.size();
|
||||
} else {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Management server " + _nodeId + " found no hosts to rebalance. Current number of active management server nodes in the system is " + allMS.size() + "; number of managed agents is " + allMS.size());
|
||||
s_logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is " + allMS.size() + "; number of managed agents is " + allManagedAgents.size());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (avLoad == 0L) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("As calculated average load is less than 1, rounding it to 1");
|
||||
}
|
||||
avLoad = 1;
|
||||
}
|
||||
|
||||
for (ManagementServerHostVO node : allMS) {
|
||||
if (node.getMsid() != _nodeId) {
|
||||
|
|
@ -722,21 +725,21 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
List<HostVO> hostsToRebalance = new ArrayList<HostVO>();
|
||||
for (AgentLoadBalancerPlanner lbPlanner : _lbPlanners) {
|
||||
hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad);
|
||||
if (!hostsToRebalance.isEmpty()) {
|
||||
if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) {
|
||||
break;
|
||||
} else {
|
||||
s_logger.debug("Agent load balancer planner " + lbPlanner.getName() + " found no hosts to be rebalanced from management server " + _nodeId);
|
||||
s_logger.debug("Agent load balancer planner " + lbPlanner.getName() + " found no hosts to be rebalanced from management server " + node.getMsid());
|
||||
}
|
||||
}
|
||||
|
||||
if (!hostsToRebalance.isEmpty()) {
|
||||
if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) {
|
||||
for (HostVO host : hostsToRebalance) {
|
||||
long hostId = host.getId();
|
||||
s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId);
|
||||
boolean result = true;
|
||||
HostTransferMapVO transfer = _hostTransferDao.startAgentTransfering(hostId, _nodeId, node.getMsid());
|
||||
HostTransferMapVO transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId);
|
||||
try {
|
||||
Answer[] answer = sendRebalanceCommand(hostId, _nodeId, Event.RequestAgentRebalance);
|
||||
Answer[] answer = sendRebalanceCommand(hostId, node.getMsid(), Event.RequestAgentRebalance);
|
||||
if (answer == null) {
|
||||
s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid());
|
||||
result = false;
|
||||
|
|
@ -756,7 +759,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
}
|
||||
} else {
|
||||
s_logger.debug("Management server " + _nodeId + " found no hosts to rebalance.");
|
||||
s_logger.debug("Found no hosts to rebalance from the management server " + node.getMsid());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -797,10 +800,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
|
||||
for (Long hostId : _agentToTransferIds) {
|
||||
AgentAttache attache = findAttache(hostId);
|
||||
if (attache.getQueueSize() == 0 && attache.getListenersSize() == 0) {
|
||||
boolean result = true;
|
||||
if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
|
||||
boolean result = false;
|
||||
_agentToTransferIds.remove(hostId);
|
||||
try {
|
||||
_hostTransferDao.startAgentTransfer(hostId);
|
||||
result = rebalanceHost(hostId);
|
||||
} finally {
|
||||
if (result) {
|
||||
|
|
@ -810,16 +814,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
}
|
||||
} else {
|
||||
// if we timed out waiting for the host to reconnect, remove host from rebalance list and mark it as failed to rebalance
|
||||
// if we timed out waiting for the host to reconnect, remove host from rebalance list and delete from op_host_transfer DB
|
||||
// no need to do anything with the real attache
|
||||
Date cutTime = DateUtil.currentGMTTime();
|
||||
if (!(_hostTransferDao.isActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut)))) {
|
||||
if (_hostTransferDao.isNotActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut))) {
|
||||
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, failing rebalance for this host");
|
||||
_agentToTransferIds.remove(hostId);
|
||||
HostTransferMapVO transferMap = _hostTransferDao.findById(hostId);
|
||||
transferMap.setState(HostTransferState.TransferFailed);
|
||||
_hostTransferDao.update(hostId, transferMap);
|
||||
}
|
||||
_hostTransferDao.completeAgentTransfer(hostId);
|
||||
} else {
|
||||
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
@ -859,7 +863,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
_agents.put(hostId, attache);
|
||||
if (host != null && host.getRemoved() == null) {
|
||||
host.setManagementServerId(null);
|
||||
s_logger.debug("Updating host id=" + hostId + " with the status " + Status.Rebalance);
|
||||
s_logger.debug("Updating host id=" + hostId + " with the status " + Status.Rebalancing);
|
||||
_hostDao.updateStatus(host, Event.StartAgentRebalance, _nodeId);
|
||||
}
|
||||
|
||||
|
|
@ -877,7 +881,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
s_logger.debug("Got host id=" + hostId + " from management server " + map.getFutureOwner());
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
s_logger.warn("Unable to find agent " + hostId + " on management server " + _nodeId);
|
||||
result = false;
|
||||
}
|
||||
} else if (map.getFutureOwner() == _nodeId) {
|
||||
try {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
|
|
@ -887,61 +894,57 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
loadDirectlyConnectedHost(host);
|
||||
} catch (Exception ex) {
|
||||
s_logger.warn("Unable to load directly connected host " + host.getId() + " as a part of rebalance due to exception: ", ex);
|
||||
result = false;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
private boolean finishRebalance(final long hostId, Event event) {
|
||||
HostTransferMapVO map = _hostTransferDao.findById(hostId);
|
||||
AgentAttache attache = findAttache(hostId);
|
||||
|
||||
if (attache == null) {
|
||||
s_logger.debug("Unable to find attache for the host id=" + hostId + ", assuming that the agent disconnected already");
|
||||
HostTransferState state = (event == Event.RebalanceCompleted) ? HostTransferState.TransferCompleted : HostTransferState.TransferFailed;
|
||||
map.setState(state);
|
||||
_hostTransferDao.update(hostId, map);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (map.getInitialOwner() != _nodeId) {
|
||||
s_logger.warn("Why finish rebalance called not by initial host owner???");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Finishing rebalancing for the host id=" + hostId);
|
||||
}
|
||||
|
||||
boolean success = (event == Event.RebalanceCompleted) ? true : false;
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Finishing rebalancing for the agent " + hostId + " with result " + success);
|
||||
}
|
||||
|
||||
if (event == Event.RebalanceFailed) {
|
||||
((ClusteredDirectAgentAttache) attache).setTransferMode(false);
|
||||
s_logger.debug("Rebalance failed for the host id=" + hostId);
|
||||
map.setState(HostTransferState.TransferFailed);
|
||||
_hostTransferDao.update(hostId, map);
|
||||
} else if (event == Event.RebalanceCompleted) {
|
||||
|
||||
//1) Get all the requests remove transfer attache
|
||||
AgentAttache attache = findAttache(hostId);
|
||||
if (attache == null) {
|
||||
s_logger.debug("Unable to find attache for the host id=" + hostId + ", assuming that the agent disconnected already");
|
||||
return true;
|
||||
} else if (success) {
|
||||
s_logger.debug("Management server " + _nodeId + " is completing agent " + hostId + " rebalance");
|
||||
//1) Get all the requests before removing transfer attache
|
||||
LinkedList<Request> requests = ((ClusteredDirectAgentAttache) attache).getRequests();
|
||||
removeAgent(attache, Status.Rebalance);
|
||||
removeAgent(attache, Status.Rebalancing);
|
||||
|
||||
//2) Create forward attache
|
||||
createAttache(hostId);
|
||||
|
||||
//3) forward all the requests to the management server which owns the host now
|
||||
if (!requests.isEmpty()) {
|
||||
for (Request request : requests) {
|
||||
routeToPeer(Long.toString(map.getFutureOwner()), request.getBytes());
|
||||
try {
|
||||
getAttache(hostId);
|
||||
//3) forward all the requests to the management server which owns the host now
|
||||
if (!requests.isEmpty()) {
|
||||
s_logger.debug("Forwarding requests held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + map.getFutureOwner());
|
||||
for (Request request : requests) {
|
||||
routeToPeer(Long.toString(map.getFutureOwner()), request.getBytes());
|
||||
}
|
||||
}
|
||||
} catch (AgentUnavailableException ex) {
|
||||
s_logger.warn("Not creating forward attache as agent is not available", ex);
|
||||
//TODO - - have to handle the case when requests can't be forwarded due to lack of forward attache
|
||||
}
|
||||
|
||||
map.setState(HostTransferState.TransferCompleted);
|
||||
_hostTransferDao.update(hostId, map);
|
||||
|
||||
return true;
|
||||
|
||||
} else {
|
||||
((ClusteredDirectAgentAttache) attache).setTransferMode(false);
|
||||
//TODO - have to handle the case when agent fails to rebalance 1) Either connect it back 2) Or disconnect it
|
||||
}
|
||||
|
||||
_hostTransferDao.completeAgentTransfer(hostId);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -574,14 +574,11 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
Connection conn = getHeartbeatConnection();
|
||||
_mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime());
|
||||
|
||||
// for cluster in Starting state check if there are any agents being transfered
|
||||
if (_state == State.Starting) {
|
||||
synchronized (stateLock) {
|
||||
if (isClusterReadyToStart()) {
|
||||
_mshostDao.update(conn, _mshostId, getCurrentRunId(), State.Up, DateUtil.currentGMTTime());
|
||||
_state = State.Up;
|
||||
stateLock.notifyAll();
|
||||
}
|
||||
_mshostDao.update(conn, _mshostId, getCurrentRunId(), State.Up, DateUtil.currentGMTTime());
|
||||
_state = State.Up;
|
||||
stateLock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -618,45 +615,6 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
s_logger.error("Problem with the cluster heartbeat!", e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isClusterReadyToStart() {
|
||||
if (!_agentLBEnabled) {
|
||||
return true;
|
||||
}
|
||||
boolean isReady = false;
|
||||
int transferCount = _hostTransferDao.listHostsJoiningCluster(_msId).size();
|
||||
if (transferCount == 0) {
|
||||
//Check how many servers got transfered successfully
|
||||
List<HostTransferMapVO> rebalancedHosts = _hostTransferDao.listBy(_msId, HostTransferState.TransferCompleted);
|
||||
|
||||
if (!rebalancedHosts.isEmpty() && s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(rebalancedHosts.size() + " hosts joined the cluster " + _msId + " as a result of rebalance process");
|
||||
|
||||
}
|
||||
|
||||
for (HostTransferMapVO host : rebalancedHosts) {
|
||||
_hostTransferDao.remove(host.getId());
|
||||
}
|
||||
|
||||
//Check how many servers failed to transfer
|
||||
List<HostTransferMapVO> failedToRebalanceHosts = _hostTransferDao.listBy(_msId, HostTransferState.TransferFailed);
|
||||
if (!failedToRebalanceHosts.isEmpty() && s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(failedToRebalanceHosts.size() + " hosts failed to join the cluster " + _msId + " as a result of rebalance process");
|
||||
}
|
||||
|
||||
for (HostTransferMapVO host : failedToRebalanceHosts) {
|
||||
_hostTransferDao.remove(host.getId());
|
||||
}
|
||||
|
||||
s_logger.debug("There are no hosts currently joining cluser msid=" + _msId + ", so management server is ready to start");
|
||||
isReady = true;
|
||||
} else if (s_logger.isDebugEnabled()) {
|
||||
//TODO : change to trace mode later
|
||||
s_logger.debug("There are " + transferCount + " agents currently joinging the cluster " + _msId);
|
||||
}
|
||||
|
||||
return isReady;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -952,15 +910,15 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
s_logger.info("Management server (host id : " + _mshostId + ") is being started at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort());
|
||||
}
|
||||
|
||||
// use seperate thread for heartbeat updates
|
||||
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
|
||||
_notificationExecutor.submit(getNotificationTask());
|
||||
|
||||
// Do agent rebalancing
|
||||
// Initiate agent rebalancing
|
||||
if (_agentLBEnabled) {
|
||||
s_logger.debug("Management server " + _msId + " is asking other peers to rebalance their agents");
|
||||
_rebalanceService.startRebalanceAgents();
|
||||
}
|
||||
|
||||
// use seperate thread for heartbeat updates
|
||||
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
|
||||
_notificationExecutor.submit(getNotificationTask());
|
||||
|
||||
//wait here for heartbeat task to update the host state
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -26,6 +26,6 @@ import com.cloud.utils.component.Adapter;
|
|||
|
||||
public interface AgentLoadBalancerPlanner extends Adapter{
|
||||
|
||||
List<HostVO> getHostsToRebalance(long msId, long avLoad);
|
||||
List<HostVO> getHostsToRebalance(long msId, int avLoad);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,11 +65,11 @@ public class ClusterBasedAgentLoadBalancerPlanner implements AgentLoadBalancerPl
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<HostVO> getHostsToRebalance(long msId, long avLoad) {
|
||||
public List<HostVO> getHostsToRebalance(long msId, int avLoad) {
|
||||
List<HostVO> allHosts = _hostDao.listByManagementServer(msId);
|
||||
|
||||
if (allHosts.size() <= avLoad) {
|
||||
s_logger.debug("Agent load for management server " + msId + " doesn't exceed av load " + avLoad + "; so it doesn't participate in agent rebalancing process");
|
||||
s_logger.debug("Agent load = " + allHosts.size() + " for management server " + msId + " doesn't exceed average system agent load = " + avLoad + "; so it doesn't participate in agent rebalancing process");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -96,9 +96,9 @@ public class ClusterBasedAgentLoadBalancerPlanner implements AgentLoadBalancerPl
|
|||
|
||||
hostToClusterMap = sortByClusterSize(hostToClusterMap);
|
||||
|
||||
long hostsToGive = allHosts.size() - avLoad;
|
||||
long hostsLeftToGive = hostsToGive;
|
||||
long hostsLeft = directHosts.size();
|
||||
int hostsToGive = allHosts.size() - avLoad;
|
||||
int hostsLeftToGive = hostsToGive;
|
||||
int hostsLeft = directHosts.size();
|
||||
List<HostVO> hostsToReturn = new ArrayList<HostVO>();
|
||||
int count = 0;
|
||||
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ import com.cloud.utils.db.GenericDao;
|
|||
public class HostTransferMapVO {
|
||||
|
||||
public enum HostTransferState {
|
||||
TransferRequested, TransferStarted, TransferCompleted, TransferFailed;
|
||||
TransferRequested, TransferStarted;
|
||||
}
|
||||
|
||||
@Id
|
||||
|
|
|
|||
|
|
@ -29,13 +29,15 @@ public interface HostTransferMapDao extends GenericDao<HostTransferMapVO, Long>
|
|||
|
||||
List<HostTransferMapVO> listHostsLeavingCluster(long clusterId);
|
||||
|
||||
List<HostTransferMapVO> listHostsJoiningCluster(long clusterId);
|
||||
List<HostTransferMapVO> listHostsJoiningCluster(long futureOwnerId);
|
||||
|
||||
HostTransferMapVO startAgentTransfering(long hostId, long currentOwner, long futureOwner);
|
||||
|
||||
boolean completeAgentTransfering(long hostId, boolean success);
|
||||
boolean completeAgentTransfer(long hostId);
|
||||
|
||||
List<HostTransferMapVO> listBy(long futureOwnerId, HostTransferState state);
|
||||
|
||||
boolean isActive(long hostId, Date cutTime);
|
||||
boolean isNotActive(long hostId, Date cutTime);
|
||||
|
||||
boolean startAgentTransfer(long hostId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
|
|||
|
||||
IntermediateStateSearch = createSearchBuilder();
|
||||
IntermediateStateSearch.and("futureOwner", IntermediateStateSearch.entity().getFutureOwner(), SearchCriteria.Op.EQ);
|
||||
IntermediateStateSearch.and("state", IntermediateStateSearch.entity().getState(), SearchCriteria.Op.NOTIN);
|
||||
IntermediateStateSearch.and("state", IntermediateStateSearch.entity().getState(), SearchCriteria.Op.IN);
|
||||
IntermediateStateSearch.done();
|
||||
|
||||
InactiveSearch = createSearchBuilder();
|
||||
|
|
@ -72,10 +72,10 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<HostTransferMapVO> listHostsJoiningCluster(long clusterId) {
|
||||
public List<HostTransferMapVO> listHostsJoiningCluster(long futureOwnerId) {
|
||||
SearchCriteria<HostTransferMapVO> sc = IntermediateStateSearch.create();
|
||||
sc.setParameters("futureOwner", clusterId);
|
||||
sc.setParameters("state", HostTransferState.TransferRequested, HostTransferState.TransferStarted);
|
||||
sc.setParameters("futureOwner", futureOwnerId);
|
||||
sc.setParameters("state", HostTransferState.TransferRequested);
|
||||
return listBy(sc);
|
||||
}
|
||||
|
||||
|
|
@ -88,14 +88,8 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean completeAgentTransfering(long hostId, boolean success) {
|
||||
HostTransferMapVO transfer = findById(hostId);
|
||||
if (success) {
|
||||
transfer.setState(HostTransferState.TransferCompleted);
|
||||
} else {
|
||||
transfer.setState(HostTransferState.TransferFailed);
|
||||
}
|
||||
return update(hostId, transfer);
|
||||
public boolean completeAgentTransfer(long hostId) {
|
||||
return remove(hostId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -108,18 +102,24 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive(long hostId, Date cutTime) {
|
||||
public boolean isNotActive(long hostId, Date cutTime) {
|
||||
SearchCriteria<HostTransferMapVO> sc = InactiveSearch.create();
|
||||
sc.setParameters("id", hostId);
|
||||
sc.setParameters("state", HostTransferState.TransferRequested);
|
||||
sc.setParameters("created", cutTime);
|
||||
|
||||
|
||||
if (listBy(sc).isEmpty()) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean startAgentTransfer(long hostId) {
|
||||
HostTransferMapVO transfer = findById(hostId);
|
||||
transfer.setState(HostTransferState.TransferStarted);
|
||||
return update(hostId, transfer);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ public interface HostDao extends GenericDao<HostVO, Long> {
|
|||
|
||||
List<HostVO> listManagedDirectAgents();
|
||||
|
||||
List<HostVO> listManagedAgents();
|
||||
List<HostVO> listManagedRoutingAgents();
|
||||
|
||||
HostVO findTrafficMonitorHost();
|
||||
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
|
|||
|
||||
protected final SearchBuilder<HostVO> DirectConnectSearch;
|
||||
protected final SearchBuilder<HostVO> ManagedDirectConnectSearch;
|
||||
protected final SearchBuilder<HostVO> ManagedConnectSearch;
|
||||
protected final SearchBuilder<HostVO> ManagedRoutingServersSearch;
|
||||
|
||||
protected final GenericSearchBuilder<HostVO, Long> HostsInStatusSearch;
|
||||
protected final GenericSearchBuilder<HostVO, Long> CountRoutingByDc;
|
||||
|
|
@ -264,9 +264,10 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
|
|||
ManagedDirectConnectSearch.and("server", ManagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NULL);
|
||||
ManagedDirectConnectSearch.done();
|
||||
|
||||
ManagedConnectSearch = createSearchBuilder();
|
||||
ManagedConnectSearch.and("server", ManagedConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NNULL);
|
||||
ManagedConnectSearch.done();
|
||||
ManagedRoutingServersSearch = createSearchBuilder();
|
||||
ManagedRoutingServersSearch.and("server", ManagedRoutingServersSearch.entity().getManagementServerId(), SearchCriteria.Op.NNULL);
|
||||
ManagedRoutingServersSearch.and("type", ManagedRoutingServersSearch.entity().getType(), SearchCriteria.Op.EQ);
|
||||
ManagedRoutingServersSearch.done();
|
||||
|
||||
_statusAttr = _allAttributes.get("status");
|
||||
_msIdAttr = _allAttributes.get("managementServerId");
|
||||
|
|
@ -868,8 +869,9 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<HostVO> listManagedAgents() {
|
||||
SearchCriteria<HostVO> sc = ManagedConnectSearch.create();
|
||||
public List<HostVO> listManagedRoutingAgents() {
|
||||
SearchCriteria<HostVO> sc = ManagedRoutingServersSearch.create();
|
||||
sc.setParameters("type", Type.Routing);
|
||||
return listBy(sc);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue