Addded back Agent Load Balancing functionality (was temporarely disabled in master by vmSync merge)

This commit is contained in:
Alena Prokharchyk 2013-10-07 09:46:03 -07:00
parent 23ddf29532
commit 66185076df
1 changed files with 54 additions and 49 deletions

View File

@ -43,16 +43,13 @@ import javax.naming.ConfigurationException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.log4j.Logger;
import com.google.gson.Gson;
import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Answer;
@ -86,7 +83,6 @@ import com.cloud.host.Status.Event;
import com.cloud.resource.ServerResource;
import com.cloud.serializer.GsonHelper;
import com.cloud.utils.DateUtil;
import com.cloud.utils.Profiler;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchCriteria.Op;
@ -94,36 +90,35 @@ import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.Task;
import com.google.gson.Gson;
@Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class })
public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService {
final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class);
private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor"));
private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Cluster-AgentRebalancingExecutor"));
private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
public final static long STARTUP_DELAY = 5000;
public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login
public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
protected Set<Long> _agentToTransferIds = new HashSet<Long>();
Gson _gson;
@Inject
protected ClusterManager _clusterMgr = null;
protected HashMap<String, SocketChannel> _peers;
protected HashMap<String, SSLEngine> _sslEngines;
private final Timer _timer = new Timer("ClusteredAgentManager Timer");
private final Timer _agentLbTimer = new Timer("ClusteredAgentManager AgentRebalancing Timer");
boolean _agentLbHappened = false;
@Inject
protected ClusterManager _clusterMgr = null;
@Inject
protected ManagementServerHostDao _mshostDao;
@Inject
protected HostTransferMapDao _hostTransferDao;
// @com.cloud.utils.component.Inject(adapter = AgentLoadBalancerPlanner.class)
@Inject protected List<AgentLoadBalancerPlanner> _lbPlanners;
@Inject ConfigurationDao _configDao;
@Inject
protected List<AgentLoadBalancerPlanner> _lbPlanners;
@Inject
ConfigurationDao _configDao;
@Inject
ConfigDepot _configDepot;
@ -168,9 +163,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isDebugEnabled()) {
s_logger.debug("Scheduled direct agent scan task to run at an interval of " + ScanInterval.value() + " seconds");
}
// schedule transfer scan executor - if agent LB is enabled
// Schedule tasks for agent rebalancing
if (isAgentRebalanceEnabled()) {
s_transferExecutor.scheduleAtFixedRate(getAgentRebalanceScanTask(), 60000, 60000, TimeUnit.MILLISECONDS);
s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL,
TimeUnit.MILLISECONDS);
}
@ -571,6 +567,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
_timer.cancel();
_agentLbTimer.cancel();
//cancel all transfer tasks
s_transferExecutor.shutdownNow();
@ -1354,44 +1351,52 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
return executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
}
public boolean isAgentRebalanceEnabled() {
return EnableLB.value();
}
private ClusteredAgentRebalanceService _rebalanceService;
boolean _agentLbHappened = false;
public void agentrebalance() {
Profiler profilerAgentLB = new Profiler();
profilerAgentLB.start();
//initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
if (EnableLB.value() && !_agentLbHappened) {
QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
sc.and(sc.entity().getManagementServerId(), Op.NNULL);
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
List<HostVO> allManagedRoutingAgents = sc.list();
sc = QueryBuilder.create(HostVO.class);
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
List<HostVO> allAgents = sc.list();
double allHostsCount = allAgents.size();
double managedHostsCount = allManagedRoutingAgents.size();
if (allHostsCount > 0.0) {
double load = managedHostsCount / allHostsCount;
if (load >= ConnectedAgentThreshold.value()) {
s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + ConnectedAgentThreshold.value());
_rebalanceService.scheduleRebalanceAgents();
_agentLbHappened = true;
} else {
s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + ConnectedAgentThreshold.value());
private Runnable getAgentRebalanceScanTask() {
return new ManagedContextRunnable() {
@Override
protected void runInContext() {
try {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Agent rebalance task check, management server id:" + _nodeId);
}
//initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
if (!_agentLbHappened) {
QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
sc.and(sc.entity().getManagementServerId(), Op.NNULL);
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
List<HostVO> allManagedRoutingAgents = sc.list();
sc = QueryBuilder.create(HostVO.class);
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
List<HostVO> allAgents = sc.list();
double allHostsCount = allAgents.size();
double managedHostsCount = allManagedRoutingAgents.size();
if (allHostsCount > 0.0) {
double load = managedHostsCount / allHostsCount;
if (load >= ConnectedAgentThreshold.value()) {
s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + ConnectedAgentThreshold.value());
scheduleRebalanceAgents();
_agentLbHappened = true;
} else {
s_logger.debug("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + ConnectedAgentThreshold.value());
}
}
}
} catch (Throwable e) {
s_logger.error("Problem with the clustered agent transfer scan check!", e);
}
}
profilerAgentLB.stop();
}
};
}
@Override
public void rescan() {