mirror of https://github.com/apache/cloudstack.git
bug 9127: covered failure scenarios for agent LB.
status 9127: resolved fixed The feature is completed; please file separate bugs if any issue arises during the testing. Wiki link describing how agentLB works: http://intranet.lab.vmops.com/engineering/release-2.2-features/agent-load-balancing
This commit is contained in:
parent
019cc78976
commit
14cdc7de14
|
|
@ -22,13 +22,15 @@ import com.cloud.host.Status.Event;
|
|||
public class TransferAgentCommand extends Command {
|
||||
protected long agentId;
|
||||
protected long futureOwner;
|
||||
protected long currentOwner;
|
||||
Event event;
|
||||
|
||||
protected TransferAgentCommand() {
|
||||
}
|
||||
|
||||
public TransferAgentCommand(long agentId, long futureOwner, Event event) {
|
||||
public TransferAgentCommand(long agentId, long currentOwner, long futureOwner, Event event) {
|
||||
this.agentId = agentId;
|
||||
this.currentOwner = currentOwner;
|
||||
this.futureOwner = futureOwner;
|
||||
this.event = event;
|
||||
}
|
||||
|
|
@ -45,6 +47,10 @@ public class TransferAgentCommand extends Command {
|
|||
return event;
|
||||
}
|
||||
|
||||
public long getCurrentOwner() {
|
||||
return currentOwner;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean executeInSequence() {
|
||||
return false;
|
||||
|
|
|
|||
|
|
@ -183,7 +183,7 @@ public enum Status {
|
|||
s_fsm.addTransition(Status.Alert, Event.Ping, Status.Up);
|
||||
s_fsm.addTransition(Status.Alert, Event.Remove, Status.Removed);
|
||||
s_fsm.addTransition(Status.Alert, Event.ManagementServerDown, Status.Alert);
|
||||
s_fsm.addTransition(Status.Rebalancing, Event.RebalanceFailed, Status.Alert);
|
||||
s_fsm.addTransition(Status.Rebalancing, Event.RebalanceFailed, Status.Disconnected);
|
||||
s_fsm.addTransition(Status.Rebalancing, Event.RebalanceCompleted, Status.Connecting);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1136,12 +1136,12 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
|
|||
public void startDirectlyConnectedHosts() {
|
||||
List<HostVO> hosts = _hostDao.findDirectlyConnectedHosts();
|
||||
for (HostVO host : hosts) {
|
||||
loadDirectlyConnectedHost(host);
|
||||
loadDirectlyConnectedHost(host, false);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
protected void loadDirectlyConnectedHost(HostVO host) {
|
||||
protected boolean loadDirectlyConnectedHost(HostVO host, boolean executeNow) {
|
||||
String resourceName = host.getResource();
|
||||
ServerResource resource = null;
|
||||
try {
|
||||
|
|
@ -1150,25 +1150,25 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
|
|||
resource = (ServerResource) constructor.newInstance();
|
||||
} catch (ClassNotFoundException e) {
|
||||
s_logger.warn("Unable to find class " + host.getResource(), e);
|
||||
return;
|
||||
return false;
|
||||
} catch (InstantiationException e) {
|
||||
s_logger.warn("Unablet to instantiate class " + host.getResource(), e);
|
||||
return;
|
||||
return false;
|
||||
} catch (IllegalAccessException e) {
|
||||
s_logger.warn("Illegal access " + host.getResource(), e);
|
||||
return;
|
||||
return false;
|
||||
} catch (SecurityException e) {
|
||||
s_logger.warn("Security error on " + host.getResource(), e);
|
||||
return;
|
||||
return false;
|
||||
} catch (NoSuchMethodException e) {
|
||||
s_logger.warn("NoSuchMethodException error on " + host.getResource(), e);
|
||||
return;
|
||||
return false;
|
||||
} catch (IllegalArgumentException e) {
|
||||
s_logger.warn("IllegalArgumentException error on " + host.getResource(), e);
|
||||
return;
|
||||
return false;
|
||||
} catch (InvocationTargetException e) {
|
||||
s_logger.warn("InvocationTargetException error on " + host.getResource(), e);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
_hostDao.loadDetails(host);
|
||||
|
|
@ -1204,14 +1204,25 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
|
|||
} catch (ConfigurationException e) {
|
||||
e.printStackTrace();
|
||||
s_logger.warn("Unable to configure resource due to ", e);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!resource.start()) {
|
||||
s_logger.warn("Unable to start the resource");
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (executeNow) {
|
||||
AgentAttache attache = simulateStart(host.getId(), resource, host.getDetails(), false, null, null);
|
||||
if (attache == null) {
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
_executor.execute(new SimulateStartTask(host.getId(), resource, host.getDetails(), null));
|
||||
return true;
|
||||
}
|
||||
_executor.execute(new SimulateStartTask(host.getId(), resource, host.getDetails(), null));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import java.util.ArrayList;
|
|||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
|
@ -85,7 +86,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
|
||||
public long _loadSize = 100;
|
||||
protected Set<Long> _agentToTransferIds = new HashSet<Long>();
|
||||
private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
|
||||
|
||||
private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
|
||||
|
||||
@Inject
|
||||
protected ClusterManager _clusterMgr = null;
|
||||
|
|
@ -194,7 +196,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ")");
|
||||
}
|
||||
loadDirectlyConnectedHost(host);
|
||||
loadDirectlyConnectedHost(host, false);
|
||||
} catch (Throwable e) {
|
||||
s_logger.debug(" can not load directly connected host " + host.getId() + "(" + host.getName() + ") due to " + e.toString());
|
||||
}
|
||||
|
|
@ -569,7 +571,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
}
|
||||
_timer.cancel();
|
||||
|
||||
//cancel all transfer tasks
|
||||
s_transferExecutor.shutdownNow();
|
||||
cleanupTransferMap();
|
||||
|
||||
return super.stop();
|
||||
}
|
||||
|
||||
|
|
@ -713,12 +719,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean executeRebalanceRequest(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException {
|
||||
public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException {
|
||||
if (event == Event.RequestAgentRebalance) {
|
||||
return setToWaitForRebalance(agentId);
|
||||
} else if (event == Event.StartAgentRebalance) {
|
||||
return rebalanceHost(agentId);
|
||||
}
|
||||
return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId);
|
||||
} else if (event == Event.StartAgentRebalance) {
|
||||
return rebalanceHost(agentId, currentOwnerId, futureOwnerId);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
@ -765,9 +771,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
long hostId = host.getId();
|
||||
s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId);
|
||||
boolean result = true;
|
||||
|
||||
if (_hostTransferDao.findById(hostId) != null) {
|
||||
s_logger.warn("Somebody else is already rebalancing host id: " + hostId);
|
||||
continue;
|
||||
}
|
||||
|
||||
HostTransferMapVO transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId);
|
||||
try {
|
||||
Answer[] answer = sendRebalanceCommand(hostId, node.getMsid(), Event.RequestAgentRebalance);
|
||||
Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance);
|
||||
if (answer == null) {
|
||||
s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid());
|
||||
result = false;
|
||||
|
|
@ -776,8 +788,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex);
|
||||
result = false;
|
||||
} finally {
|
||||
HostTransferMapVO updatedTransfer = _hostTransferDao.findById(transfer.getId());
|
||||
if (!result && updatedTransfer.getState() == HostTransferState.TransferRequested) {
|
||||
HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId);
|
||||
if (!result && transferState != null && transferState.getState() == HostTransferState.TransferRequested) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode");
|
||||
}
|
||||
|
|
@ -793,23 +805,22 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
}
|
||||
|
||||
private Answer[] sendRebalanceCommand(long agentId, long peer, Event event) {
|
||||
TransferAgentCommand transfer = new TransferAgentCommand(agentId, peer, event);
|
||||
private Answer[] sendRebalanceCommand(long peer, long agentId, long currentOwnerId, long futureOwnerId, Event event) {
|
||||
TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event);
|
||||
Commands commands = new Commands(OnError.Stop);
|
||||
commands.addCommand(transfer);
|
||||
|
||||
Command[] cmds = commands.toCommands();
|
||||
|
||||
String peerName = Long.toString(peer);
|
||||
|
||||
try {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer);
|
||||
}
|
||||
String peerName = Long.toString(peer);
|
||||
Answer[] answers = _clusterMgr.execute(peerName, agentId, cmds, true);
|
||||
return answers;
|
||||
} catch (Exception e) {
|
||||
s_logger.warn("Caught exception while talking to " + peer, e);
|
||||
s_logger.warn("Caught exception while talking to " + currentOwnerId, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -819,45 +830,52 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// TODO - change to trace level later on
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Clustered agent transfer scan check, management server id:" + _nodeId);
|
||||
if (s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Clustered agent transfer scan check, management server id:" + _nodeId);
|
||||
}
|
||||
|
||||
if (_agentToTransferIds.size() > 0) {
|
||||
s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
|
||||
for (Long hostId : _agentToTransferIds) {
|
||||
AgentAttache attache = findAttache(hostId);
|
||||
if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
|
||||
boolean result = false;
|
||||
_agentToTransferIds.remove(hostId);
|
||||
try {
|
||||
_hostTransferDao.startAgentTransfer(hostId);
|
||||
result = rebalanceHost(hostId);
|
||||
} finally {
|
||||
if (result) {
|
||||
finishRebalance(hostId, Event.RebalanceCompleted);
|
||||
} else {
|
||||
finishRebalance(hostId, Event.RebalanceFailed);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// if we timed out waiting for the host to reconnect, remove host from rebalance list and delete from op_host_transfer DB
|
||||
// no need to do anything with the real attache
|
||||
Date cutTime = DateUtil.currentGMTTime();
|
||||
if (_hostTransferDao.isNotActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut))) {
|
||||
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, failing rebalance for this host");
|
||||
_agentToTransferIds.remove(hostId);
|
||||
_hostTransferDao.completeAgentTransfer(hostId);
|
||||
} else {
|
||||
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
|
||||
}
|
||||
|
||||
// if the thread:
|
||||
// 1) timed out waiting for the host to reconnect
|
||||
// 2) recipient management server is not active any more
|
||||
// remove the host from re-balance list and delete from op_host_transfer DB
|
||||
// no need to do anything with the real attache as we haven't modified it yet
|
||||
Date cutTime = DateUtil.currentGMTTime();
|
||||
if (_hostTransferDao.isNotActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut))) {
|
||||
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host");
|
||||
failStartRebalance(hostId);
|
||||
return;
|
||||
}
|
||||
|
||||
HostTransferMapVO transferMap = _hostTransferDao.findByIdAndCurrentOwnerId(hostId, _nodeId);
|
||||
|
||||
if (transferMap == null) {
|
||||
s_logger.debug("Can't transfer host id=" + hostId + "; record for the host no longer exists in op_host_transfer table");
|
||||
failStartRebalance(hostId);
|
||||
return;
|
||||
}
|
||||
|
||||
ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
|
||||
if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
|
||||
s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host");
|
||||
failStartRebalance(hostId);
|
||||
return;
|
||||
}
|
||||
|
||||
if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
|
||||
rebalanceHost(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner());
|
||||
} else {
|
||||
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
// TODO - change to trace level later on
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Found no agents to be transfered by the management server " + _nodeId);
|
||||
if (s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Found no agents to be transfered by the management server " + _nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -869,7 +887,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
|
||||
|
||||
private boolean setToWaitForRebalance(final long hostId) {
|
||||
private boolean setToWaitForRebalance(final long hostId, long currentOwnerId, long futureOwnerId) {
|
||||
s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer");
|
||||
synchronized (_agentToTransferIds) {
|
||||
return _agentToTransferIds.add(hostId);
|
||||
|
|
@ -877,65 +895,52 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
|
||||
|
||||
private boolean rebalanceHost(final long hostId) {
|
||||
HostTransferMapVO map = _hostTransferDao.findById(hostId);
|
||||
HostVO host = _hostDao.findById(hostId);
|
||||
|
||||
protected boolean rebalanceHost(final long hostId, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException{
|
||||
|
||||
boolean result = true;
|
||||
if (map.getInitialOwner() == _nodeId) {
|
||||
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)findAttache(hostId);
|
||||
|
||||
if (attache != null && !attache.getTransferMode()) {
|
||||
attache.setTransferMode(true);
|
||||
s_logger.debug("Putting agent id=" + hostId + " to transfer mode");
|
||||
_agents.put(hostId, attache);
|
||||
if (host != null && host.getRemoved() == null) {
|
||||
host.setManagementServerId(null);
|
||||
s_logger.debug("Updating host id=" + hostId + " with the status " + Status.Rebalancing);
|
||||
_hostDao.updateStatus(host, Event.StartAgentRebalance, _nodeId);
|
||||
}
|
||||
|
||||
try {
|
||||
Answer[] answer = sendRebalanceCommand(hostId, map.getFutureOwner(), Event.StartAgentRebalance);
|
||||
if (answer == null) {
|
||||
s_logger.warn("Host " + hostId + " failed to connect to the management server " + map.getFutureOwner() + " as a part of rebalance process");
|
||||
result = false;
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
s_logger.warn("Host " + hostId + " failed to connect to the management server " + map.getFutureOwner() + " as a part of rebalance process", ex);
|
||||
if (currentOwnerId == _nodeId) {
|
||||
if (!startRebalance(hostId)) {
|
||||
s_logger.debug("Failed to start agent rebalancing");
|
||||
failStartRebalance(hostId);
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance);
|
||||
if (answer == null || !answer[0].getResult()) {
|
||||
s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process");
|
||||
result = false;
|
||||
}
|
||||
if (result) {
|
||||
s_logger.debug("Got host id=" + hostId + " from management server " + map.getFutureOwner());
|
||||
}
|
||||
|
||||
} else {
|
||||
s_logger.warn("Unable to find agent " + hostId + " on management server " + _nodeId);
|
||||
|
||||
} catch (Exception ex) {
|
||||
s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process", ex);
|
||||
result = false;
|
||||
}
|
||||
} else if (map.getFutureOwner() == _nodeId) {
|
||||
|
||||
if (result) {
|
||||
s_logger.debug("Got host id=" + hostId + " from management server " + futureOwnerId);
|
||||
finishRebalance(hostId, futureOwnerId, Event.RebalanceCompleted);
|
||||
} else {
|
||||
finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed);
|
||||
}
|
||||
|
||||
} else if (futureOwnerId == _nodeId) {
|
||||
HostVO host = _hostDao.findById(hostId);
|
||||
try {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") as a part of rebalance process");
|
||||
}
|
||||
//TODO - 1) no need to do vmfullSync/storageSetup on the agent side 2) Make sure that if connection fails, host goes from Rebalance state to Alert
|
||||
loadDirectlyConnectedHost(host);
|
||||
result = loadDirectlyConnectedHost(host, true);
|
||||
} catch (Exception ex) {
|
||||
s_logger.warn("Unable to load directly connected host " + host.getId() + " as a part of rebalance due to exception: ", ex);
|
||||
result = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private boolean finishRebalance(final long hostId, Event event) {
|
||||
HostTransferMapVO map = _hostTransferDao.findById(hostId);
|
||||
|
||||
if (map.getInitialOwner() != _nodeId) {
|
||||
s_logger.warn("Why finish rebalance called not by initial host owner???");
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void finishRebalance(final long hostId, long futureOwnerId, Event event) throws AgentUnavailableException{
|
||||
|
||||
boolean success = (event == Event.RebalanceCompleted) ? true : false;
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
|
|
@ -945,7 +950,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
AgentAttache attache = findAttache(hostId);
|
||||
if (attache == null) {
|
||||
s_logger.debug("Unable to find attache for the host id=" + hostId + ", assuming that the agent disconnected already");
|
||||
return true;
|
||||
_hostTransferDao.completeAgentTransfer(hostId);
|
||||
return;
|
||||
} else if (success) {
|
||||
s_logger.debug("Management server " + _nodeId + " is completing agent " + hostId + " rebalance");
|
||||
//1) Get all the requests before removing transfer attache
|
||||
|
|
@ -957,22 +963,87 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
getAttache(hostId);
|
||||
//3) forward all the requests to the management server which owns the host now
|
||||
if (!requests.isEmpty()) {
|
||||
s_logger.debug("Forwarding requests held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + map.getFutureOwner());
|
||||
for (Request request : requests) {
|
||||
routeToPeer(Long.toString(map.getFutureOwner()), request.getBytes());
|
||||
s_logger.debug("Forwarding requests held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId);
|
||||
|
||||
for (Iterator<Request> iter = requests.iterator(); iter.hasNext();) {
|
||||
Request req = iter.next();
|
||||
boolean routeResult = routeToPeer(Long.toString(futureOwnerId), req.getBytes());
|
||||
if (!routeResult) {
|
||||
logD(req.getBytes(), "Failed to route request to peer");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (AgentUnavailableException ex) {
|
||||
s_logger.warn("Not creating forward attache as agent is not available", ex);
|
||||
//TODO - - have to handle the case when requests can't be forwarded due to lack of forward attache
|
||||
s_logger.warn("Failed to finish host " + hostId + " rebalance: couldn't create forward attache as agent is not available", ex);
|
||||
failRebalance(hostId);
|
||||
}
|
||||
|
||||
} else {
|
||||
((ClusteredDirectAgentAttache) attache).setTransferMode(false);
|
||||
//TODO - have to handle the case when agent fails to rebalance 1) Either connect it back 2) Or disconnect it
|
||||
failRebalance(hostId);
|
||||
}
|
||||
|
||||
_hostTransferDao.completeAgentTransfer(hostId);
|
||||
}
|
||||
|
||||
protected void failRebalance(final long hostId) throws AgentUnavailableException{
|
||||
reconnect(hostId);
|
||||
_hostTransferDao.completeAgentTransfer(hostId);
|
||||
}
|
||||
|
||||
@DB
|
||||
protected boolean startRebalance(final long hostId) {
|
||||
HostVO host = _hostDao.findById(hostId);
|
||||
|
||||
if (host == null || host.getRemoved() != null) {
|
||||
s_logger.warn("Unable to find host record, fail start rebalancing process");
|
||||
return false;
|
||||
}
|
||||
|
||||
synchronized (_agents) {
|
||||
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
|
||||
if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
|
||||
_agentToTransferIds.remove(hostId);
|
||||
s_logger.debug("Putting agent id=" + hostId + " to transfer mode");
|
||||
attache.setTransferMode(true);
|
||||
_agents.put(hostId, attache);
|
||||
} else {
|
||||
if (attache == null) {
|
||||
s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing");
|
||||
} else {
|
||||
s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " + attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
Transaction txn = Transaction.currentTxn();
|
||||
txn.start();
|
||||
|
||||
s_logger.debug("Updating host id=" + hostId + " with the status " + Status.Rebalancing);
|
||||
host.setManagementServerId(null);
|
||||
_hostDao.updateStatus(host, Event.StartAgentRebalance, _nodeId);
|
||||
_hostTransferDao.startAgentTransfer(hostId);
|
||||
txn.commit();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
protected void failStartRebalance(final long hostId) {
|
||||
_agentToTransferIds.remove(hostId);
|
||||
_hostTransferDao.completeAgentTransfer(hostId);
|
||||
}
|
||||
|
||||
protected void cleanupTransferMap() {
|
||||
List<HostTransferMapVO> hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(_nodeId);
|
||||
|
||||
for (HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) {
|
||||
_hostTransferDao.remove(hostJoingingCluster.getId());
|
||||
}
|
||||
|
||||
List<HostTransferMapVO> hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(_nodeId);
|
||||
for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) {
|
||||
_hostTransferDao.remove(hostLeavingCluster.getId());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ public interface ClusterManager extends Manager {
|
|||
*/
|
||||
public void broadcast(long agentId, Command[] cmds);
|
||||
|
||||
boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException;
|
||||
boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException;
|
||||
|
||||
boolean isAgentRebalanceEnabled();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -928,9 +928,7 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
}
|
||||
}
|
||||
} catch (final InterruptedException e) {
|
||||
} finally {
|
||||
s_logger.debug("Agent rebalancing is completed, management server " + _mshostId + " is ready");
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Throwable e) {
|
||||
s_logger.error("Unexpected exception : ", e);
|
||||
|
|
@ -1169,8 +1167,8 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException {
|
||||
return _rebalanceService.executeRebalanceRequest(agentId, event);
|
||||
public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
|
||||
return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -213,7 +213,7 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
|
|||
}
|
||||
boolean result = false;
|
||||
try {
|
||||
result = manager.rebalanceAgent(cmd.getAgentId(), cmd.getEvent());
|
||||
result = manager.rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Result is " + result);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,6 @@ public interface ClusteredAgentRebalanceService {
|
|||
|
||||
void startRebalanceAgents();
|
||||
|
||||
boolean executeRebalanceRequest(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException;
|
||||
boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -167,7 +167,7 @@ public class DummyClusterManagerImpl implements ClusterManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException {
|
||||
public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -174,5 +174,10 @@ public class ManagementServerHostVO implements ManagementServerHost{
|
|||
|
||||
public void setAlertCount(int count) {
|
||||
alertCount = count;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringBuilder("ManagementServer[").append("-").append(id).append("-").append(msid).append("-").append(state).append("]").toString();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ import com.cloud.utils.db.GenericDao;
|
|||
|
||||
public interface HostTransferMapDao extends GenericDao<HostTransferMapVO, Long> {
|
||||
|
||||
List<HostTransferMapVO> listHostsLeavingCluster(long clusterId);
|
||||
List<HostTransferMapVO> listHostsLeavingCluster(long currentOwnerId);
|
||||
|
||||
List<HostTransferMapVO> listHostsJoiningCluster(long futureOwnerId);
|
||||
|
||||
|
|
@ -40,4 +40,8 @@ public interface HostTransferMapDao extends GenericDao<HostTransferMapVO, Long>
|
|||
boolean isNotActive(long hostId, Date cutTime);
|
||||
|
||||
boolean startAgentTransfer(long hostId);
|
||||
|
||||
HostTransferMapVO findByIdAndFutureOwnerId(long id, long futureOwnerId);
|
||||
|
||||
HostTransferMapVO findByIdAndCurrentOwnerId(long id, long currentOwnerId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,10 +63,9 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<HostTransferMapVO> listHostsLeavingCluster(long clusterId) {
|
||||
public List<HostTransferMapVO> listHostsLeavingCluster(long currentOwnerId) {
|
||||
SearchCriteria<HostTransferMapVO> sc = IntermediateStateSearch.create();
|
||||
sc.setParameters("initialOwner", clusterId);
|
||||
sc.setParameters("state", HostTransferState.TransferRequested, HostTransferState.TransferStarted);
|
||||
sc.setParameters("initialOwner", currentOwnerId);
|
||||
|
||||
return listBy(sc);
|
||||
}
|
||||
|
|
@ -75,12 +74,10 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
|
|||
public List<HostTransferMapVO> listHostsJoiningCluster(long futureOwnerId) {
|
||||
SearchCriteria<HostTransferMapVO> sc = IntermediateStateSearch.create();
|
||||
sc.setParameters("futureOwner", futureOwnerId);
|
||||
sc.setParameters("state", HostTransferState.TransferRequested);
|
||||
|
||||
return listBy(sc);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public HostTransferMapVO startAgentTransfering(long hostId, long initialOwner, long futureOwner) {
|
||||
HostTransferMapVO transfer = new HostTransferMapVO(hostId, initialOwner, futureOwner);
|
||||
|
|
@ -122,4 +119,24 @@ public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Lo
|
|||
return update(hostId, transfer);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public HostTransferMapVO findByIdAndFutureOwnerId(long id, long futureOwnerId) {
|
||||
SearchCriteria<HostTransferMapVO> sc = AllFieldsSearch.create();
|
||||
sc.setParameters("futureOwner", futureOwnerId);
|
||||
sc.setParameters("id", id);
|
||||
|
||||
return findOneBy(sc);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public HostTransferMapVO findByIdAndCurrentOwnerId(long id, long currentOwnerId) {
|
||||
SearchCriteria<HostTransferMapVO> sc = AllFieldsSearch.create();
|
||||
sc.setParameters("initialOwner", currentOwnerId);
|
||||
sc.setParameters("id", id);
|
||||
|
||||
return findOneBy(sc);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ public class SecurityGroupListener implements Listener {
|
|||
|
||||
@Override
|
||||
public boolean isRecurring() {
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1530,7 +1530,7 @@ CREATE TABLE `cloud`.`op_host_transfer` (
|
|||
`state` varchar(32) NOT NULL COMMENT 'the transfer state of the host',
|
||||
`created` datetime NOT NULL COMMENT 'date created',
|
||||
PRIMARY KEY (`id`),
|
||||
CONSTRAINT `fk_op_host_transfer__id` FOREIGN KEY `fk_op_host_transfer__id` (`id`) REFERENCES `host` (`id`),
|
||||
CONSTRAINT `fk_op_host_transfer__id` FOREIGN KEY `fk_op_host_transfer__id` (`id`) REFERENCES `host` (`id`) ON DELETE CASCADE,
|
||||
CONSTRAINT `fk_op_host_transfer__initial_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__initial_mgmt_server_id`(`initial_mgmt_server_id`) REFERENCES `mshost`(`msid`),
|
||||
CONSTRAINT `fk_op_host_transfer__future_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__future_mgmt_server_id`(`future_mgmt_server_id`) REFERENCES `mshost`(`msid`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ CREATE TABLE `cloud`.`op_host_transfer` (
|
|||
`state` varchar(32) NOT NULL COMMENT 'the transfer state of the host',
|
||||
`created` datetime NOT NULL COMMENT 'date created',
|
||||
PRIMARY KEY (`id`),
|
||||
CONSTRAINT `fk_op_host_transfer__id` FOREIGN KEY `fk_op_host_transfer__id` (`id`) REFERENCES `host` (`id`),
|
||||
CONSTRAINT `fk_op_host_transfer__id` FOREIGN KEY `fk_op_host_transfer__id` (`id`) REFERENCES `host` (`id`) ON DELETE CASCADE,
|
||||
CONSTRAINT `fk_op_host_transfer__initial_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__initial_mgmt_server_id`(`initial_mgmt_server_id`) REFERENCES `mshost`(`msid`),
|
||||
CONSTRAINT `fk_op_host_transfer__future_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__future_mgmt_server_id`(`future_mgmt_server_id`) REFERENCES `mshost`(`msid`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
|
||||
|
|
|
|||
Loading…
Reference in New Issue