From 9461ad9ddff69be2d8b8e9a3d395e9487969339e Mon Sep 17 00:00:00 2001 From: alena Date: Fri, 24 Jun 2011 15:38:09 -0700 Subject: [PATCH] bug 10445: Submit rebalancing task in a separate thread for each host status 10445: resolved fixed --- .../manager/ClusteredAgentManagerImpl.java | 49 +++++++++++++++++-- .../src/com/cloud/host/dao/HostDaoImpl.java | 12 ++++- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index beef91b0a96..bf1a1e2244c 100644 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -21,8 +21,12 @@ import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.ejb.Local; @@ -38,6 +42,7 @@ import com.cloud.agent.api.CancelCommand; import com.cloud.agent.api.ChangeAgentCommand; import com.cloud.agent.api.Command; import com.cloud.agent.api.TransferAgentCommand; +import com.cloud.agent.manager.AgentManagerImpl.SimulateStartTask; import com.cloud.agent.transport.Request; import com.cloud.agent.transport.Request.Version; import com.cloud.agent.transport.Response; @@ -47,6 +52,7 @@ import com.cloud.cluster.ClusterManagerListener; import com.cloud.cluster.ClusteredAgentRebalanceService; import com.cloud.cluster.ManagementServerHost; import com.cloud.cluster.ManagementServerHostVO; +import com.cloud.cluster.StackMaid; import com.cloud.cluster.agentlb.AgentLoadBalancerPlanner; import com.cloud.cluster.agentlb.HostTransferMapVO; import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState; @@ -79,14 +85,13 @@ import com.cloud.utils.nio.Task; public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService { final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class); private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor")); + private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list public final static long STARTUP_DELAY = 5000; public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds public long _loadSize = 100; - protected Set _agentToTransferIds = new HashSet(); - - private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list + protected Set _agentToTransferIds = new HashSet(); @Inject protected ClusterManager _clusterMgr = null; @@ -121,7 +126,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust ClusteredAgentAttache.initialize(this); _clusterMgr.registerListener(this); - + return super.configure(name, xmlParams); } @@ -906,7 +911,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { iterator.remove(); - rebalanceHost(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner()); + try { + _executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner())); + } catch (RejectedExecutionException ex) { + s_logger.warn("Failed to submit rebalance task for host id=" + hostId + "; postponing the execution"); + continue; + } + } else { s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize()); } @@ -1083,4 +1094,32 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } + + protected class RebalanceTask implements Runnable { + Long hostId = null; + Long currentOwnerId = null; + Long futureOwnerId = null; + + + public RebalanceTask(long hostId, long currentOwnerId, long futureOwnerId) { + this.hostId = hostId; + this.currentOwnerId = currentOwnerId; + this.futureOwnerId = futureOwnerId; + } + + @Override + public void run() { + try { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Rebalancing host id=" + hostId); + } + rebalanceHost(hostId, currentOwnerId, futureOwnerId); + } catch (Exception e) { + s_logger.warn("Unable to rebalance host id=" + hostId, e); + } finally { + StackMaid.current().exitCleanup(); + } + } + } + } diff --git a/server/src/com/cloud/host/dao/HostDaoImpl.java b/server/src/com/cloud/host/dao/HostDaoImpl.java index 06717dbfd27..6acd0990e72 100644 --- a/server/src/com/cloud/host/dao/HostDaoImpl.java +++ b/server/src/com/cloud/host/dao/HostDaoImpl.java @@ -32,6 +32,8 @@ import javax.persistence.TableGenerator; import org.apache.log4j.Logger; +import com.cloud.cluster.agentlb.HostTransferMapVO; +import com.cloud.cluster.agentlb.dao.HostTransferMapDaoImpl; import com.cloud.host.Host; import com.cloud.host.Host.Type; import com.cloud.host.HostTagVO; @@ -50,6 +52,7 @@ import com.cloud.utils.db.GenericSearchBuilder; import com.cloud.utils.db.JoinBuilder; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.JoinBuilder.JoinType; import com.cloud.utils.db.SearchCriteria.Func; import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.Transaction; @@ -95,6 +98,7 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao protected final GenericSearchBuilder HostsInStatusSearch; protected final GenericSearchBuilder CountRoutingByDc; + protected final SearchBuilder HostTransferSearch; protected final Attribute _statusAttr; protected final Attribute _msIdAttr; @@ -102,6 +106,7 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao protected final HostDetailsDaoImpl _detailsDao = ComponentLocator.inject(HostDetailsDaoImpl.class); protected final HostTagsDaoImpl _hostTagsDao = ComponentLocator.inject(HostTagsDaoImpl.class); + protected final HostTransferMapDaoImpl _hostTransferDao = ComponentLocator.inject(HostTransferMapDaoImpl.class); public HostDaoImpl() { @@ -218,7 +223,11 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao * UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(), * SearchCriteria.Op.LTEQ); UnmanagedDirectConnectSearch.cp(); UnmanagedDirectConnectSearch.cp(); */ + HostTransferSearch = _hostTransferDao.createSearchBuilder(); + HostTransferSearch.and("id", HostTransferSearch.entity().getId(), SearchCriteria.Op.NULL); + UnmanagedDirectConnectSearch.join("hostTransferSearch", HostTransferSearch, HostTransferSearch.entity().getId(), UnmanagedDirectConnectSearch.entity().getId(), JoinType.LEFTOUTER); UnmanagedDirectConnectSearch.done(); + DirectConnectSearch = createSearchBuilder(); DirectConnectSearch.and("resource", DirectConnectSearch.entity().getResource(), SearchCriteria.Op.NNULL); @@ -350,7 +359,8 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao @Override public List findDirectAgentToLoad(long lastPingSecondsAfter, Long limit) { SearchCriteria sc = UnmanagedDirectConnectSearch.create(); - sc.setParameters("lastPinged", lastPingSecondsAfter); + sc.setParameters("lastPinged", lastPingSecondsAfter); + return search(sc, new Filter(HostVO.class, "clusterId", true, 0L, limit)); }