diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 103a7b345e9..e7048c62c1e 100644 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -578,7 +578,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust //cancel all transfer tasks s_transferExecutor.shutdownNow(); - cleanupTransferMap(); + cleanupTransferMap(_nodeId); return super.stop(); } @@ -706,6 +706,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust for (ManagementServerHostVO vo : nodeList) { s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid()); _hostDao.markHostsAsDisconnected(vo.getMsid()); + s_logger.info("Deleting entries from op_host_transfer table for Management server " + vo.getMsid()); + cleanupTransferMap(vo.getMsid()); } } @@ -1082,14 +1084,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return true; } - protected void cleanupTransferMap() { - List hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(_nodeId); + protected void cleanupTransferMap(long msId) { + List hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId); for (HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) { _hostTransferDao.remove(hostJoingingCluster.getId()); } - List hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(_nodeId); + List hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId); for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) { _hostTransferDao.remove(hostLeavingCluster.getId()); } diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index c4df1771d59..24a006a2f28 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -127,6 +127,9 @@ public class ClusterManagerImpl implements ClusterManager { private String _name; private String _clusterNodeIP = "127.0.0.1"; private boolean _agentLBEnabled = false; + private double _connectedAgentsThreshold = 0.7; + private static boolean _agentLbHappened = false; + public ClusterManagerImpl() { clusterPeers = new HashMap(); @@ -607,6 +610,26 @@ public class ClusterManagerImpl implements ClusterManager { } peerScan(); + + //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold + if (_agentLBEnabled && !_agentLbHappened) { + List allManagedRoutingAgents = _hostDao.listManagedRoutingAgents(); + List allAgents = _hostDao.listAllRoutingAgents(); + double allHostsCount = allAgents.size(); + double managedHostsCount = allManagedRoutingAgents.size(); + if (allHostsCount > 0.0) { + double load = managedHostsCount/allHostsCount; + if (load >= _connectedAgentsThreshold) { + s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold); + _rebalanceService.scheduleRebalanceAgents(); + _agentLbHappened = true; + } else { + s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold); + } + } + } + + } catch(CloudRuntimeException e) { s_logger.error("Runtime DB exception ", e.getCause()); @@ -937,10 +960,6 @@ public class ClusterManagerImpl implements ClusterManager { _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); _notificationExecutor.submit(getNotificationTask()); - //Initiate agent rebalancing after the host is in UP state - if (_agentLBEnabled) { - _rebalanceService.scheduleRebalanceAgents(); - } } catch (Throwable e) { s_logger.error("Unexpected exception : ", e); @@ -1063,6 +1082,12 @@ public class ClusterManagerImpl implements ClusterManager { _agentLBEnabled = Boolean.valueOf(configDao.getValue(Config.AgentLbEnable.key())); + + String connectedAgentsThreshold = configs.get("agent.load.threshold"); + + if (connectedAgentsThreshold != null) { + _connectedAgentsThreshold = Double.parseDouble(connectedAgentsThreshold); + } this.registerListener(new LockMasterListener(_msId)); @@ -1191,5 +1216,4 @@ public class ClusterManagerImpl implements ClusterManager { public boolean isAgentRebalanceEnabled() { return _agentLBEnabled; } - } diff --git a/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java index 0fff8268870..acef54a2b58 100644 --- a/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java +++ b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java @@ -51,6 +51,7 @@ public class HostTransferMapDaoImpl extends GenericDaoBase _componentClass; private final Class _type; diff --git a/server/src/com/cloud/host/dao/HostDao.java b/server/src/com/cloud/host/dao/HostDao.java index edaefc65e61..e296ff8058b 100644 --- a/server/src/com/cloud/host/dao/HostDao.java +++ b/server/src/com/cloud/host/dao/HostDao.java @@ -184,4 +184,6 @@ public interface HostDao extends GenericDao { List listByManagementServer(long msId); List listSecondaryStorageVM(long dcId); + + List listAllRoutingAgents(); } diff --git a/server/src/com/cloud/host/dao/HostDaoImpl.java b/server/src/com/cloud/host/dao/HostDaoImpl.java index 1b9cd057ca2..c8f6ba83437 100644 --- a/server/src/com/cloud/host/dao/HostDaoImpl.java +++ b/server/src/com/cloud/host/dao/HostDaoImpl.java @@ -50,9 +50,9 @@ import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.GenericSearchBuilder; import com.cloud.utils.db.JoinBuilder; +import com.cloud.utils.db.JoinBuilder.JoinType; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; -import com.cloud.utils.db.JoinBuilder.JoinType; import com.cloud.utils.db.SearchCriteria.Func; import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.Transaction; @@ -101,6 +101,9 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao protected final GenericSearchBuilder CountRoutingByDc; protected final SearchBuilder HostTransferSearch; + protected final SearchBuilder RoutingSearch; + + protected final Attribute _statusAttr; protected final Attribute _msIdAttr; protected final Attribute _pingTimeAttr; @@ -285,6 +288,10 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao ManagedRoutingServersSearch.and("server", ManagedRoutingServersSearch.entity().getManagementServerId(), SearchCriteria.Op.NNULL); ManagedRoutingServersSearch.and("type", ManagedRoutingServersSearch.entity().getType(), SearchCriteria.Op.EQ); ManagedRoutingServersSearch.done(); + + RoutingSearch = createSearchBuilder(); + RoutingSearch.and("type", RoutingSearch.entity().getType(), SearchCriteria.Op.EQ); + RoutingSearch.done(); _statusAttr = _allAttributes.get("status"); _msIdAttr = _allAttributes.get("managementServerId"); @@ -922,4 +929,11 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao return listBy(sc); } + + @Override + public List listAllRoutingAgents() { + SearchCriteria sc = RoutingSearch.create(); + sc.setParameters("type", Type.Routing); + return listBy(sc); + } }