diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index a274803872b..0ff3d666856 100755 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -10,435 +10,435 @@ // limitations under the License. // // Automatically generated by addcopyright.py at 04/03/2012 -package com.cloud.agent.manager; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import javax.ejb.Local; -import javax.naming.ConfigurationException; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.apache.log4j.Logger; - -import com.cloud.agent.AgentManager; -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.CancelCommand; -import com.cloud.agent.api.ChangeAgentCommand; -import com.cloud.agent.api.Command; -import com.cloud.agent.api.TransferAgentCommand; -import com.cloud.agent.transport.Request; -import com.cloud.agent.transport.Request.Version; -import com.cloud.agent.transport.Response; -import com.cloud.cluster.ClusterManager; -import com.cloud.cluster.ClusterManagerListener; -import com.cloud.cluster.ClusteredAgentRebalanceService; -import com.cloud.cluster.ManagementServerHost; -import com.cloud.cluster.ManagementServerHostVO; -import com.cloud.cluster.StackMaid; -import com.cloud.cluster.agentlb.AgentLoadBalancerPlanner; -import com.cloud.cluster.agentlb.HostTransferMapVO; -import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState; -import com.cloud.cluster.agentlb.dao.HostTransferMapDao; -import com.cloud.cluster.dao.ManagementServerHostDao; -import com.cloud.configuration.Config; -import com.cloud.configuration.dao.ConfigurationDao; -import com.cloud.exception.AgentUnavailableException; -import com.cloud.exception.OperationTimedoutException; -import com.cloud.host.Host; -import com.cloud.host.HostVO; -import com.cloud.host.Status; -import com.cloud.host.Status.Event; -import com.cloud.resource.ServerResource; -import com.cloud.storage.resource.DummySecondaryStorageResource; -import com.cloud.utils.DateUtil; -import com.cloud.utils.NumbersUtil; -import com.cloud.utils.component.Adapters; -import com.cloud.utils.component.ComponentLocator; -import com.cloud.utils.component.Inject; -import com.cloud.utils.concurrency.NamedThreadFactory; -import com.cloud.utils.db.SearchCriteria.Op; -import com.cloud.utils.db.SearchCriteria2; -import com.cloud.utils.db.SearchCriteriaService; -import com.cloud.utils.db.Transaction; -import com.cloud.utils.exception.CloudRuntimeException; -import com.cloud.utils.nio.Link; -import com.cloud.utils.nio.Task; - -@Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class }) -public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService { - final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class); - private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor")); - private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list - - public final static long STARTUP_DELAY = 5000; - public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login - public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds - public long _loadSize = 100; - protected Set _agentToTransferIds = new HashSet(); - - @Inject - protected ClusterManager _clusterMgr = null; - - protected HashMap _peers; - protected HashMap _sslEngines; - private final Timer _timer = new Timer("ClusteredAgentManager Timer"); - - @Inject - protected ManagementServerHostDao _mshostDao; - @Inject - protected HostTransferMapDao _hostTransferDao; - - @Inject(adapter = AgentLoadBalancerPlanner.class) - protected Adapters _lbPlanners; - - @Inject - protected AgentManager _agentMgr; - - protected ClusteredAgentManagerImpl() { - super(); - } - - @Override - public boolean configure(String name, Map xmlParams) throws ConfigurationException { - _peers = new HashMap(7); - _sslEngines = new HashMap(7); - _nodeId = _clusterMgr.getManagementNodeId(); - - s_logger.info("Configuring ClusterAgentManagerImpl. management server node id(msid): " + _nodeId); - - ConfigurationDao configDao = ComponentLocator.getCurrentLocator().getDao(ConfigurationDao.class); - Map params = configDao.getConfiguration(xmlParams); - String value = params.get(Config.DirectAgentLoadSize.key()); - _loadSize = NumbersUtil.parseInt(value, 16); - - ClusteredAgentAttache.initialize(this); - - _clusterMgr.registerListener(this); - - return super.configure(name, xmlParams); - } - - @Override - public boolean start() { - if (!super.start()) { - return false; - } - _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, SCAN_INTERVAL); - - // schedule transfer scan executor - if agent LB is enabled - if (_clusterMgr.isAgentRebalanceEnabled()) { - s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, - TimeUnit.MILLISECONDS); - } - - return true; - } - - private void runDirectAgentScanTimerTask() { - scanDirectAgentToLoad(); - } - - private void scanDirectAgentToLoad() { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Begin scanning directly connected hosts"); - } - - // for agents that are self-managed, threshold to be considered as disconnected is 3 ping intervals - long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingInterval * 3); - List hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize, _nodeId); - List appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId); - hosts.addAll(appliances); - - if (hosts != null && hosts.size() > 0) { - s_logger.debug("Found " + hosts.size() + " unmanaged direct hosts, processing connect for them..."); - for (HostVO host : hosts) { - try { - AgentAttache agentattache = findAttache(host.getId()); - if (agentattache != null) { - // already loaded, skip - if (agentattache.forForward()) { - if (s_logger.isInfoEnabled()) { - s_logger.info(host + " is detected down, but we have a forward attache running, disconnect this one before launching the host"); - } - removeAgent(agentattache, Status.Disconnected); - } else { - continue; - } - } - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ")"); - } - loadDirectlyConnectedHost(host, false); - } catch (Throwable e) { - s_logger.warn(" can not load directly connected host " + host.getId() + "(" + host.getName() + ") due to ",e); - } - } - } - - if (s_logger.isTraceEnabled()) { - s_logger.trace("End scanning directly connected hosts"); - } - } - - private class DirectAgentScanTimerTask extends TimerTask { - @Override - public void run() { - try { - runDirectAgentScanTimerTask(); - } catch (Throwable e) { - s_logger.error("Unexpected exception " + e.getMessage(), e); - } - } - } - - @Override - public Task create(Task.Type type, Link link, byte[] data) { - return new ClusteredAgentHandler(type, link, data); - } - - protected AgentAttache createAttache(long id) { - s_logger.debug("create forwarding ClusteredAgentAttache for " + id); - final AgentAttache attache = new ClusteredAgentAttache(this, id); - AgentAttache old = null; - synchronized (_agents) { - old = _agents.get(id); - _agents.put(id, attache); - } - if (old != null) { - old.disconnect(Status.Removed); - } - return attache; - } - - @Override - protected AgentAttache createAttacheForConnect(HostVO host, Link link) { - s_logger.debug("create ClusteredAgentAttache for " + host.getId()); - final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), link, host.isInMaintenanceStates()); - link.attach(attache); - AgentAttache old = null; - synchronized (_agents) { - old = _agents.get(host.getId()); - _agents.put(host.getId(), attache); - } - if (old != null) { - old.disconnect(Status.Removed); - } - return attache; - } - - @Override - protected AgentAttache createAttacheForDirectConnect(HostVO host, ServerResource resource) { - if (resource instanceof DummySecondaryStorageResource) { - return new DummyAttache(this, host.getId(), false); - } - s_logger.debug("create ClusteredDirectAgentAttache for " + host.getId()); - final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), _nodeId, resource, host.isInMaintenanceStates(), this); - AgentAttache old = null; - synchronized (_agents) { - old = _agents.get(host.getId()); - _agents.put(host.getId(), attache); - } - if (old != null) { - old.disconnect(Status.Removed); - } - return attache; - } - - @Override - protected boolean handleDisconnectWithoutInvestigation(AgentAttache attache, Status.Event event) { - return handleDisconnect(attache, event, false, true); - } - - @Override - protected boolean handleDisconnectWithInvestigation(AgentAttache attache, Status.Event event) { - return handleDisconnect(attache, event, true, true); - } - - protected boolean handleDisconnect(AgentAttache agent, Status.Event event, boolean investigate, boolean broadcast) { - boolean res; - if (!investigate) { - res = super.handleDisconnectWithoutInvestigation(agent, event); - } else { - res = super.handleDisconnectWithInvestigation(agent, event); - } - - if (res) { - if (broadcast) { - notifyNodesInCluster(agent); - } - return true; - } else { - return false; - } - } - - @Override - public boolean executeUserRequest(long hostId, Event event) throws AgentUnavailableException { - if (event == Event.AgentDisconnected) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Received agent disconnect event for host " + hostId); - } - AgentAttache attache = findAttache(hostId); - if (attache != null) { - handleDisconnect(attache, Event.AgentDisconnected, false, false); - } - - return true; - } else { - return super.executeUserRequest(hostId, event); - } - } - - @Override - public boolean reconnect(final long hostId) { - Boolean result; - try { - result = _clusterMgr.propagateAgentEvent(hostId, Event.ShutdownRequested); - if (result != null) { - return result; - } - } catch (AgentUnavailableException e) { - s_logger.debug("cannot propagate agent reconnect because agent is not available", e); - return false; - } - - return super.reconnect(hostId); - } - - public void notifyNodesInCluster(AgentAttache attache) { - s_logger.debug("Notifying other nodes of to disconnect"); - Command[] cmds = new Command[] { new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected) }; - _clusterMgr.broadcast(attache.getId(), cmds); - } - - protected static void logT(byte[] bytes, final String msg) { - s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " - + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); - } - - protected static void logD(byte[] bytes, final String msg) { - s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " - + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); - } - - protected static void logI(byte[] bytes, final String msg) { - s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " - + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); - } - - public boolean routeToPeer(String peer, byte[] bytes) { - int i = 0; - SocketChannel ch = null; - SSLEngine sslEngine = null; - while (i++ < 5) { - ch = connectToPeer(peer, ch); - if (ch == null) { - try { - logD(bytes, "Unable to route to peer: " + Request.parse(bytes).toString()); - } catch (Exception e) { - } - return false; - } - sslEngine = getSSLEngine(peer); - if (sslEngine == null) { - logD(bytes, "Unable to get SSLEngine of peer: " + peer); - return false; - } - try { - if (s_logger.isDebugEnabled()) { - logD(bytes, "Routing to peer"); - } - Link.write(ch, new ByteBuffer[] { ByteBuffer.wrap(bytes) }, sslEngine); - return true; - } catch (IOException e) { - try { - logI(bytes, "Unable to route to peer: " + Request.parse(bytes).toString() + " due to " + e.getMessage()); - } catch (Exception ex) { - } - } - } - return false; - } - - public String findPeer(long hostId) { - return _clusterMgr.getPeerName(hostId); - } - - public SSLEngine getSSLEngine(String peerName) { - return _sslEngines.get(peerName); - } - - public void cancel(String peerName, long hostId, long sequence, String reason) { - CancelCommand cancel = new CancelCommand(sequence, reason); - Request req = new Request(hostId, _nodeId, cancel, true); - req.setControl(true); - routeToPeer(peerName, req.getBytes()); - } - - public void closePeer(String peerName) { - synchronized (_peers) { - SocketChannel ch = _peers.get(peerName); - if (ch != null) { - try { - ch.close(); - } catch (IOException e) { - s_logger.warn("Unable to close peer socket connection to " + peerName); - } - } - _peers.remove(peerName); - _sslEngines.remove(peerName); - } - } - - public SocketChannel connectToPeer(String peerName, SocketChannel prevCh) { - synchronized (_peers) { - SocketChannel ch = _peers.get(peerName); - SSLEngine sslEngine = null; - if (prevCh != null) { - try { - prevCh.close(); - } catch (Exception e) { - } - } - if (ch == null || ch == prevCh) { - ManagementServerHostVO ms = _clusterMgr.getPeer(peerName); - if (ms == null) { - s_logger.info("Unable to find peer: " + peerName); - return null; - } - String ip = ms.getServiceIP(); - InetAddress addr; - try { - addr = InetAddress.getByName(ip); - } catch (UnknownHostException e) { - throw new CloudRuntimeException("Unable to resolve " + ip); - } - try { - ch = SocketChannel.open(new InetSocketAddress(addr, _port)); - ch.configureBlocking(true); // make sure we are working at blocking mode - ch.socket().setKeepAlive(true); - ch.socket().setSoTimeout(60 * 1000); +package com.cloud.agent.manager; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.ejb.Local; +import javax.naming.ConfigurationException; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.apache.log4j.Logger; + +import com.cloud.agent.AgentManager; +import com.cloud.agent.api.Answer; +import com.cloud.agent.api.CancelCommand; +import com.cloud.agent.api.ChangeAgentCommand; +import com.cloud.agent.api.Command; +import com.cloud.agent.api.TransferAgentCommand; +import com.cloud.agent.transport.Request; +import com.cloud.agent.transport.Request.Version; +import com.cloud.agent.transport.Response; +import com.cloud.cluster.ClusterManager; +import com.cloud.cluster.ClusterManagerListener; +import com.cloud.cluster.ClusteredAgentRebalanceService; +import com.cloud.cluster.ManagementServerHost; +import com.cloud.cluster.ManagementServerHostVO; +import com.cloud.cluster.StackMaid; +import com.cloud.cluster.agentlb.AgentLoadBalancerPlanner; +import com.cloud.cluster.agentlb.HostTransferMapVO; +import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState; +import com.cloud.cluster.agentlb.dao.HostTransferMapDao; +import com.cloud.cluster.dao.ManagementServerHostDao; +import com.cloud.configuration.Config; +import com.cloud.configuration.dao.ConfigurationDao; +import com.cloud.exception.AgentUnavailableException; +import com.cloud.exception.OperationTimedoutException; +import com.cloud.host.Host; +import com.cloud.host.HostVO; +import com.cloud.host.Status; +import com.cloud.host.Status.Event; +import com.cloud.resource.ServerResource; +import com.cloud.storage.resource.DummySecondaryStorageResource; +import com.cloud.utils.DateUtil; +import com.cloud.utils.NumbersUtil; +import com.cloud.utils.component.Adapters; +import com.cloud.utils.component.ComponentLocator; +import com.cloud.utils.component.Inject; +import com.cloud.utils.concurrency.NamedThreadFactory; +import com.cloud.utils.db.SearchCriteria.Op; +import com.cloud.utils.db.SearchCriteria2; +import com.cloud.utils.db.SearchCriteriaService; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.exception.CloudRuntimeException; +import com.cloud.utils.nio.Link; +import com.cloud.utils.nio.Task; + +@Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class }) +public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService { + final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class); + private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor")); + private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list + + public final static long STARTUP_DELAY = 5000; + public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login + public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds + public long _loadSize = 100; + protected Set _agentToTransferIds = new HashSet(); + + @Inject + protected ClusterManager _clusterMgr = null; + + protected HashMap _peers; + protected HashMap _sslEngines; + private final Timer _timer = new Timer("ClusteredAgentManager Timer"); + + @Inject + protected ManagementServerHostDao _mshostDao; + @Inject + protected HostTransferMapDao _hostTransferDao; + + @Inject(adapter = AgentLoadBalancerPlanner.class) + protected Adapters _lbPlanners; + + @Inject + protected AgentManager _agentMgr; + + protected ClusteredAgentManagerImpl() { + super(); + } + + @Override + public boolean configure(String name, Map xmlParams) throws ConfigurationException { + _peers = new HashMap(7); + _sslEngines = new HashMap(7); + _nodeId = _clusterMgr.getManagementNodeId(); + + s_logger.info("Configuring ClusterAgentManagerImpl. management server node id(msid): " + _nodeId); + + ConfigurationDao configDao = ComponentLocator.getCurrentLocator().getDao(ConfigurationDao.class); + Map params = configDao.getConfiguration(xmlParams); + String value = params.get(Config.DirectAgentLoadSize.key()); + _loadSize = NumbersUtil.parseInt(value, 16); + + ClusteredAgentAttache.initialize(this); + + _clusterMgr.registerListener(this); + + return super.configure(name, xmlParams); + } + + @Override + public boolean start() { + if (!super.start()) { + return false; + } + _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, SCAN_INTERVAL); + + // schedule transfer scan executor - if agent LB is enabled + if (_clusterMgr.isAgentRebalanceEnabled()) { + s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, + TimeUnit.MILLISECONDS); + } + + return true; + } + + private void runDirectAgentScanTimerTask() { + scanDirectAgentToLoad(); + } + + private void scanDirectAgentToLoad() { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Begin scanning directly connected hosts"); + } + + // for agents that are self-managed, threshold to be considered as disconnected is 3 ping intervals + long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingInterval * 3); + List hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize, _nodeId); + List appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId); + hosts.addAll(appliances); + + if (hosts != null && hosts.size() > 0) { + s_logger.debug("Found " + hosts.size() + " unmanaged direct hosts, processing connect for them..."); + for (HostVO host : hosts) { + try { + AgentAttache agentattache = findAttache(host.getId()); + if (agentattache != null) { + // already loaded, skip + if (agentattache.forForward()) { + if (s_logger.isInfoEnabled()) { + s_logger.info(host + " is detected down, but we have a forward attache running, disconnect this one before launching the host"); + } + removeAgent(agentattache, Status.Disconnected); + } else { + continue; + } + } + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ")"); + } + loadDirectlyConnectedHost(host, false); + } catch (Throwable e) { + s_logger.warn(" can not load directly connected host " + host.getId() + "(" + host.getName() + ") due to ",e); + } + } + } + + if (s_logger.isTraceEnabled()) { + s_logger.trace("End scanning directly connected hosts"); + } + } + + private class DirectAgentScanTimerTask extends TimerTask { + @Override + public void run() { + try { + runDirectAgentScanTimerTask(); + } catch (Throwable e) { + s_logger.error("Unexpected exception " + e.getMessage(), e); + } + } + } + + @Override + public Task create(Task.Type type, Link link, byte[] data) { + return new ClusteredAgentHandler(type, link, data); + } + + protected AgentAttache createAttache(long id) { + s_logger.debug("create forwarding ClusteredAgentAttache for " + id); + final AgentAttache attache = new ClusteredAgentAttache(this, id); + AgentAttache old = null; + synchronized (_agents) { + old = _agents.get(id); + _agents.put(id, attache); + } + if (old != null) { + old.disconnect(Status.Removed); + } + return attache; + } + + @Override + protected AgentAttache createAttacheForConnect(HostVO host, Link link) { + s_logger.debug("create ClusteredAgentAttache for " + host.getId()); + final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), link, host.isInMaintenanceStates()); + link.attach(attache); + AgentAttache old = null; + synchronized (_agents) { + old = _agents.get(host.getId()); + _agents.put(host.getId(), attache); + } + if (old != null) { + old.disconnect(Status.Removed); + } + return attache; + } + + @Override + protected AgentAttache createAttacheForDirectConnect(HostVO host, ServerResource resource) { + if (resource instanceof DummySecondaryStorageResource) { + return new DummyAttache(this, host.getId(), false); + } + s_logger.debug("create ClusteredDirectAgentAttache for " + host.getId()); + final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), _nodeId, resource, host.isInMaintenanceStates(), this); + AgentAttache old = null; + synchronized (_agents) { + old = _agents.get(host.getId()); + _agents.put(host.getId(), attache); + } + if (old != null) { + old.disconnect(Status.Removed); + } + return attache; + } + + @Override + protected boolean handleDisconnectWithoutInvestigation(AgentAttache attache, Status.Event event) { + return handleDisconnect(attache, event, false, true); + } + + @Override + protected boolean handleDisconnectWithInvestigation(AgentAttache attache, Status.Event event) { + return handleDisconnect(attache, event, true, true); + } + + protected boolean handleDisconnect(AgentAttache agent, Status.Event event, boolean investigate, boolean broadcast) { + boolean res; + if (!investigate) { + res = super.handleDisconnectWithoutInvestigation(agent, event); + } else { + res = super.handleDisconnectWithInvestigation(agent, event); + } + + if (res) { + if (broadcast) { + notifyNodesInCluster(agent); + } + return true; + } else { + return false; + } + } + + @Override + public boolean executeUserRequest(long hostId, Event event) throws AgentUnavailableException { + if (event == Event.AgentDisconnected) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Received agent disconnect event for host " + hostId); + } + AgentAttache attache = findAttache(hostId); + if (attache != null) { + handleDisconnect(attache, Event.AgentDisconnected, false, false); + } + + return true; + } else { + return super.executeUserRequest(hostId, event); + } + } + + @Override + public boolean reconnect(final long hostId) { + Boolean result; + try { + result = _clusterMgr.propagateAgentEvent(hostId, Event.ShutdownRequested); + if (result != null) { + return result; + } + } catch (AgentUnavailableException e) { + s_logger.debug("cannot propagate agent reconnect because agent is not available", e); + return false; + } + + return super.reconnect(hostId); + } + + public void notifyNodesInCluster(AgentAttache attache) { + s_logger.debug("Notifying other nodes of to disconnect"); + Command[] cmds = new Command[] { new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected) }; + _clusterMgr.broadcast(attache.getId(), cmds); + } + + protected static void logT(byte[] bytes, final String msg) { + s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " + + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); + } + + protected static void logD(byte[] bytes, final String msg) { + s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " + + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); + } + + protected static void logI(byte[] bytes, final String msg) { + s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " + + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); + } + + public boolean routeToPeer(String peer, byte[] bytes) { + int i = 0; + SocketChannel ch = null; + SSLEngine sslEngine = null; + while (i++ < 5) { + ch = connectToPeer(peer, ch); + if (ch == null) { + try { + logD(bytes, "Unable to route to peer: " + Request.parse(bytes).toString()); + } catch (Exception e) { + } + return false; + } + sslEngine = getSSLEngine(peer); + if (sslEngine == null) { + logD(bytes, "Unable to get SSLEngine of peer: " + peer); + return false; + } + try { + if (s_logger.isDebugEnabled()) { + logD(bytes, "Routing to peer"); + } + Link.write(ch, new ByteBuffer[] { ByteBuffer.wrap(bytes) }, sslEngine); + return true; + } catch (IOException e) { + try { + logI(bytes, "Unable to route to peer: " + Request.parse(bytes).toString() + " due to " + e.getMessage()); + } catch (Exception ex) { + } + } + } + return false; + } + + public String findPeer(long hostId) { + return _clusterMgr.getPeerName(hostId); + } + + public SSLEngine getSSLEngine(String peerName) { + return _sslEngines.get(peerName); + } + + public void cancel(String peerName, long hostId, long sequence, String reason) { + CancelCommand cancel = new CancelCommand(sequence, reason); + Request req = new Request(hostId, _nodeId, cancel, true); + req.setControl(true); + routeToPeer(peerName, req.getBytes()); + } + + public void closePeer(String peerName) { + synchronized (_peers) { + SocketChannel ch = _peers.get(peerName); + if (ch != null) { + try { + ch.close(); + } catch (IOException e) { + s_logger.warn("Unable to close peer socket connection to " + peerName); + } + } + _peers.remove(peerName); + _sslEngines.remove(peerName); + } + } + + public SocketChannel connectToPeer(String peerName, SocketChannel prevCh) { + synchronized (_peers) { + SocketChannel ch = _peers.get(peerName); + SSLEngine sslEngine = null; + if (prevCh != null) { + try { + prevCh.close(); + } catch (Exception e) { + } + } + if (ch == null || ch == prevCh) { + ManagementServerHostVO ms = _clusterMgr.getPeer(peerName); + if (ms == null) { + s_logger.info("Unable to find peer: " + peerName); + return null; + } + String ip = ms.getServiceIP(); + InetAddress addr; + try { + addr = InetAddress.getByName(ip); + } catch (UnknownHostException e) { + throw new CloudRuntimeException("Unable to resolve " + ip); + } + try { + ch = SocketChannel.open(new InetSocketAddress(addr, _port)); + ch.configureBlocking(true); // make sure we are working at blocking mode + ch.socket().setKeepAlive(true); + ch.socket().setSoTimeout(60 * 1000); try { SSLContext sslContext = Link.initSSLContext(true); sslEngine = sslContext.createSSLEngine(ip, _port); @@ -449,633 +449,633 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } catch (Exception e) { throw new IOException("SSL: Fail to init SSL! " + e); } - if (s_logger.isDebugEnabled()) { - s_logger.debug("Connection to peer opened: " + peerName + ", ip: " + ip); - } - _peers.put(peerName, ch); - _sslEngines.put(peerName, sslEngine); - } catch (IOException e) { - s_logger.warn("Unable to connect to peer management server: " + peerName + ", ip: " + ip + " due to " + e.getMessage(), e); - return null; - } - } - - if (s_logger.isTraceEnabled()) { - s_logger.trace("Found open channel for peer: " + peerName); - } - return ch; - } - } - - public SocketChannel connectToPeer(long hostId, SocketChannel prevCh) { - String peerName = _clusterMgr.getPeerName(hostId); - if (peerName == null) { - return null; - } - - return connectToPeer(peerName, prevCh); - } - - @Override - protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableException { - assert (hostId != null) : "Who didn't check their id value?"; - HostVO host = _hostDao.findById(hostId); - if (host == null) { - throw new AgentUnavailableException("Can't find the host ", hostId); - } - - AgentAttache agent = findAttache(hostId); - if (agent == null) { - if (host.getStatus() == Status.Up && (host.getManagementServerId() != null && host.getManagementServerId() != _nodeId)) { - agent = createAttache(hostId); - } - } - if (agent == null) { - AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId); - ex.addProxyObject(host, hostId, "hostId"); - throw ex; - } - - return agent; - } - - @Override - public boolean stop() { - if (_peers != null) { - for (SocketChannel ch : _peers.values()) { - try { - s_logger.info("Closing: " + ch.toString()); - ch.close(); - } catch (IOException e) { - } - } - } - _timer.cancel(); - - //cancel all transfer tasks - s_transferExecutor.shutdownNow(); - cleanupTransferMap(_nodeId); - - return super.stop(); - } - - @Override - public void startDirectlyConnectedHosts() { - // override and let it be dummy for purpose, we will scan and load direct agents periodically. - // We may also pickup agents that have been left over from other crashed management server - } - - public class ClusteredAgentHandler extends AgentHandler { - - public ClusteredAgentHandler(Task.Type type, Link link, byte[] data) { - super(type, link, data); - } - - @Override - protected void doTask(final Task task) throws Exception { - Transaction txn = Transaction.open(Transaction.CLOUD_DB); - try { - if (task.getType() != Task.Type.DATA) { - super.doTask(task); - return; - } - - final byte[] data = task.getData(); - Version ver = Request.getVersion(data); - if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) { - s_logger.warn("Wrong version for clustered agent request"); - super.doTask(task); - return; - } - - long hostId = Request.getAgentId(data); - Link link = task.getLink(); - - if (Request.fromServer(data)) { - - AgentAttache agent = findAttache(hostId); - - if (Request.isControl(data)) { - if (agent == null) { - logD(data, "No attache to process cancellation"); - return; - } - Request req = Request.parse(data); - Command[] cmds = req.getCommands(); - CancelCommand cancel = (CancelCommand) cmds[0]; - if (s_logger.isDebugEnabled()) { - logD(data, "Cancel request received"); - } - agent.cancel(cancel.getSequence()); - return; - } - - try { - if (agent == null || agent.isClosed()) { - throw new AgentUnavailableException("Unable to route to agent ", hostId); - } - - if (Request.isRequest(data) && Request.requiresSequentialExecution(data)) { - // route it to the agent. - // But we have the serialize the control commands here so we have - // to deserialize this and send it through the agent attache. - Request req = Request.parse(data); - agent.send(req, null); - return; - } else { - if (agent instanceof Routable) { - Routable cluster = (Routable) agent; - cluster.routeToAgent(data); - } else { - agent.send(Request.parse(data)); - } - return; - } - } catch (AgentUnavailableException e) { - logD(data, e.getMessage()); - cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage()); - } - } else { - - long mgmtId = Request.getManagementServerId(data); - if (mgmtId != -1 && mgmtId != _nodeId) { - routeToPeer(Long.toString(mgmtId), data); - if (Request.requiresSequentialExecution(data)) { - AgentAttache attache = (AgentAttache) link.attachment(); - if (attache != null) { - attache.sendNext(Request.getSequence(data)); - } else if (s_logger.isDebugEnabled()) { - logD(data, "No attache to process " + Request.parse(data).toString()); - } - } - return; - } else { - if (Request.isRequest(data)) { - super.doTask(task); - } else { - // received an answer. - final Response response = Response.parse(data); - AgentAttache attache = findAttache(response.getAgentId()); - if (attache == null) { - s_logger.info("SeqA " + response.getAgentId() + "-" + response.getSequence() + "Unable to find attache to forward " + response.toString()); - return; - } - if (!attache.processAnswers(response.getSequence(), response)) { - s_logger.info("SeqA " + attache.getId() + "-" + response.getSequence() + ": Response is not processed: " + response.toString()); - } - } - return; - } - } - } finally { - txn.close(); - } - } - } - - @Override - public void onManagementNodeJoined(List nodeList, long selfNodeId) { - } - - @Override - public void onManagementNodeLeft(List nodeList, long selfNodeId) { - for (ManagementServerHostVO vo : nodeList) { - s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid()); - long lastPing = (System.currentTimeMillis() >> 10) - _pingTimeout; - _hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing); - s_logger.info("Deleting entries from op_host_transfer table for Management server " + vo.getMsid()); - cleanupTransferMap(vo.getMsid()); - } - } - - @Override - public void onManagementNodeIsolated() { - } - - @Override - public void removeAgent(AgentAttache attache, Status nextState) { - if (attache == null) { - return; - } - - super.removeAgent(attache, nextState); - } - - @Override - public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException { - boolean result = false; - if (event == Event.RequestAgentRebalance) { - return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId); - } else if (event == Event.StartAgentRebalance) { - try { - result = rebalanceHost(agentId, currentOwnerId, futureOwnerId); - } catch (Exception e) { - s_logger.warn("Unable to rebalance host id=" + agentId, e); - } - } - return result; - } - - @Override - public void scheduleRebalanceAgents() { - _timer.schedule(new AgentLoadBalancerTask(), 30000); - } - - public class AgentLoadBalancerTask extends TimerTask { - protected volatile boolean cancelled = false; - - public AgentLoadBalancerTask() { - s_logger.debug("Agent load balancer task created"); - } - - @Override - public synchronized boolean cancel() { - if (!cancelled) { - cancelled = true; - s_logger.debug("Agent load balancer task cancelled"); - return super.cancel(); - } - return true; - } - - @Override - public synchronized void run() { - try { - if (!cancelled) { - startRebalanceAgents(); - if (s_logger.isInfoEnabled()) { - s_logger.info("The agent load balancer task is now being cancelled"); - } - cancelled = true; - } - } catch(Throwable e) { - s_logger.error("Unexpected exception " + e.toString(), e); - } - } - } - - public void startRebalanceAgents() { - s_logger.debug("Management server " + _nodeId + " is asking other peers to rebalance their agents"); - List allMS = _mshostDao.listBy(ManagementServerHost.State.Up); - SearchCriteriaService sc = SearchCriteria2.create(HostVO.class); - sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL); - sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing); - List allManagedAgents = sc.list(); - - int avLoad = 0; - - if (!allManagedAgents.isEmpty() && !allMS.isEmpty()) { - avLoad = allManagedAgents.size() / allMS.size(); - } else { - if (s_logger.isDebugEnabled()) { - s_logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is " + allMS.size() + "; number of managed agents is " + allManagedAgents.size()); - } - return; - } - - if (avLoad == 0L) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("As calculated average load is less than 1, rounding it to 1"); - } - avLoad = 1; - } - - for (ManagementServerHostVO node : allMS) { - if (node.getMsid() != _nodeId) { - - List hostsToRebalance = new ArrayList(); - for (AgentLoadBalancerPlanner lbPlanner : _lbPlanners) { - hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad); - if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { - break; - } else { - s_logger.debug("Agent load balancer planner " + lbPlanner.getName() + " found no hosts to be rebalanced from management server " + node.getMsid()); - } - } - - - if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { - s_logger.debug("Found " + hostsToRebalance.size() + " hosts to rebalance from management server " + node.getMsid()); - for (HostVO host : hostsToRebalance) { - long hostId = host.getId(); - s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId); - boolean result = true; - - if (_hostTransferDao.findById(hostId) != null) { - s_logger.warn("Somebody else is already rebalancing host id: " + hostId); - continue; - } - - HostTransferMapVO transfer = null; - try { - transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId); - Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance); - if (answer == null) { - s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid()); - result = false; - } - } catch (Exception ex) { - s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex); - result = false; - } finally { - if (transfer != null) { - HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId); - if (!result && transferState != null && transferState.getState() == HostTransferState.TransferRequested) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode"); - } - //just remove the mapping (if exists) as nothing was done on the peer management server yet - _hostTransferDao.remove(transfer.getId()); - } - } - } - } - } else { - s_logger.debug("Found no hosts to rebalance from the management server " + node.getMsid()); - } - } - } - } - - private Answer[] sendRebalanceCommand(long peer, long agentId, long currentOwnerId, long futureOwnerId, Event event) { - TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event); - Commands commands = new Commands(OnError.Stop); - commands.addCommand(transfer); - - Command[] cmds = commands.toCommands(); - - try { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer); - } - String peerName = Long.toString(peer); - Answer[] answers = _clusterMgr.execute(peerName, agentId, cmds, true); - return answers; - } catch (Exception e) { - s_logger.warn("Caught exception while talking to " + currentOwnerId, e); - return null; - } - } - - private Runnable getTransferScanTask() { - return new Runnable() { - @Override - public void run() { - try { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Clustered agent transfer scan check, management server id:" + _nodeId); - } - synchronized (_agentToTransferIds) { - if (_agentToTransferIds.size() > 0) { - s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer"); - //for (Long hostId : _agentToTransferIds) { - for (Iterator iterator = _agentToTransferIds.iterator(); iterator.hasNext();) { - Long hostId = iterator.next(); - AgentAttache attache = findAttache(hostId); - - // if the thread: - // 1) timed out waiting for the host to reconnect - // 2) recipient management server is not active any more - // 3) if the management server doesn't own the host any more - // remove the host from re-balance list and delete from op_host_transfer DB - // no need to do anything with the real attache as we haven't modified it yet - Date cutTime = DateUtil.currentGMTTime(); - HostTransferMapVO transferMap = _hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut)); - - if (transferMap == null) { - s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host"); - iterator.remove(); - _hostTransferDao.completeAgentTransfer(hostId); - continue; - } - - if (transferMap.getInitialOwner() != _nodeId || attache.forForward()) { - s_logger.debug("Management server " + _nodeId + " doesn't own host id=" + hostId + " any more, skipping rebalance for the host"); - iterator.remove(); - _hostTransferDao.completeAgentTransfer(hostId); - continue; - } - - ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner()); - if (ms != null && ms.getState() != ManagementServerHost.State.Up) { - s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host"); - iterator.remove(); - _hostTransferDao.completeAgentTransfer(hostId); - continue; - } - - if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { - iterator.remove(); - try { - _executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner())); - } catch (RejectedExecutionException ex) { - s_logger.warn("Failed to submit rebalance task for host id=" + hostId + "; postponing the execution"); - continue; - } - - } else { - s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize()); - } - } - } else { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Found no agents to be transfered by the management server " + _nodeId); - } - } - } - - } catch (Throwable e) { - s_logger.error("Problem with the clustered agent transfer scan check!", e); - } - } - }; - } - - - private boolean setToWaitForRebalance(final long hostId, long currentOwnerId, long futureOwnerId) { - s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer"); - synchronized (_agentToTransferIds) { - return _agentToTransferIds.add(hostId); - } - } - - - protected boolean rebalanceHost(final long hostId, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException{ - - boolean result = true; - if (currentOwnerId == _nodeId) { - if (!startRebalance(hostId)) { - s_logger.debug("Failed to start agent rebalancing"); - finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed); - return false; - } - try { - Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance); - if (answer == null || !answer[0].getResult()) { - result = false; - } + if (s_logger.isDebugEnabled()) { + s_logger.debug("Connection to peer opened: " + peerName + ", ip: " + ip); + } + _peers.put(peerName, ch); + _sslEngines.put(peerName, sslEngine); + } catch (IOException e) { + s_logger.warn("Unable to connect to peer management server: " + peerName + ", ip: " + ip + " due to " + e.getMessage(), e); + return null; + } + } - } catch (Exception ex) { - s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process", ex); - result = false; - } - - if (result) { - s_logger.debug("Successfully transfered host id=" + hostId + " to management server " + futureOwnerId); - finishRebalance(hostId, futureOwnerId, Event.RebalanceCompleted); - } else { - s_logger.warn("Failed to transfer host id=" + hostId + " to management server " + futureOwnerId); - finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed); - } - - } else if (futureOwnerId == _nodeId) { - HostVO host = _hostDao.findById(hostId); - try { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process"); - } - result = loadDirectlyConnectedHost(host, true); - } catch (Exception ex) { - s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process due to:", ex); - result = false; - } - - if (result) { - s_logger.debug("Successfully loaded directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process"); - } else { - s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process"); - } - } - - return result; - } - - - protected void finishRebalance(final long hostId, long futureOwnerId, Event event){ - - boolean success = (event == Event.RebalanceCompleted) ? true : false; - if (s_logger.isDebugEnabled()) { - s_logger.debug("Finishing rebalancing for the agent " + hostId + " with event " + event); - } - - AgentAttache attache = findAttache(hostId); - if (attache == null || !(attache instanceof ClusteredAgentAttache)) { - s_logger.debug("Unable to find forward attache for the host id=" + hostId + ", assuming that the agent disconnected already"); - _hostTransferDao.completeAgentTransfer(hostId); - return; - } - - ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache; - - if (success) { - - //1) Set transfer mode to false - so the agent can start processing requests normally - forwardAttache.setTransferMode(false); - - //2) Get all transfer requests and route them to peer - Request requestToTransfer = forwardAttache.getRequestToTransfer(); - while (requestToTransfer != null) { - s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId); - boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes()); - if (!routeResult) { - logD(requestToTransfer.getBytes(), "Failed to route request to peer"); - } - - requestToTransfer = forwardAttache.getRequestToTransfer(); - } - - s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance to " + futureOwnerId); - - } else { - failRebalance(hostId); - } - - s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance"); - _hostTransferDao.completeAgentTransfer(hostId); - } - - protected void failRebalance(final long hostId){ - try { - s_logger.debug("Management server " + _nodeId + " failed to rebalance agent " + hostId); - _hostTransferDao.completeAgentTransfer(hostId); - handleDisconnectWithoutInvestigation(findAttache(hostId), Event.RebalanceFailed); - } catch (Exception ex) { - s_logger.warn("Failed to reconnect host id=" + hostId + " as a part of failed rebalance task cleanup"); - } - } - - protected boolean startRebalance(final long hostId) { - HostVO host = _hostDao.findById(hostId); - - if (host == null || host.getRemoved() != null) { - s_logger.warn("Unable to find host record, fail start rebalancing process"); - return false; - } - - synchronized (_agents) { - ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId); - if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { - handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance); - ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId); - if (forwardAttache == null) { - s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process"); - return false; - } - s_logger.debug("Putting agent id=" + hostId + " to transfer mode"); - forwardAttache.setTransferMode(true); - _agents.put(hostId, forwardAttache); - } else { - if (attache == null) { - s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing"); - } else { - s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " + attache.getNonRecurringListenersSize() + ", can't start host rebalancing"); - } - return false; - } + if (s_logger.isTraceEnabled()) { + s_logger.trace("Found open channel for peer: " + peerName); + } + return ch; } - _hostTransferDao.startAgentTransfer(hostId); - return true; - } - - protected void cleanupTransferMap(long msId) { - List hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId); - - for (HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) { - _hostTransferDao.remove(hostJoingingCluster.getId()); - } - - List hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId); - for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) { - _hostTransferDao.remove(hostLeavingCluster.getId()); - } - } - - - protected class RebalanceTask implements Runnable { - Long hostId = null; - Long currentOwnerId = null; - Long futureOwnerId = null; - - - public RebalanceTask(long hostId, long currentOwnerId, long futureOwnerId) { - this.hostId = hostId; - this.currentOwnerId = currentOwnerId; - this.futureOwnerId = futureOwnerId; - } - - @Override - public void run() { - try { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Rebalancing host id=" + hostId); - } - rebalanceHost(hostId, currentOwnerId, futureOwnerId); - } catch (Exception e) { - s_logger.warn("Unable to rebalance host id=" + hostId, e); - } finally { - StackMaid.current().exitCleanup(); - } - } - } - -} + } + + public SocketChannel connectToPeer(long hostId, SocketChannel prevCh) { + String peerName = _clusterMgr.getPeerName(hostId); + if (peerName == null) { + return null; + } + + return connectToPeer(peerName, prevCh); + } + + @Override + protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableException { + assert (hostId != null) : "Who didn't check their id value?"; + HostVO host = _hostDao.findById(hostId); + if (host == null) { + throw new AgentUnavailableException("Can't find the host ", hostId); + } + + AgentAttache agent = findAttache(hostId); + if (agent == null) { + if (host.getStatus() == Status.Up && (host.getManagementServerId() != null && host.getManagementServerId() != _nodeId)) { + agent = createAttache(hostId); + } + } + if (agent == null) { + AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId); + ex.addProxyObject(host, hostId, "hostId"); + throw ex; + } + + return agent; + } + + @Override + public boolean stop() { + if (_peers != null) { + for (SocketChannel ch : _peers.values()) { + try { + s_logger.info("Closing: " + ch.toString()); + ch.close(); + } catch (IOException e) { + } + } + } + _timer.cancel(); + + //cancel all transfer tasks + s_transferExecutor.shutdownNow(); + cleanupTransferMap(_nodeId); + + return super.stop(); + } + + @Override + public void startDirectlyConnectedHosts() { + // override and let it be dummy for purpose, we will scan and load direct agents periodically. + // We may also pickup agents that have been left over from other crashed management server + } + + public class ClusteredAgentHandler extends AgentHandler { + + public ClusteredAgentHandler(Task.Type type, Link link, byte[] data) { + super(type, link, data); + } + + @Override + protected void doTask(final Task task) throws Exception { + Transaction txn = Transaction.open(Transaction.CLOUD_DB); + try { + if (task.getType() != Task.Type.DATA) { + super.doTask(task); + return; + } + + final byte[] data = task.getData(); + Version ver = Request.getVersion(data); + if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) { + s_logger.warn("Wrong version for clustered agent request"); + super.doTask(task); + return; + } + + long hostId = Request.getAgentId(data); + Link link = task.getLink(); + + if (Request.fromServer(data)) { + + AgentAttache agent = findAttache(hostId); + + if (Request.isControl(data)) { + if (agent == null) { + logD(data, "No attache to process cancellation"); + return; + } + Request req = Request.parse(data); + Command[] cmds = req.getCommands(); + CancelCommand cancel = (CancelCommand) cmds[0]; + if (s_logger.isDebugEnabled()) { + logD(data, "Cancel request received"); + } + agent.cancel(cancel.getSequence()); + return; + } + + try { + if (agent == null || agent.isClosed()) { + throw new AgentUnavailableException("Unable to route to agent ", hostId); + } + + if (Request.isRequest(data) && Request.requiresSequentialExecution(data)) { + // route it to the agent. + // But we have the serialize the control commands here so we have + // to deserialize this and send it through the agent attache. + Request req = Request.parse(data); + agent.send(req, null); + return; + } else { + if (agent instanceof Routable) { + Routable cluster = (Routable) agent; + cluster.routeToAgent(data); + } else { + agent.send(Request.parse(data)); + } + return; + } + } catch (AgentUnavailableException e) { + logD(data, e.getMessage()); + cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage()); + } + } else { + + long mgmtId = Request.getManagementServerId(data); + if (mgmtId != -1 && mgmtId != _nodeId) { + routeToPeer(Long.toString(mgmtId), data); + if (Request.requiresSequentialExecution(data)) { + AgentAttache attache = (AgentAttache) link.attachment(); + if (attache != null) { + attache.sendNext(Request.getSequence(data)); + } else if (s_logger.isDebugEnabled()) { + logD(data, "No attache to process " + Request.parse(data).toString()); + } + } + return; + } else { + if (Request.isRequest(data)) { + super.doTask(task); + } else { + // received an answer. + final Response response = Response.parse(data); + AgentAttache attache = findAttache(response.getAgentId()); + if (attache == null) { + s_logger.info("SeqA " + response.getAgentId() + "-" + response.getSequence() + "Unable to find attache to forward " + response.toString()); + return; + } + if (!attache.processAnswers(response.getSequence(), response)) { + s_logger.info("SeqA " + attache.getId() + "-" + response.getSequence() + ": Response is not processed: " + response.toString()); + } + } + return; + } + } + } finally { + txn.close(); + } + } + } + + @Override + public void onManagementNodeJoined(List nodeList, long selfNodeId) { + } + + @Override + public void onManagementNodeLeft(List nodeList, long selfNodeId) { + for (ManagementServerHostVO vo : nodeList) { + s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid()); + long lastPing = (System.currentTimeMillis() >> 10) - _pingTimeout; + _hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing); + s_logger.info("Deleting entries from op_host_transfer table for Management server " + vo.getMsid()); + cleanupTransferMap(vo.getMsid()); + } + } + + @Override + public void onManagementNodeIsolated() { + } + + @Override + public void removeAgent(AgentAttache attache, Status nextState) { + if (attache == null) { + return; + } + + super.removeAgent(attache, nextState); + } + + @Override + public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException { + boolean result = false; + if (event == Event.RequestAgentRebalance) { + return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId); + } else if (event == Event.StartAgentRebalance) { + try { + result = rebalanceHost(agentId, currentOwnerId, futureOwnerId); + } catch (Exception e) { + s_logger.warn("Unable to rebalance host id=" + agentId, e); + } + } + return result; + } + + @Override + public void scheduleRebalanceAgents() { + _timer.schedule(new AgentLoadBalancerTask(), 30000); + } + + public class AgentLoadBalancerTask extends TimerTask { + protected volatile boolean cancelled = false; + + public AgentLoadBalancerTask() { + s_logger.debug("Agent load balancer task created"); + } + + @Override + public synchronized boolean cancel() { + if (!cancelled) { + cancelled = true; + s_logger.debug("Agent load balancer task cancelled"); + return super.cancel(); + } + return true; + } + + @Override + public synchronized void run() { + try { + if (!cancelled) { + startRebalanceAgents(); + if (s_logger.isInfoEnabled()) { + s_logger.info("The agent load balancer task is now being cancelled"); + } + cancelled = true; + } + } catch(Throwable e) { + s_logger.error("Unexpected exception " + e.toString(), e); + } + } + } + + public void startRebalanceAgents() { + s_logger.debug("Management server " + _nodeId + " is asking other peers to rebalance their agents"); + List allMS = _mshostDao.listBy(ManagementServerHost.State.Up); + SearchCriteriaService sc = SearchCriteria2.create(HostVO.class); + sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL); + sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing); + List allManagedAgents = sc.list(); + + int avLoad = 0; + + if (!allManagedAgents.isEmpty() && !allMS.isEmpty()) { + avLoad = allManagedAgents.size() / allMS.size(); + } else { + if (s_logger.isDebugEnabled()) { + s_logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is " + allMS.size() + "; number of managed agents is " + allManagedAgents.size()); + } + return; + } + + if (avLoad == 0L) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("As calculated average load is less than 1, rounding it to 1"); + } + avLoad = 1; + } + + for (ManagementServerHostVO node : allMS) { + if (node.getMsid() != _nodeId) { + + List hostsToRebalance = new ArrayList(); + for (AgentLoadBalancerPlanner lbPlanner : _lbPlanners) { + hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad); + if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { + break; + } else { + s_logger.debug("Agent load balancer planner " + lbPlanner.getName() + " found no hosts to be rebalanced from management server " + node.getMsid()); + } + } + + + if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { + s_logger.debug("Found " + hostsToRebalance.size() + " hosts to rebalance from management server " + node.getMsid()); + for (HostVO host : hostsToRebalance) { + long hostId = host.getId(); + s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId); + boolean result = true; + + if (_hostTransferDao.findById(hostId) != null) { + s_logger.warn("Somebody else is already rebalancing host id: " + hostId); + continue; + } + + HostTransferMapVO transfer = null; + try { + transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId); + Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance); + if (answer == null) { + s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid()); + result = false; + } + } catch (Exception ex) { + s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex); + result = false; + } finally { + if (transfer != null) { + HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId); + if (!result && transferState != null && transferState.getState() == HostTransferState.TransferRequested) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode"); + } + //just remove the mapping (if exists) as nothing was done on the peer management server yet + _hostTransferDao.remove(transfer.getId()); + } + } + } + } + } else { + s_logger.debug("Found no hosts to rebalance from the management server " + node.getMsid()); + } + } + } + } + + private Answer[] sendRebalanceCommand(long peer, long agentId, long currentOwnerId, long futureOwnerId, Event event) { + TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event); + Commands commands = new Commands(OnError.Stop); + commands.addCommand(transfer); + + Command[] cmds = commands.toCommands(); + + try { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer); + } + String peerName = Long.toString(peer); + Answer[] answers = _clusterMgr.execute(peerName, agentId, cmds, true); + return answers; + } catch (Exception e) { + s_logger.warn("Caught exception while talking to " + currentOwnerId, e); + return null; + } + } + + private Runnable getTransferScanTask() { + return new Runnable() { + @Override + public void run() { + try { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Clustered agent transfer scan check, management server id:" + _nodeId); + } + synchronized (_agentToTransferIds) { + if (_agentToTransferIds.size() > 0) { + s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer"); + //for (Long hostId : _agentToTransferIds) { + for (Iterator iterator = _agentToTransferIds.iterator(); iterator.hasNext();) { + Long hostId = iterator.next(); + AgentAttache attache = findAttache(hostId); + + // if the thread: + // 1) timed out waiting for the host to reconnect + // 2) recipient management server is not active any more + // 3) if the management server doesn't own the host any more + // remove the host from re-balance list and delete from op_host_transfer DB + // no need to do anything with the real attache as we haven't modified it yet + Date cutTime = DateUtil.currentGMTTime(); + HostTransferMapVO transferMap = _hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut)); + + if (transferMap == null) { + s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host"); + iterator.remove(); + _hostTransferDao.completeAgentTransfer(hostId); + continue; + } + + if (transferMap.getInitialOwner() != _nodeId || attache.forForward()) { + s_logger.debug("Management server " + _nodeId + " doesn't own host id=" + hostId + " any more, skipping rebalance for the host"); + iterator.remove(); + _hostTransferDao.completeAgentTransfer(hostId); + continue; + } + + ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner()); + if (ms != null && ms.getState() != ManagementServerHost.State.Up) { + s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host"); + iterator.remove(); + _hostTransferDao.completeAgentTransfer(hostId); + continue; + } + + if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { + iterator.remove(); + try { + _executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner())); + } catch (RejectedExecutionException ex) { + s_logger.warn("Failed to submit rebalance task for host id=" + hostId + "; postponing the execution"); + continue; + } + + } else { + s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize()); + } + } + } else { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Found no agents to be transfered by the management server " + _nodeId); + } + } + } + + } catch (Throwable e) { + s_logger.error("Problem with the clustered agent transfer scan check!", e); + } + } + }; + } + + + private boolean setToWaitForRebalance(final long hostId, long currentOwnerId, long futureOwnerId) { + s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer"); + synchronized (_agentToTransferIds) { + return _agentToTransferIds.add(hostId); + } + } + + + protected boolean rebalanceHost(final long hostId, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException{ + + boolean result = true; + if (currentOwnerId == _nodeId) { + if (!startRebalance(hostId)) { + s_logger.debug("Failed to start agent rebalancing"); + finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed); + return false; + } + try { + Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance); + if (answer == null || !answer[0].getResult()) { + result = false; + } + + } catch (Exception ex) { + s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process", ex); + result = false; + } + + if (result) { + s_logger.debug("Successfully transfered host id=" + hostId + " to management server " + futureOwnerId); + finishRebalance(hostId, futureOwnerId, Event.RebalanceCompleted); + } else { + s_logger.warn("Failed to transfer host id=" + hostId + " to management server " + futureOwnerId); + finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed); + } + + } else if (futureOwnerId == _nodeId) { + HostVO host = _hostDao.findById(hostId); + try { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process"); + } + result = loadDirectlyConnectedHost(host, true); + } catch (Exception ex) { + s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process due to:", ex); + result = false; + } + + if (result) { + s_logger.debug("Successfully loaded directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process"); + } else { + s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process"); + } + } + + return result; + } + + + protected void finishRebalance(final long hostId, long futureOwnerId, Event event){ + + boolean success = (event == Event.RebalanceCompleted) ? true : false; + if (s_logger.isDebugEnabled()) { + s_logger.debug("Finishing rebalancing for the agent " + hostId + " with event " + event); + } + + AgentAttache attache = findAttache(hostId); + if (attache == null || !(attache instanceof ClusteredAgentAttache)) { + s_logger.debug("Unable to find forward attache for the host id=" + hostId + ", assuming that the agent disconnected already"); + _hostTransferDao.completeAgentTransfer(hostId); + return; + } + + ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache; + + if (success) { + + //1) Set transfer mode to false - so the agent can start processing requests normally + forwardAttache.setTransferMode(false); + + //2) Get all transfer requests and route them to peer + Request requestToTransfer = forwardAttache.getRequestToTransfer(); + while (requestToTransfer != null) { + s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId); + boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes()); + if (!routeResult) { + logD(requestToTransfer.getBytes(), "Failed to route request to peer"); + } + + requestToTransfer = forwardAttache.getRequestToTransfer(); + } + + s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance to " + futureOwnerId); + + } else { + failRebalance(hostId); + } + + s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance"); + _hostTransferDao.completeAgentTransfer(hostId); + } + + protected void failRebalance(final long hostId){ + try { + s_logger.debug("Management server " + _nodeId + " failed to rebalance agent " + hostId); + _hostTransferDao.completeAgentTransfer(hostId); + handleDisconnectWithoutInvestigation(findAttache(hostId), Event.RebalanceFailed); + } catch (Exception ex) { + s_logger.warn("Failed to reconnect host id=" + hostId + " as a part of failed rebalance task cleanup"); + } + } + + protected boolean startRebalance(final long hostId) { + HostVO host = _hostDao.findById(hostId); + + if (host == null || host.getRemoved() != null) { + s_logger.warn("Unable to find host record, fail start rebalancing process"); + return false; + } + + synchronized (_agents) { + ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId); + if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { + handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance); + ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId); + if (forwardAttache == null) { + s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process"); + return false; + } + s_logger.debug("Putting agent id=" + hostId + " to transfer mode"); + forwardAttache.setTransferMode(true); + _agents.put(hostId, forwardAttache); + } else { + if (attache == null) { + s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing"); + } else { + s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " + attache.getNonRecurringListenersSize() + ", can't start host rebalancing"); + } + return false; + } + } + _hostTransferDao.startAgentTransfer(hostId); + return true; + } + + protected void cleanupTransferMap(long msId) { + List hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId); + + for (HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) { + _hostTransferDao.remove(hostJoingingCluster.getId()); + } + + List hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId); + for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) { + _hostTransferDao.remove(hostLeavingCluster.getId()); + } + } + + + protected class RebalanceTask implements Runnable { + Long hostId = null; + Long currentOwnerId = null; + Long futureOwnerId = null; + + + public RebalanceTask(long hostId, long currentOwnerId, long futureOwnerId) { + this.hostId = hostId; + this.currentOwnerId = currentOwnerId; + this.futureOwnerId = futureOwnerId; + } + + @Override + public void run() { + try { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Rebalancing host id=" + hostId); + } + rebalanceHost(hostId, currentOwnerId, futureOwnerId); + } catch (Exception e) { + s_logger.warn("Unable to rebalance host id=" + hostId, e); + } finally { + StackMaid.current().exitCleanup(); + } + } + } + +}