From 00a35314bbf14ca36914cded06df534cdd371ec9 Mon Sep 17 00:00:00 2001 From: alena Date: Mon, 13 Jun 2011 17:18:03 -0700 Subject: [PATCH] Some fixes to agent lb code: * Schedule agent LB as a TimerTask as it can take time for the management server to start; and it can accept rebalance requests only when it's up and running * Removed Starting state from mshost as it's not being used anywhere * Fixed the bug where requests weren't routed properly from the old host owner to the new one. --- .../agent/manager/ClusteredAgentAttache.java | 79 +++++++++++++- .../manager/ClusteredAgentManagerImpl.java | 100 ++++++++++++------ .../manager/ClusteredDirectAgentAttache.java | 59 ----------- .../com/cloud/cluster/ClusterManagerImpl.java | 35 ++---- .../ClusteredAgentRebalanceService.java | 2 +- .../dao/ManagementServerHostDaoImpl.java | 2 +- 6 files changed, 155 insertions(+), 122 deletions(-) diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentAttache.java b/server/src/com/cloud/agent/manager/ClusteredAgentAttache.java index a4929f18799..0374e3969e8 100644 --- a/server/src/com/cloud/agent/manager/ClusteredAgentAttache.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentAttache.java @@ -9,6 +9,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; import javax.net.ssl.SSLEngine; @@ -16,8 +20,10 @@ import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.Listener; +import com.cloud.agent.api.Command; import com.cloud.agent.transport.Request; import com.cloud.exception.AgentUnavailableException; +import com.cloud.host.Status; import com.cloud.utils.nio.Link; @@ -26,6 +32,8 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout private static ClusteredAgentManagerImpl s_clusteredAgentMgr; protected ByteBuffer _buffer = ByteBuffer.allocate(2048); private boolean _forward = false; + protected final LinkedList _transferRequests; + protected boolean _transferMode = false; static public void initialize(ClusteredAgentManagerImpl agentMgr) { s_clusteredAgentMgr = agentMgr; @@ -34,11 +42,13 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout public ClusteredAgentAttache(AgentManager agentMgr, long id) { super(agentMgr, id, null, false); _forward = true; + _transferRequests = new LinkedList(); } public ClusteredAgentAttache(AgentManager agentMgr, long id, Link link, boolean maintenance) { super(agentMgr, id, link, maintenance); _forward = link == null; + _transferRequests = new LinkedList(); } @Override @@ -50,7 +60,22 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout public boolean forForward() { return _forward; } - + + protected void checkAvailability(final Command[] cmds) throws AgentUnavailableException { + + if (_transferMode) { + // need to throw some other exception while agent is in rebalancing mode + for (final Command cmd : cmds) { + if (!cmd.allowCaching()) { + throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString() + " because agent is in Rebalancing mode", _id); + } + } + } else { + super.checkAvailability(cmds); + } + } + + @Override public void cancel(long seq) { if (forForward()) { @@ -104,12 +129,24 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout super.send(req, listener); return; } - + long seq = req.getSequence(); if (listener != null) { registerListener(req.getSequence(), listener); } + + if (_transferMode) { + + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Holding request as the corresponding agent is in transfer mode: ")); + } + + synchronized (this) { + addRequestToTransfer(req); + return; + } + } int i = 0; SocketChannel ch = null; @@ -162,4 +199,42 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout } throw new AgentUnavailableException("Unable to reach the peer that the agent is connected", _id); } + + public synchronized void setTransferMode(final boolean transfer) { + _transferMode = transfer; + } + + + public boolean getTransferMode() { + return _transferMode; + } + + public Request getRequestToTransfer() { + if (_transferRequests.isEmpty()) { + return null; + } else { + return _transferRequests.pop(); + } + } + + protected synchronized void addRequestToTransfer(Request req) { + int index = findTransferRequest(req); + assert (index < 0) : "How can we get index again? " + index + ":" + req.toString(); + _transferRequests.add(-index - 1, req); + } + + protected synchronized int findTransferRequest(Request req) { + return Collections.binarySearch(_transferRequests, req, s_reqComparator); + } + + @Override + public void disconnect(final Status state) { + super.disconnect(state); + _transferRequests.clear(); + } + + public void cleanup(final Status state) { + super.cleanup(state); + _transferRequests.clear(); + } } diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index fbb90bd959c..2dabf5fd53b 100644 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -728,11 +728,45 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return true; } - + @Override + public void scheduleRebalanceAgents() { + _timer.schedule(new AgentLoadBalancerTask(), 30000); + } + + public class AgentLoadBalancerTask extends TimerTask { + protected volatile boolean cancelled = false; + + public AgentLoadBalancerTask() { + s_logger.debug("Agent load balancer task created"); + } + + @Override + public synchronized boolean cancel() { + if (!cancelled) { + cancelled = true; + s_logger.debug("Agent load balancer task cancelled"); + return super.cancel(); + } + return true; + } + + @Override + public synchronized void run() { + if (!cancelled) { + startRebalanceAgents(); + if (s_logger.isInfoEnabled()) { + s_logger.info("The agent load balancer task is now being cancelled"); + } + cancelled = true; + } + } + } + + public void startRebalanceAgents() { - - List allMS = _mshostDao.listBy(ManagementServerHost.State.Up, ManagementServerHost.State.Starting); + s_logger.debug("Management server " + _nodeId + " is asking other peers to rebalance their agents"); + List allMS = _mshostDao.listBy(ManagementServerHost.State.Up); List allManagedAgents = _hostDao.listManagedRoutingAgents(); int avLoad = 0; @@ -899,9 +933,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust boolean result = true; if (currentOwnerId == _nodeId) { + _agentToTransferIds.remove(hostId); if (!startRebalance(hostId)) { s_logger.debug("Failed to start agent rebalancing"); - failStartRebalance(hostId); + failRebalance(hostId); return false; } try { @@ -948,46 +983,43 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } AgentAttache attache = findAttache(hostId); - if (attache == null) { - s_logger.debug("Unable to find attache for the host id=" + hostId + ", assuming that the agent disconnected already"); + if (attache == null || !(attache instanceof ClusteredAgentAttache)) { + s_logger.debug("Unable to find forward attache for the host id=" + hostId + ", assuming that the agent disconnected already"); _hostTransferDao.completeAgentTransfer(hostId); return; - } else if (success) { - s_logger.debug("Management server " + _nodeId + " is completing agent " + hostId + " rebalance"); - //1) Get all the requests before removing transfer attache - LinkedList requests = ((ClusteredDirectAgentAttache) attache).getRequests(); - removeAgent(attache, Status.Rebalancing); + } + + ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache; + + if (success) { - //2) Create forward attache - try { - getAttache(hostId); - //3) forward all the requests to the management server which owns the host now - if (!requests.isEmpty()) { - s_logger.debug("Forwarding requests held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId); - - for (Iterator iter = requests.iterator(); iter.hasNext();) { - Request req = iter.next(); - boolean routeResult = routeToPeer(Long.toString(futureOwnerId), req.getBytes()); - if (!routeResult) { - logD(req.getBytes(), "Failed to route request to peer"); - } - } + //1) Set transfer mode to false - so the agent can start processing requests normally + forwardAttache.setTransferMode(false); + + //2) Get all transfer requests and route them to peer + Request requestToTransfer = forwardAttache.getRequestToTransfer(); + while (requestToTransfer != null) { + s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId); + boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes()); + if (!routeResult) { + logD(requestToTransfer.getBytes(), "Failed to route request to peer"); } - } catch (AgentUnavailableException ex) { - s_logger.warn("Failed to finish host " + hostId + " rebalance: couldn't create forward attache as agent is not available", ex); - failRebalance(hostId); + requestToTransfer = forwardAttache.getRequestToTransfer(); } + } else { failRebalance(hostId); } + s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance"); _hostTransferDao.completeAgentTransfer(hostId); } protected void failRebalance(final long hostId) throws AgentUnavailableException{ - reconnect(hostId); + s_logger.debug("Management server " + _nodeId + " failed to rebalance agent " + hostId); _hostTransferDao.completeAgentTransfer(hostId); + reconnect(hostId); } @DB @@ -1003,9 +1035,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId); if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { _agentToTransferIds.remove(hostId); + removeAgent(attache, Status.Rebalancing); + ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId); + if (forwardAttache == null) { + s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process"); + return false; + } s_logger.debug("Putting agent id=" + hostId + " to transfer mode"); - attache.setTransferMode(true); - _agents.put(hostId, attache); + forwardAttache.setTransferMode(true); + _agents.put(hostId, forwardAttache); } else { if (attache == null) { s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing"); diff --git a/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java b/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java index 58ef0ef73f2..37371ab44f6 100644 --- a/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java +++ b/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java @@ -17,13 +17,7 @@ */ package com.cloud.agent.manager; -import java.util.LinkedList; - -import org.apache.log4j.Logger; - import com.cloud.agent.AgentManager; -import com.cloud.agent.Listener; -import com.cloud.agent.api.Command; import com.cloud.agent.transport.Request; import com.cloud.agent.transport.Response; import com.cloud.exception.AgentUnavailableException; @@ -32,10 +26,8 @@ import com.cloud.resource.ServerResource; import com.cloud.utils.exception.CloudRuntimeException; public class ClusteredDirectAgentAttache extends DirectAgentAttache implements Routable { - private final static Logger s_logger = Logger.getLogger(ClusteredDirectAgentAttache.class); private final ClusteredAgentManagerImpl _mgr; private final long _nodeId; - private boolean _transferMode = false; public ClusteredDirectAgentAttache(AgentManager agentMgr, long id, long mgmtId, ServerResource resource, boolean maintenance, ClusteredAgentManagerImpl mgr) { super(agentMgr, id, resource, maintenance, mgr); @@ -43,25 +35,6 @@ public class ClusteredDirectAgentAttache extends DirectAgentAttache implements R _nodeId = mgmtId; } - public synchronized void setTransferMode(final boolean transfer) { - _transferMode = transfer; - } - - @Override - protected void checkAvailability(final Command[] cmds) throws AgentUnavailableException { - - if (_transferMode) { - // need to throw some other exception while agent is in rebalancing mode - for (final Command cmd : cmds) { - if (!cmd.allowCaching()) { - throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString() + " because agent is in Rebalancing mode", _id); - } - } - } - - super.checkAvailability(cmds); - } - @Override public void routeToAgent(byte[] data) throws AgentUnavailableException { Request req; @@ -93,37 +66,5 @@ public class ClusteredDirectAgentAttache extends DirectAgentAttache implements R return super.processAnswers(seq, response); } } - - @Override - public void send(Request req, final Listener listener) throws AgentUnavailableException { - checkAvailability(req.getCommands()); - - if (_transferMode) { - long seq = req.getSequence(); - - if (s_logger.isDebugEnabled()) { - s_logger.debug(log(seq, "Holding request as the corresponding agent is in transfer mode: ")); - } - - synchronized (this) { - addRequest(req); - return; - } - } else { - super.send(req, listener); - } - } - - public boolean getTransferMode() { - return _transferMode; - } - - public LinkedList getRequests() { - if (_transferMode) { - return _requests; - } else { - return null; - } - } } diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index 37d701f4c32..a5da74746c5 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -126,9 +126,7 @@ public class ClusterManagerImpl implements ClusterManager { private String _name; private String _clusterNodeIP = "127.0.0.1"; private boolean _agentLBEnabled = false; - private State _state = State.Starting; - private final Object stateLock = new Object(); - + public ClusterManagerImpl() { clusterPeers = new HashMap(); asyncCalls = new HashMap(); @@ -572,14 +570,6 @@ public class ClusterManagerImpl implements ClusterManager { Connection conn = getHeartbeatConnection(); _mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime()); - if (_state == State.Starting) { - synchronized (stateLock) { - _mshostDao.update(conn, _mshostId, getCurrentRunId(), State.Up, DateUtil.currentGMTTime()); - _state = State.Up; - stateLock.notifyAll(); - } - } - if (s_logger.isTraceEnabled()) { s_logger.trace("Cluster manager peer-scan, id:" + _mshostId); } @@ -887,7 +877,7 @@ public class ClusterManagerImpl implements ClusterManager { mshost.setLastUpdateTime(DateUtil.currentGMTTime()); mshost.setRemoved(null); mshost.setAlertCount(0); - mshost.setState(ManagementServerHost.State.Starting); + mshost.setState(ManagementServerHost.State.Up); _mshostDao.persist(mshost); if (s_logger.isInfoEnabled()) { @@ -907,26 +897,15 @@ public class ClusterManagerImpl implements ClusterManager { if (s_logger.isInfoEnabled()) { s_logger.info("Management server (host id : " + _mshostId + ") is being started at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort()); } - - // Initiate agent rebalancing - if (_agentLBEnabled) { - s_logger.debug("Management server " + _msId + " is asking other peers to rebalance their agents"); - _rebalanceService.startRebalanceAgents(); - } // use seperate thread for heartbeat updates _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); _notificationExecutor.submit(getNotificationTask()); - - //wait here for heartbeat task to update the host state - try { - synchronized (stateLock) { - while (_state != State.Up) { - stateLock.wait(); - } - } - } catch (final InterruptedException e) { - } + + //Initiate agent rebalancing after the host is in UP state + if (_agentLBEnabled) { + _rebalanceService.scheduleRebalanceAgents(); + } } catch (Throwable e) { s_logger.error("Unexpected exception : ", e); diff --git a/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java b/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java index 73177c6e47f..2bc6a6641a1 100644 --- a/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java +++ b/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java @@ -7,7 +7,7 @@ import com.cloud.host.Status.Event; public interface ClusteredAgentRebalanceService { public static final int DEFAULT_TRANSFER_CHECK_INTERVAL = 10000; - void startRebalanceAgents(); + void scheduleRebalanceAgents(); boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException; diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java index 5db287b202c..89b14779ccc 100644 --- a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java +++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java @@ -183,7 +183,7 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase