mirror of https://github.com/apache/cloudstack.git
Fix failure on agent reconnection (#8089)
This commit is contained in:
parent
6ae3b73ca2
commit
3b11663d87
|
|
@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import javax.naming.ConfigurationException;
|
||||
|
||||
import com.cloud.agent.api.PingAnswer;
|
||||
import com.cloud.utils.NumbersUtil;
|
||||
import org.apache.cloudstack.agent.lb.SetupMSListAnswer;
|
||||
import org.apache.cloudstack.agent.lb.SetupMSListCommand;
|
||||
|
|
@ -822,6 +823,9 @@ public class Agent implements HandlerFactory, IAgentControl {
|
|||
listener.processControlResponse(response, (AgentControlAnswer)answer);
|
||||
}
|
||||
}
|
||||
} else if (answer instanceof PingAnswer && (((PingAnswer) answer).isSendStartup()) && _reconnectAllowed) {
|
||||
s_logger.info("Management server requested startup command to reinitialize the agent");
|
||||
sendStartup(link);
|
||||
} else {
|
||||
setLastPingResponseTime();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,15 +22,26 @@ package com.cloud.agent.api;
|
|||
public class PingAnswer extends Answer {
|
||||
private PingCommand _command = null;
|
||||
|
||||
private boolean sendStartup = false;
|
||||
|
||||
protected PingAnswer() {
|
||||
}
|
||||
|
||||
public PingAnswer(PingCommand cmd) {
|
||||
public PingAnswer(PingCommand cmd, boolean sendStartup) {
|
||||
super(cmd);
|
||||
_command = cmd;
|
||||
this.sendStartup = sendStartup;
|
||||
}
|
||||
|
||||
public PingCommand getCommand() {
|
||||
return _command;
|
||||
}
|
||||
|
||||
public boolean isSendStartup() {
|
||||
return sendStartup;
|
||||
}
|
||||
|
||||
public void setSendStartup(boolean sendStartup) {
|
||||
this.sendStartup = sendStartup;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ import javax.naming.ConfigurationException;
|
|||
|
||||
import com.cloud.configuration.Config;
|
||||
import com.cloud.utils.NumbersUtil;
|
||||
import com.cloud.utils.db.GlobalLock;
|
||||
import org.apache.cloudstack.agent.lb.IndirectAgentLB;
|
||||
import org.apache.cloudstack.ca.CAManager;
|
||||
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
|
||||
|
|
@ -798,49 +799,65 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
return true;
|
||||
}
|
||||
|
||||
protected Status getNextStatusOnDisconnection(Host host, final Status.Event event) {
|
||||
final Status currentStatus = host.getStatus();
|
||||
Status nextStatus;
|
||||
if (currentStatus == Status.Down || currentStatus == Status.Alert || currentStatus == Status.Removed) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(String.format("Host %s is already %s", host.getUuid(), currentStatus));
|
||||
}
|
||||
nextStatus = currentStatus;
|
||||
} else {
|
||||
try {
|
||||
nextStatus = currentStatus.getNextStatus(event);
|
||||
} catch (final NoTransitionException e) {
|
||||
final String err = String.format("Cannot find next status for %s as current status is %s for agent %s", event, currentStatus, host.getUuid());
|
||||
s_logger.debug(err);
|
||||
throw new CloudRuntimeException(err);
|
||||
}
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(String.format("The next status of agent %s is %s, current status is %s", host.getUuid(), nextStatus, currentStatus));
|
||||
}
|
||||
}
|
||||
return nextStatus;
|
||||
}
|
||||
|
||||
protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) {
|
||||
final long hostId = attache.getId();
|
||||
|
||||
s_logger.info("Host " + hostId + " is disconnecting with event " + event);
|
||||
Status nextStatus = null;
|
||||
final HostVO host = _hostDao.findById(hostId);
|
||||
if (host == null) {
|
||||
s_logger.warn("Can't find host with " + hostId);
|
||||
nextStatus = Status.Removed;
|
||||
} else {
|
||||
final Status currentStatus = host.getStatus();
|
||||
if (currentStatus == Status.Down || currentStatus == Status.Alert || currentStatus == Status.Removed) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Host " + hostId + " is already " + currentStatus);
|
||||
}
|
||||
nextStatus = currentStatus;
|
||||
} else {
|
||||
try {
|
||||
nextStatus = currentStatus.getNextStatus(event);
|
||||
} catch (final NoTransitionException e) {
|
||||
final String err = "Cannot find next status for " + event + " as current status is " + currentStatus + " for agent " + hostId;
|
||||
s_logger.debug(err);
|
||||
throw new CloudRuntimeException(err);
|
||||
boolean result = false;
|
||||
GlobalLock joinLock = getHostJoinLock(hostId);
|
||||
if (joinLock.lock(60)) {
|
||||
try {
|
||||
s_logger.info(String.format("Host %d is disconnecting with event %s", hostId, event));
|
||||
Status nextStatus = null;
|
||||
final HostVO host = _hostDao.findById(hostId);
|
||||
if (host == null) {
|
||||
s_logger.warn(String.format("Can't find host with %d", hostId));
|
||||
nextStatus = Status.Removed;
|
||||
} else {
|
||||
nextStatus = getNextStatusOnDisconnection(host, event);
|
||||
caService.purgeHostCertificate(host);
|
||||
}
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("The next status of agent " + hostId + "is " + nextStatus + ", current status is " + currentStatus);
|
||||
s_logger.debug(String.format("Deregistering link for %d with state %s", hostId, nextStatus));
|
||||
}
|
||||
|
||||
removeAgent(attache, nextStatus);
|
||||
|
||||
if (host != null && transitState) {
|
||||
// update the state for host in DB as per the event
|
||||
disconnectAgent(host, event, _nodeId);
|
||||
}
|
||||
} finally {
|
||||
joinLock.unlock();
|
||||
}
|
||||
caService.purgeHostCertificate(host);
|
||||
result = true;
|
||||
}
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Deregistering link for " + hostId + " with state " + nextStatus);
|
||||
}
|
||||
|
||||
removeAgent(attache, nextStatus);
|
||||
// update the DB
|
||||
if (host != null && transitState) {
|
||||
disconnectAgent(host, event, _nodeId);
|
||||
}
|
||||
|
||||
return true;
|
||||
joinLock.releaseRef();
|
||||
return result;
|
||||
}
|
||||
|
||||
protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, Status.Event event) {
|
||||
|
|
@ -1101,26 +1118,23 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
return attache;
|
||||
}
|
||||
|
||||
private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, final Request request) {
|
||||
AgentAttache attache = null;
|
||||
ReadyCommand ready = null;
|
||||
try {
|
||||
final List<String> agentMSHostList = new ArrayList<>();
|
||||
String lbAlgorithm = null;
|
||||
if (startup != null && startup.length > 0) {
|
||||
final String agentMSHosts = startup[0].getMsHostList();
|
||||
if (StringUtils.isNotEmpty(agentMSHosts)) {
|
||||
String[] msHosts = agentMSHosts.split("@");
|
||||
if (msHosts.length > 1) {
|
||||
lbAlgorithm = msHosts[1];
|
||||
}
|
||||
agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
|
||||
private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startup) throws ConnectionException {
|
||||
final List<String> agentMSHostList = new ArrayList<>();
|
||||
String lbAlgorithm = null;
|
||||
if (startup != null && startup.length > 0) {
|
||||
final String agentMSHosts = startup[0].getMsHostList();
|
||||
if (StringUtils.isNotEmpty(agentMSHosts)) {
|
||||
String[] msHosts = agentMSHosts.split("@");
|
||||
if (msHosts.length > 1) {
|
||||
lbAlgorithm = msHosts[1];
|
||||
}
|
||||
agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
|
||||
}
|
||||
|
||||
final HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup);
|
||||
if (host != null) {
|
||||
ready = new ReadyCommand(host.getDataCenterId(), host.getId(), NumbersUtil.enableHumanReadableSizes);
|
||||
}
|
||||
AgentAttache attache = null;
|
||||
GlobalLock joinLock = getHostJoinLock(host.getId());
|
||||
if (joinLock.lock(60)) {
|
||||
try {
|
||||
|
||||
if (!indirectAgentLB.compareManagementServerList(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) {
|
||||
final List<String> newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null);
|
||||
|
|
@ -1132,6 +1146,24 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
|
||||
attache = createAttacheForConnect(host, link);
|
||||
attache = notifyMonitorsOfConnection(attache, startup, false);
|
||||
} finally {
|
||||
joinLock.unlock();
|
||||
}
|
||||
} else {
|
||||
throw new ConnectionException(true, "Unable to acquire lock on host " + host.getUuid());
|
||||
}
|
||||
joinLock.releaseRef();
|
||||
return attache;
|
||||
}
|
||||
|
||||
private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, final Request request) {
|
||||
AgentAttache attache = null;
|
||||
ReadyCommand ready = null;
|
||||
try {
|
||||
final HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup);
|
||||
if (host != null) {
|
||||
ready = new ReadyCommand(host.getDataCenterId(), host.getId(), NumbersUtil.enableHumanReadableSizes);
|
||||
attache = sendReadyAndGetAttache(host, ready, link, startup);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
s_logger.debug("Failed to handle host connection: ", e);
|
||||
|
|
@ -1265,6 +1297,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
connectAgent(link, cmds, request);
|
||||
}
|
||||
return;
|
||||
} else if (cmd instanceof StartupCommand) {
|
||||
connectAgent(link, cmds, request);
|
||||
}
|
||||
|
||||
final long hostId = attache.getId();
|
||||
|
|
@ -1318,13 +1352,14 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
handleCommands(attache, request.getSequence(), new Command[] {cmd});
|
||||
if (cmd instanceof PingCommand) {
|
||||
final long cmdHostId = ((PingCommand)cmd).getHostId();
|
||||
boolean requestStartupCommand = false;
|
||||
|
||||
final HostVO host = _hostDao.findById(Long.valueOf(cmdHostId));
|
||||
boolean gatewayAccessible = true;
|
||||
// if the router is sending a ping, verify the
|
||||
// gateway was pingable
|
||||
if (cmd instanceof PingRoutingCommand) {
|
||||
final boolean gatewayAccessible = ((PingRoutingCommand)cmd).isGatewayAccessible();
|
||||
final HostVO host = _hostDao.findById(Long.valueOf(cmdHostId));
|
||||
|
||||
gatewayAccessible = ((PingRoutingCommand)cmd).isGatewayAccessible();
|
||||
if (host != null) {
|
||||
if (!gatewayAccessible) {
|
||||
// alert that host lost connection to
|
||||
|
|
@ -1342,7 +1377,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
s_logger.debug("Not processing " + PingRoutingCommand.class.getSimpleName() + " for agent id=" + cmdHostId + "; can't find the host in the DB");
|
||||
}
|
||||
}
|
||||
answer = new PingAnswer((PingCommand)cmd);
|
||||
if (host!= null && host.getStatus() != Status.Up && gatewayAccessible) {
|
||||
requestStartupCommand = true;
|
||||
}
|
||||
answer = new PingAnswer((PingCommand)cmd, requestStartupCommand);
|
||||
} else if (cmd instanceof ReadyAnswer) {
|
||||
final HostVO host = _hostDao.findById(attache.getId());
|
||||
if (host == null) {
|
||||
|
|
@ -1864,4 +1902,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
sendCommandToAgents(hostsPerZone, params);
|
||||
}
|
||||
}
|
||||
|
||||
private GlobalLock getHostJoinLock(Long hostId) {
|
||||
return GlobalLock.getInternLock(String.format("%s-%s", "Host-Join", hostId));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,102 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
""" Check state transition of host from Alert to Up on Ping
|
||||
"""
|
||||
|
||||
# Import Local Modules
|
||||
from marvin.cloudstackTestCase import *
|
||||
from marvin.lib.common import *
|
||||
from marvin.lib.utils import *
|
||||
from nose.plugins.attrib import attr
|
||||
|
||||
_multiprocess_shared_ = False
|
||||
|
||||
|
||||
class TestHostPing(cloudstackTestCase):
|
||||
|
||||
def setUp(self, handler=logging.StreamHandler()):
|
||||
self.logger = logging.getLogger('TestHM')
|
||||
self.stream_handler = handler
|
||||
self.logger.setLevel(logging.DEBUG)
|
||||
self.logger.addHandler(self.stream_handler)
|
||||
self.apiclient = self.testClient.getApiClient()
|
||||
self.hypervisor = self.testClient.getHypervisorInfo()
|
||||
self.mgtSvrDetails = self.config.__dict__["mgtSvr"][0].__dict__
|
||||
self.dbConnection = self.testClient.getDbConnection()
|
||||
self.services = self.testClient.getParsedTestDataConfig()
|
||||
self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
|
||||
self.pod = get_pod(self.apiclient, self.zone.id)
|
||||
self.cleanup = []
|
||||
|
||||
def tearDown(self):
|
||||
super(TestHostPing, self).tearDown()
|
||||
|
||||
def checkHostStateInCloudstack(self, state, host_id):
|
||||
try:
|
||||
listHost = Host.list(
|
||||
self.apiclient,
|
||||
type='Routing',
|
||||
zoneid=self.zone.id,
|
||||
podid=self.pod.id,
|
||||
id=host_id
|
||||
)
|
||||
self.assertEqual(
|
||||
isinstance(listHost, list),
|
||||
True,
|
||||
"Check if listHost returns a valid response"
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
len(listHost),
|
||||
1,
|
||||
"Check if listHost returns a host"
|
||||
)
|
||||
self.logger.debug(" Host state is %s " % listHost[0].state)
|
||||
if listHost[0].state == state:
|
||||
return True, 1
|
||||
else:
|
||||
return False, 1
|
||||
except Exception as e:
|
||||
self.logger.debug("Got exception %s" % e)
|
||||
return False, 1
|
||||
|
||||
@attr(
|
||||
tags=[
|
||||
"advanced",
|
||||
"advancedns",
|
||||
"smoke",
|
||||
"basic"],
|
||||
required_hardware="true")
|
||||
def test_01_host_ping_on_alert(self):
|
||||
listHost = Host.list(
|
||||
self.apiclient,
|
||||
type='Routing',
|
||||
zoneid=self.zone.id,
|
||||
podid=self.pod.id,
|
||||
)
|
||||
for host in listHost:
|
||||
self.logger.debug('Hypervisor = {}'.format(host.id))
|
||||
|
||||
hostToTest = listHost[0]
|
||||
sql_query = "UPDATE host SET status = 'Alert' WHERE uuid = '" + hostToTest.id + "'"
|
||||
self.dbConnection.execute(sql_query)
|
||||
|
||||
hostUpInCloudstack = wait_until(30, 8, self.checkHostStateInCloudstack, "Up", hostToTest.id)
|
||||
|
||||
if not (hostUpInCloudstack):
|
||||
raise self.fail("Host is not up %s, in cloudstack so failing test " % (hostToTest.ipaddress))
|
||||
return
|
||||
Loading…
Reference in New Issue