diff --git a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 48f096ade22..2b9e5419376 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -43,16 +43,13 @@ import javax.naming.ConfigurationException; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import org.apache.log4j.Logger; - -import com.google.gson.Gson; - import org.apache.cloudstack.framework.config.ConfigDepot; import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.managed.context.ManagedContextTimerTask; import org.apache.cloudstack.utils.identity.ManagementServerNode; +import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.api.Answer; @@ -86,7 +83,6 @@ import com.cloud.host.Status.Event; import com.cloud.resource.ServerResource; import com.cloud.serializer.GsonHelper; import com.cloud.utils.DateUtil; -import com.cloud.utils.Profiler; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.QueryBuilder; import com.cloud.utils.db.SearchCriteria.Op; @@ -94,36 +90,35 @@ import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.nio.Link; import com.cloud.utils.nio.Task; +import com.google.gson.Gson; @Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class }) public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService { final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class); - private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor")); + private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Cluster-AgentRebalancingExecutor")); private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list public final static long STARTUP_DELAY = 5000; public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds protected Set _agentToTransferIds = new HashSet(); - Gson _gson; - - @Inject - protected ClusterManager _clusterMgr = null; - protected HashMap _peers; protected HashMap _sslEngines; private final Timer _timer = new Timer("ClusteredAgentManager Timer"); - + private final Timer _agentLbTimer = new Timer("ClusteredAgentManager AgentRebalancing Timer"); + boolean _agentLbHappened = false; + + @Inject + protected ClusterManager _clusterMgr = null; @Inject protected ManagementServerHostDao _mshostDao; @Inject protected HostTransferMapDao _hostTransferDao; - - // @com.cloud.utils.component.Inject(adapter = AgentLoadBalancerPlanner.class) - @Inject protected List _lbPlanners; - - @Inject ConfigurationDao _configDao; + @Inject + protected List _lbPlanners; + @Inject + ConfigurationDao _configDao; @Inject ConfigDepot _configDepot; @@ -168,9 +163,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (s_logger.isDebugEnabled()) { s_logger.debug("Scheduled direct agent scan task to run at an interval of " + ScanInterval.value() + " seconds"); } - - // schedule transfer scan executor - if agent LB is enabled + + // Schedule tasks for agent rebalancing if (isAgentRebalanceEnabled()) { + s_transferExecutor.scheduleAtFixedRate(getAgentRebalanceScanTask(), 60000, 60000, TimeUnit.MILLISECONDS); s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, TimeUnit.MILLISECONDS); } @@ -571,6 +567,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } _timer.cancel(); + _agentLbTimer.cancel(); //cancel all transfer tasks s_transferExecutor.shutdownNow(); @@ -1354,44 +1351,52 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { - return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event); + return executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event); } public boolean isAgentRebalanceEnabled() { return EnableLB.value(); } - - private ClusteredAgentRebalanceService _rebalanceService; - - boolean _agentLbHappened = false; - public void agentrebalance() { - Profiler profilerAgentLB = new Profiler(); - profilerAgentLB.start(); - //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold - if (EnableLB.value() && !_agentLbHappened) { - QueryBuilder sc = QueryBuilder.create(HostVO.class); - sc.and(sc.entity().getManagementServerId(), Op.NNULL); - sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); - List allManagedRoutingAgents = sc.list(); - - sc = QueryBuilder.create(HostVO.class); - sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); - List allAgents = sc.list(); - double allHostsCount = allAgents.size(); - double managedHostsCount = allManagedRoutingAgents.size(); - if (allHostsCount > 0.0) { - double load = managedHostsCount / allHostsCount; - if (load >= ConnectedAgentThreshold.value()) { - s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + ConnectedAgentThreshold.value()); - _rebalanceService.scheduleRebalanceAgents(); - _agentLbHappened = true; - } else { - s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + ConnectedAgentThreshold.value()); + + + private Runnable getAgentRebalanceScanTask() { + return new ManagedContextRunnable() { + @Override + protected void runInContext() { + try { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Agent rebalance task check, management server id:" + _nodeId); } + //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold + if (!_agentLbHappened) { + QueryBuilder sc = QueryBuilder.create(HostVO.class); + sc.and(sc.entity().getManagementServerId(), Op.NNULL); + sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); + List allManagedRoutingAgents = sc.list(); + + sc = QueryBuilder.create(HostVO.class); + sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); + List allAgents = sc.list(); + double allHostsCount = allAgents.size(); + double managedHostsCount = allManagedRoutingAgents.size(); + if (allHostsCount > 0.0) { + double load = managedHostsCount / allHostsCount; + if (load >= ConnectedAgentThreshold.value()) { + s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + ConnectedAgentThreshold.value()); + scheduleRebalanceAgents(); + _agentLbHappened = true; + } else { + s_logger.debug("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + ConnectedAgentThreshold.value()); + } + } + } + } catch (Throwable e) { + s_logger.error("Problem with the clustered agent transfer scan check!", e); } } - profilerAgentLB.stop(); - } + }; +} + @Override public void rescan() {