mirror of https://github.com/apache/cloudstack.git
Removed unused agent lb timer
Conflicts: engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
This commit is contained in:
parent
5b4fa5c068
commit
592863f1ef
|
|
@ -106,7 +106,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
protected HashMap<String, SocketChannel> _peers;
|
||||
protected HashMap<String, SSLEngine> _sslEngines;
|
||||
private final Timer _timer = new Timer("ClusteredAgentManager Timer");
|
||||
private final Timer _agentLbTimer = new Timer("ClusteredAgentManager AgentRebalancing Timer");
|
||||
boolean _agentLbHappened = false;
|
||||
|
||||
@Inject
|
||||
|
|
@ -133,8 +132,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
protected final ConfigKey<Integer> LoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16",
|
||||
"How many agents to connect to in each round", true);
|
||||
protected final ConfigKey<Integer> ScanInterval = new ConfigKey<Integer>(Integer.class, "direct.agent.scan.interval", "Advanced", "90",
|
||||
"Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000);
|
||||
|
||||
"Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000);
|
||||
|
||||
@Override
|
||||
public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
|
||||
|
|
@ -277,9 +275,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
@Override
|
||||
protected AgentAttache createAttacheForDirectConnect(Host host, ServerResource resource) {
|
||||
// if (resource instanceof DummySecondaryStorageResource) {
|
||||
// return new DummyAttache(this, host.getId(), false);
|
||||
// }
|
||||
// if (resource instanceof DummySecondaryStorageResource) {
|
||||
// return new DummyAttache(this, host.getId(), false);
|
||||
// }
|
||||
s_logger.debug("create ClusteredDirectAgentAttache for " + host.getId());
|
||||
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getName(), _nodeId, resource, host.isInMaintenanceStates(), this);
|
||||
AgentAttache old = null;
|
||||
|
|
@ -329,23 +327,24 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
AgentAttache attache = findAttache(hostId);
|
||||
if (attache != null) {
|
||||
//don't process disconnect if the host is being rebalanced
|
||||
// don't process disconnect if the host is being rebalanced
|
||||
if (isAgentRebalanceEnabled()) {
|
||||
HostTransferMapVO transferVO = _hostTransferDao.findById(hostId);
|
||||
if (transferVO != null) {
|
||||
if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) {
|
||||
s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id="
|
||||
+ hostId +" as the host is being connected to " + _nodeId);
|
||||
|
||||
s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id=" + hostId + " as the host is being connected to " +
|
||||
_nodeId);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//don't process disconnect if the disconnect came for the host via delayed cluster notification,
|
||||
//but the host has already reconnected to the current management server
|
||||
// don't process disconnect if the disconnect came for the host via delayed cluster notification,
|
||||
// but the host has already reconnected to the current management server
|
||||
if (!attache.forForward()) {
|
||||
s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id="
|
||||
+ hostId +" as the host is directly connected to the current management server " + _nodeId);
|
||||
s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id=" + hostId +
|
||||
" as the host is directly connected to the current management server " + _nodeId);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -390,18 +389,18 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
|
||||
protected static void logT(byte[] bytes, final String msg) {
|
||||
s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
|
||||
+ (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
|
||||
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
}
|
||||
|
||||
protected static void logD(byte[] bytes, final String msg) {
|
||||
s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
|
||||
+ (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
|
||||
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
}
|
||||
|
||||
protected static void logI(byte[] bytes, final String msg) {
|
||||
s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
|
||||
+ (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
|
||||
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
}
|
||||
|
||||
public boolean routeToPeer(String peer, byte[] bytes) {
|
||||
|
|
@ -568,9 +567,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
}
|
||||
_timer.cancel();
|
||||
_agentLbTimer.cancel();
|
||||
|
||||
//cancel all transfer tasks
|
||||
// cancel all transfer tasks
|
||||
s_transferExecutor.shutdownNow();
|
||||
cleanupTransferMap(_nodeId);
|
||||
|
||||
|
|
@ -626,8 +624,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
agent.cancel(cancel.getSequence());
|
||||
final Long current = agent._currentSequence;
|
||||
//if the request is the current request, always have to trigger sending next request in sequence,
|
||||
//otherwise the agent queue will be blocked
|
||||
// if the request is the current request, always have to trigger sending next request in
|
||||
// sequence,
|
||||
// otherwise the agent queue will be blocked
|
||||
if (req.executeInSequence() && (current != null && current == Request.getSequence(data))) {
|
||||
agent.sendNext(Request.getSequence(data));
|
||||
}
|
||||
|
|
@ -726,8 +725,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException {
|
||||
boolean result = false;
|
||||
public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException,
|
||||
OperationTimedoutException {
|
||||
boolean result = false;
|
||||
if (event == Event.RequestAgentRebalance) {
|
||||
return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId);
|
||||
} else if (event == Event.StartAgentRebalance) {
|
||||
|
|
@ -792,7 +792,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
avLoad = allManagedAgents.size() / allMS.size();
|
||||
} else {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is " + allMS.size() + "; number of managed agents is " + allManagedAgents.size());
|
||||
s_logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is " + allMS.size() +
|
||||
"; number of managed agents is " + allManagedAgents.size());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
@ -848,7 +849,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode");
|
||||
}
|
||||
//just remove the mapping (if exists) as nothing was done on the peer management server yet
|
||||
// just remove the mapping (if exists) as nothing was done on the peer management
|
||||
// server yet
|
||||
_hostTransferDao.remove(transfer.getId());
|
||||
}
|
||||
}
|
||||
|
|
@ -934,7 +936,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
synchronized (_agentToTransferIds) {
|
||||
if (_agentToTransferIds.size() > 0) {
|
||||
s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
|
||||
//for (Long hostId : _agentToTransferIds) {
|
||||
// for (Long hostId : _agentToTransferIds) {
|
||||
for (Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) {
|
||||
Long hostId = iterator.next();
|
||||
AgentAttache attache = findAttache(hostId);
|
||||
|
|
@ -946,7 +948,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
// remove the host from re-balance list and delete from op_host_transfer DB
|
||||
// no need to do anything with the real attache as we haven't modified it yet
|
||||
Date cutTime = DateUtil.currentGMTTime();
|
||||
HostTransferMapVO transferMap = _hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut));
|
||||
HostTransferMapVO transferMap =
|
||||
_hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut));
|
||||
|
||||
if (transferMap == null) {
|
||||
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host");
|
||||
|
|
@ -964,7 +967,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
|
||||
if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
|
||||
s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host");
|
||||
s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms +
|
||||
", skipping rebalance for the host");
|
||||
iterator.remove();
|
||||
_hostTransferDao.completeAgentTransfer(hostId);
|
||||
continue;
|
||||
|
|
@ -980,7 +984,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
|
||||
} else {
|
||||
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
|
||||
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() +
|
||||
" and listener queue size is " + attache.getNonRecurringListenersSize());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
@ -1048,7 +1053,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
if (result) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process");
|
||||
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
|
||||
" as a part of rebalance process");
|
||||
}
|
||||
result = loadDirectlyConnectedHost(host, true);
|
||||
} else {
|
||||
|
|
@ -1057,14 +1063,17 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
|
||||
} catch (Exception ex) {
|
||||
s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process due to:", ex);
|
||||
s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
|
||||
" as a part of rebalance process due to:", ex);
|
||||
result = false;
|
||||
}
|
||||
|
||||
if (result) {
|
||||
s_logger.debug("Successfully loaded directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process");
|
||||
s_logger.debug("Successfully loaded directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
|
||||
" as a part of rebalance process");
|
||||
} else {
|
||||
s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process");
|
||||
s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
|
||||
" as a part of rebalance process");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1086,17 +1095,18 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
return;
|
||||
}
|
||||
|
||||
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache;
|
||||
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) attache;
|
||||
|
||||
if (success) {
|
||||
|
||||
//1) Set transfer mode to false - so the agent can start processing requests normally
|
||||
// 1) Set transfer mode to false - so the agent can start processing requests normally
|
||||
forwardAttache.setTransferMode(false);
|
||||
|
||||
//2) Get all transfer requests and route them to peer
|
||||
// 2) Get all transfer requests and route them to peer
|
||||
Request requestToTransfer = forwardAttache.getRequestToTransfer();
|
||||
while (requestToTransfer != null) {
|
||||
s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId);
|
||||
s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " +
|
||||
_nodeId + " to " + futureOwnerId);
|
||||
boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes());
|
||||
if (!routeResult) {
|
||||
logD(requestToTransfer.getBytes(), "Failed to route request to peer");
|
||||
|
|
@ -1134,10 +1144,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
|
||||
synchronized (_agents) {
|
||||
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
|
||||
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache) _agents.get(hostId);
|
||||
if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
|
||||
handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true);
|
||||
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId);
|
||||
handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true);
|
||||
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) createAttache(hostId);
|
||||
if (forwardAttache == null) {
|
||||
s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process");
|
||||
return false;
|
||||
|
|
@ -1149,7 +1159,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
if (attache == null) {
|
||||
s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing");
|
||||
} else {
|
||||
s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " + attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
|
||||
s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " +
|
||||
attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
@ -1207,8 +1218,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
} catch (Exception e) {
|
||||
// Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan
|
||||
// happens at fixed intervals anyways. So handling any exceptions that may be thrown
|
||||
s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName()
|
||||
+ ", ignoring as regular host scan happens at fixed interval anyways", e);
|
||||
s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName() +
|
||||
", ignoring as regular host scan happens at fixed interval anyways", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -1247,8 +1258,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
s_logger.error("Excection in gson decoding : ", e);
|
||||
}
|
||||
|
||||
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
|
||||
ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
|
||||
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { // intercepted
|
||||
ChangeAgentCommand cmd = (ChangeAgentCommand) cmds[0];
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
|
||||
|
|
@ -1269,7 +1280,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
answers[0] = new ChangeAgentAnswer(cmd, result);
|
||||
return _gson.toJson(answers);
|
||||
} else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
|
||||
TransferAgentCommand cmd = (TransferAgentCommand)cmds[0];
|
||||
TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
|
||||
|
|
@ -1292,7 +1303,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
answers[0] = new Answer(cmd, result, null);
|
||||
return _gson.toJson(answers);
|
||||
} else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
|
||||
PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0];
|
||||
PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0];
|
||||
|
||||
s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId());
|
||||
|
||||
|
|
@ -1309,7 +1320,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
answers[0] = new Answer(cmd, result, null);
|
||||
return _gson.toJson(answers);
|
||||
} else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
|
||||
ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0];
|
||||
ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand) cmds[0];
|
||||
String response = handleScheduleHostScanTaskCommand(cmd);
|
||||
return response;
|
||||
}
|
||||
|
|
@ -1325,15 +1336,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
String jsonReturn = _gson.toJson(answers);
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
|
||||
" in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
|
||||
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + " in " +
|
||||
(System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
|
||||
}
|
||||
|
||||
return jsonReturn;
|
||||
} else {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
|
||||
" in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
|
||||
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + " in " +
|
||||
(System.currentTimeMillis() - startTick) + " ms, return null result");
|
||||
}
|
||||
}
|
||||
} catch (AgentUnavailableException e) {
|
||||
|
|
@ -1362,38 +1373,41 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
private Runnable getAgentRebalanceScanTask() {
|
||||
return new ManagedContextRunnable() {
|
||||
@Override
|
||||
protected void runInContext() {
|
||||
try {
|
||||
if (s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Agent rebalance task check, management server id:" + _nodeId);
|
||||
@Override
|
||||
protected void runInContext() {
|
||||
try {
|
||||
if (s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Agent rebalance task check, management server id:" + _nodeId);
|
||||
}
|
||||
// initiate agent lb task will be scheduled and executed only once, and only when number of agents
|
||||
// loaded exceeds _connectedAgentsThreshold
|
||||
if (!_agentLbHappened) {
|
||||
QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
|
||||
sc.and(sc.entity().getManagementServerId(), Op.NNULL);
|
||||
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
|
||||
List<HostVO> allManagedRoutingAgents = sc.list();
|
||||
|
||||
sc = QueryBuilder.create(HostVO.class);
|
||||
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
|
||||
List<HostVO> allAgents = sc.list();
|
||||
double allHostsCount = allAgents.size();
|
||||
double managedHostsCount = allManagedRoutingAgents.size();
|
||||
if (allHostsCount > 0.0) {
|
||||
double load = managedHostsCount / allHostsCount;
|
||||
if (load >= ConnectedAgentThreshold.value()) {
|
||||
s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " +
|
||||
ConnectedAgentThreshold.value());
|
||||
scheduleRebalanceAgents();
|
||||
_agentLbHappened = true;
|
||||
} else {
|
||||
s_logger.debug("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " +
|
||||
ConnectedAgentThreshold.value());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
s_logger.error("Problem with the clustered agent transfer scan check!", e);
|
||||
}
|
||||
//initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
|
||||
if (!_agentLbHappened) {
|
||||
QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
|
||||
sc.and(sc.entity().getManagementServerId(), Op.NNULL);
|
||||
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
|
||||
List<HostVO> allManagedRoutingAgents = sc.list();
|
||||
|
||||
sc = QueryBuilder.create(HostVO.class);
|
||||
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
|
||||
List<HostVO> allAgents = sc.list();
|
||||
double allHostsCount = allAgents.size();
|
||||
double managedHostsCount = allManagedRoutingAgents.size();
|
||||
if (allHostsCount > 0.0) {
|
||||
double load = managedHostsCount / allHostsCount;
|
||||
if (load >= ConnectedAgentThreshold.value()) {
|
||||
s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + ConnectedAgentThreshold.value());
|
||||
scheduleRebalanceAgents();
|
||||
_agentLbHappened = true;
|
||||
} else {
|
||||
s_logger.debug("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + ConnectedAgentThreshold.value());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
s_logger.error("Problem with the clustered agent transfer scan check!", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue