Indirect agent connection improvements, includes the following improvements.

- Enhances the Host connecting logic to avoid a connection storm (where the Agent opens multiple sockets against the Management Server).
- Implements HostConnectProcess task where Host upon connection checks whether lock is available, traces Host connecting progress, status and timeout.
- Introduces AgentConnectStatusCommand, where Host checks whether lock for the Host is available (i.e. "previous" connect process is finished).
- Implements logic to check whether the Management Server holds a lock on the Host (exposes MySQL DB lock presence via API)
- Removes synchronization on Host disconnect process, double-disconnect logic in clustered Management Server environment, added early removal from ping map (in case of combination ping timeout delay + synchronized disconnect process the Agent Manager submits more disconnect requests)
- Introduces parameterized connection and status check timeouts
- Implements a backoff algorithm abstraction - either a constant backoff timeout or an exponential backoff with jitter can be used to wait between Host connection attempts to the Management Server
- Implements ServerAttache to be used on the Agent side of communication (similar to Attache on Management Server side)
- Enhances/Adds logs significantly to Host Agent and Agent Manager logic to trace Host connecting and disconnecting process, including ids, names, context UUIDs and timings (how much time took overall initialization/deinitialization)
- Adds logs to communication between Management Servers (PDU requests)
- Adds DB indexes to improve search performance, uses IDEMPOTENT_ADD_INDEX for safer DB schema updates
This commit is contained in:
mprokopchuk 2026-04-10 11:39:45 +05:30 committed by Suresh Kumar Anaparti
parent 96ca1b2a7c
commit fbacf27221
No known key found for this signature in database
GPG Key ID: D7CEAE3A9E71D0AA
47 changed files with 4126 additions and 798 deletions

File diff suppressed because it is too large Load Diff

View File

@ -32,6 +32,7 @@ import java.util.UUID;
import javax.naming.ConfigurationException;
import com.cloud.utils.backoff.BackoffFactory;
import org.apache.commons.daemon.Daemon;
import org.apache.commons.daemon.DaemonContext;
import org.apache.commons.daemon.DaemonInitException;
@ -52,7 +53,6 @@ import com.cloud.utils.LogUtils;
import com.cloud.utils.ProcessUtil;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.backoff.BackoffAlgorithm;
import com.cloud.utils.backoff.impl.ConstantTimeBackoff;
import com.cloud.utils.exception.CloudRuntimeException;
public class AgentShell implements IAgentShell, Daemon {
@ -95,6 +95,16 @@ public class AgentShell implements IAgentShell, Daemon {
return _backoff;
}
/**
 * Installs the given backoff algorithm and tries to persist its configuration
 * entries as agent properties.
 * <p>
 * Persisting is best-effort: a runtime failure is logged (with its cause) and
 * does not prevent the algorithm from being used.
 *
 * @param backoffAlgorithm algorithm to use from now on
 */
@Override
public void setBackoffAlgorithm(BackoffAlgorithm backoffAlgorithm) {
    this._backoff = backoffAlgorithm;
    try {
        backoffAlgorithm.getConfiguration().forEach((key, value) -> setPersistentProperty(null, key, value));
    } catch (RuntimeException e) {
        // Keep the cause in the log, otherwise the reason for the failure is lost.
        LOGGER.warn("Failed to persist backoff properties", e);
    }
}
@Override
public int getPingRetries() {
return _pingRetries;
@ -400,7 +410,7 @@ public class AgentShell implements IAgentShell, Daemon {
final Class<?> c = this.getClass();
_version = c.getPackage().getImplementationVersion();
if (_version == null) {
throw new CloudRuntimeException("Unable to find the implementation version of this agent");
_version = "unknown";
}
LOGGER.info("Implementation Version is {}", _version);
@ -410,7 +420,7 @@ public class AgentShell implements IAgentShell, Daemon {
if (LOGGER.isDebugEnabled()) {
List<String> properties = Collections.list((Enumeration<String>)_properties.propertyNames());
for (String property : properties) {
LOGGER.debug("Found property: {}", property);
LOGGER.debug("Found property: {}, value: {}", property, _properties.getProperty(property));
}
}
@ -424,11 +434,15 @@ public class AgentShell implements IAgentShell, Daemon {
_properties.put(cmdLineProp.getKey(), cmdLineProp.getValue());
}
LOGGER.info("Defaulting to the constant time backoff algorithm");
_backoff = new ConstantTimeBackoff();
Map<String, Object> map = new HashMap<>();
map.put("seconds", _properties.getProperty("backoff.seconds"));
_backoff.configure("ConstantTimeBackoff", map);
try {
LOGGER.info("Creating backoff delay algorithm implementation");
setBackoffAlgorithm(BackoffFactory.create(_properties));
LOGGER.info("Created {} delay algorithm implementation", getBackoffAlgorithm().getClass().getName());
} catch (RuntimeException e) {
String msg = String.format("Failed to create backoff with provided properties %s, failing back to default", _properties);
LOGGER.warn(msg, e);
setBackoffAlgorithm(BackoffFactory.createDefault(_properties));
}
}
private void launchAgent() throws ConfigurationException {
@ -546,7 +560,7 @@ public class AgentShell implements IAgentShell, Daemon {
}
} catch (final Exception e) {
LOGGER.error("Unable to start agent: ", e);
LOGGER.error("Unable to start agent: {}", e.getLocalizedMessage(), e);
System.exit(ExitStatus.Error.value());
}
}
@ -568,6 +582,7 @@ public class AgentShell implements IAgentShell, Daemon {
shell.init(args);
shell.start();
} catch (ConfigurationException e) {
LOGGER.fatal(e.getMessage(), e);
System.out.println(e.getMessage());
}
}

View File

@ -0,0 +1,355 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent;
import com.cloud.agent.api.AgentConnectStatusAnswer;
import com.cloud.agent.api.AgentConnectStatusCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.StartupAnswer;
import com.cloud.agent.api.StartupCommand;
import com.cloud.agent.properties.AgentProperties;
import com.cloud.agent.properties.AgentPropertiesFileHandler;
import com.cloud.agent.transport.Request;
import com.cloud.exception.CloudException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.host.Status;
import com.cloud.resource.ResourceStatusUpdater;
import com.cloud.resource.ServerResource;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.nio.Link;
import org.apache.cloudstack.threadcontext.ThreadContextUtil;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.ThreadContext;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
public class HostConnectProcess {
private static final Logger logger = LogManager.getLogger(HostConnectProcess.class);
public static final int DEFAULT_ASYNC_COMMAND_TIMEOUT_SEC =
AgentPropertiesFileHandler.getPropertyValue(AgentProperties.ASYNC_COMMAND_TIMEOUT_SEC);
public static final int DEFAULT_ASYNC_STARTUP_COMMAND_TIMEOUT_SEC =
AgentPropertiesFileHandler.getPropertyValue(AgentProperties.ASYNC_STARTUP_COMMAND_TIMEOUT_SEC);
static final long HOST_STATUS_CHECK_INITIAL_DELAY_SEC = 10;
private long hostStatusCheckDelaySec = AgentPropertiesFileHandler.getPropertyValue(AgentProperties.AGENT_HOST_STATUS_CHECK_DELAY_SEC);
private final AtomicReference<ScheduledFuture<?>> hostStatusFutureRef = new AtomicReference<>();
private final Agent agent;
private ScheduledExecutorService hostStatusExecutor;
/**
 * Creates the connect process for the given agent and immediately prepares the
 * host status scheduler through {@code initExecutors()}.
 *
 * @param agent agent used by the scheduled status task to build and send commands
 */
public HostConnectProcess(Agent agent) {
this.agent = agent;
initExecutors();
}
/**
 * (Re)creates the single-threaded scheduler that runs {@link HostStatusTask}.
 * Whatever was running before is stopped first through {@link #stop()}.
 */
private void initExecutors() {
    stop();
    hostStatusExecutor = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory("Agent-" + HostStatusTask.class.getSimpleName()));
}
/**
 * Stops the connect process: shuts the status executor down and cancels the
 * scheduled status task, if there is one.
 *
 * @return {@code true} if a scheduled, not yet cancelled status task existed and
 * was cancelled by this call; {@code false} otherwise
 */
public boolean stop() {
    logger.debug("Stopping connect process. The process is active: {}", isInProgress());
    stopHostStatusExecutor();
    logger.debug("Stopped executor");
    // Take ownership of the future so that nobody else cancels it twice.
    ScheduledFuture<?> pending = hostStatusFutureRef.getAndSet(null);
    boolean wasActive = pending != null && !pending.isCancelled();
    if (wasActive) {
        pending.cancel(true);
    }
    logger.debug("Cancelled future");
    return wasActive;
}
/**
 * Shuts down the status executor (without interrupting a running task) and
 * forgets it. Does nothing when no executor exists.
 */
private void stopHostStatusExecutor() {
    if (hostStatusExecutor == null) {
        return;
    }
    hostStatusExecutor.shutdown();
    hostStatusExecutor = null;
}
/**
 * Restarts the scheduler and schedules a periodic {@link HostStatusTask} for the link.
 * The first run happens after {@code HOST_STATUS_CHECK_INITIAL_DELAY_SEC} seconds,
 * subsequent runs are separated by the currently configured status check delay.
 *
 * @param link               link to the Management Server the task works with
 * @param connectionTransfer passed to the task as its force-connect flag
 */
public void scheduleConnectProcess(Link link, boolean connectionTransfer) {
    logger.debug("Scheduling connect process for {}", link);
    initExecutors();
    var statusTask = new HostStatusTask(link, connectionTransfer, agent, hostStatusFutureRef);
    var wrappedTask = ThreadContextUtil.wrapThreadContext(statusTask);
    hostStatusFutureRef.set(hostStatusExecutor.scheduleWithFixedDelay(
            wrappedTask, HOST_STATUS_CHECK_INITIAL_DELAY_SEC, hostStatusCheckDelaySec, TimeUnit.SECONDS));
}
/**
 * Tells whether a connect process is currently scheduled.
 *
 * @return {@code true} if a {@link HostStatusTask} future is stored and has not been
 * cancelled; {@code false} if there is no future or it was cancelled
 */
public boolean isInProgress() {
return Optional.ofNullable(hostStatusFutureRef.get())
.filter(Predicate.not(ScheduledFuture::isCancelled)).isPresent();
}
/**
 * Changes the delay between two host status checks.
 * The value is read when a connect process is scheduled, so an already scheduled
 * task keeps the delay it was scheduled with.
 *
 * @param newDelaySec new delay in seconds
 */
public void updateHostStatusCheckDelay(int newDelaySec) {
logger.info("Updating host status check delay from {} to {} seconds", hostStatusCheckDelaySec, newDelaySec);
this.hostStatusCheckDelaySec = newDelaySec;
}
/**
* Task wait for the Host to be available to connect to submit {@link StartupCommand}.
* Checks Host status on Management Server cluster and submit {@link StartupCommand} only if there is no lock and
* Host is not {@link Status#Connecting}.
*/
public static class HostStatusTask implements Runnable, AsyncSend {
private final Set<Status> operationalStatuses = Set.of(Status.Connecting, Status.Up, Status.Rebalancing);
private final Link _link;
private final boolean _forceConnect;
private final Agent _agent;
private final AtomicReference<? extends ScheduledFuture<?>> _futureRef;
/**
 * Creates a status task bound to one link.
 *
 * @param link         link whose attachment is used to talk to the Management Server
 * @param forceConnect forwarded to the startup command and to reconnect requests
 * @param agent        agent used to build commands and to request reconnects
 * @param futureRef    holder of the future this task is scheduled under; read when the
 *                     task cancels itself
 */
public HostStatusTask(Link link, boolean forceConnect, Agent agent,
AtomicReference<? extends ScheduledFuture<?>> futureRef) {
logger.debug("{} created", this.getClass().getSimpleName());
_link = link;
_forceConnect = forceConnect;
_agent = agent;
_futureRef = futureRef;
}
/**
 * Cancels the future this task is scheduled under, unless there is none or it is
 * already cancelled. The reference itself is left untouched.
 */
private void cancel() {
    logger.debug("Cancelling future");
    ScheduledFuture<?> scheduled = _futureRef.get();
    if (scheduled != null && !scheduled.isCancelled()) {
        scheduled.cancel(true);
    }
    logger.debug("Cancelled future");
}
/**
 * Executes one status-check round by delegating to {@code runInternal()}.
 * Any exception is logged and not rethrown.
 */
@Override
public void run() {
try {
logger.debug("Running {}", getClass().getSimpleName());
runInternal();
} catch (Exception e) {
// NOTE(review): presumably swallowed on purpose - a periodic task on a ScheduledExecutorService
// is not executed again once one of its runs throws; confirm this is the intent.
logger.error("Failed to run {}", getClass().getSimpleName(), e);
}
}
/**
 * One round of the connect process: asks the Management Server for the connect status
 * and, once the Host lock is reported as available, sends the startup command and
 * unschedules this task. While the lock is not available the task simply waits for
 * its next run. An {@link IOException} unschedules the task and requests a reconnect.
 */
private void runInternal() {
    ServerAttache attache = (ServerAttache) _link.attachment();
    if (attache == null || attache.getLink() == null) {
        // Nothing to talk to any more, so there is nothing left to wait for.
        cancel();
        return;
    }

    AgentConnectStatusAnswer answer;
    try {
        answer = getAgentConnectStatusAnswer(attache);
    } catch (IOException e) {
        restartConnectProcess(e);
        return;
    }
    if (answer == null) {
        logger.warn("Received empty agent connect status answer, will retry later");
        return;
    }

    Boolean lockAvailable = answer.isLockAvailable();
    Status status = answer.getHostStatus();
    if (!Boolean.TRUE.equals(lockAvailable)) {
        logger.info("There is lock and Host status is {}, will retry later", status);
        return;
    }

    // send startup command here
    logger.info("There is no lock and Host status is {}", status);
    try {
        sendStartupCommand(_link, _forceConnect);
        logger.debug("Sending startup command to {} finished", _link);
        cancel();
        logger.debug("Unscheduled {}", getClass().getSimpleName());
    } catch (RuntimeException e) {
        // The task stays scheduled, so the next run tries again.
        logger.error("Failed to send startup command to {}", _link, e);
    } catch (IOException e) {
        restartConnectProcess(e);
    }
}

/**
 * Unschedules this task and asks the agent to reconnect after the link was interrupted.
 */
private void restartConnectProcess(IOException cause) {
    cancel();
    logger.error("The connection to {} interrupted, restarting the whole process", _link, cause);
    _agent.getRequestHandler().submit(() -> _agent.reconnect(_link, null, _forceConnect));
}
/**
 * Sends an {@link AgentConnectStatusCommand} and returns its answer.
 *
 * @param attache attache used to reach the Management Server
 * @return the answer, or {@code null} if sending failed with a runtime exception
 * (the failure is logged)
 * @throws IOException propagated from {@code send}
 */
private AgentConnectStatusAnswer getAgentConnectStatusAnswer(ServerAttache attache) throws IOException {
    AgentConnectStatusCommand statusCommand =
            _agent.setupAgentConnectStatusCommand(new AgentConnectStatusCommand());
    Command[] request = {statusCommand};
    try {
        return send(attache, request, AgentConnectStatusAnswer.class, DEFAULT_ASYNC_COMMAND_TIMEOUT_SEC);
    } catch (RuntimeException e) {
        logger.error("Failed to retrieve {}, will retry later", request[0].getClass().getSimpleName(), e);
        return null;
    }
}
/**
 * Initializes the server resource and sends the resulting {@link StartupCommand}s to the
 * Management Server, waiting for a {@link StartupAnswer}.
 * <p>
 * If waiting for the answer times out, the connect status is queried once more:
 * the method carries on when the Host is reported as operational or still locked, and
 * requests a reconnect in every other case. When the method carries on and the resource
 * is a {@link ResourceStatusUpdater}, the agent is registered as its status updater.
 * <p>
 * Nothing is sent when the link has no usable attache or the resource returns no
 * startup commands.
 *
 * @param link               link to the Management Server
 * @param connectionTransfer flag stored in every startup command and passed to reconnect requests
 * @throws IOException           propagated from {@code send}
 * @throws IllegalStateException if sending failed for a reason other than a timeout
 */
public void sendStartupCommand(Link link, boolean connectionTransfer) throws IOException {
    ServerAttache attache = (ServerAttache) link.attachment();
    if (attache == null || attache.getLink() == null) {
        logger.debug("No usable server attache on {}, Startup command sending skipped", link);
        return;
    }
    ServerResource serverResource = _agent.getResource();
    StartupCommand[] startup = serverResource.initialize();
    if (ArrayUtils.isEmpty(startup)) {
        logger.warn("No startup commands returned from {}, Startup command sending skipped", serverResource.getName());
        return;
    }

    String logId = ThreadContext.get("logcontextid");
    String msHostList = _agent.getPersistentProperty("host");
    // need to downcast StartupCommand[] to Command[], otherwise logger will fail to decode JSON on MS side
    Command[] commands = new Command[startup.length];
    for (int i = 0; i < startup.length; i++) {
        StartupCommand command = startup[i];
        commands[i] = command;
        _agent.setupStartupCommand(command);
        command.setMSHostList(msHostList);
        command.setConnectionTransferred(connectionTransfer);
        if (logId != null) {
            command.setContextParam("logid", logId);
        }
    }

    String commandName = commands[0].getClass().getSimpleName();
    boolean needAdditionalValidation = false;
    try {
        logger.debug("Sending command {} to {}", commandName, link);
        var answer = send(attache, commands, StartupAnswer.class, DEFAULT_ASYNC_STARTUP_COMMAND_TIMEOUT_SEC);
        logger.info("Received answer for {} from {}: {}, cancelling", commandName, link, answer);
    } catch (RuntimeException e) {
        if (e.getCause() instanceof OperationTimedoutException) {
            // The command may still have been processed; verify the Host state below.
            String msg = String.format("Failed to retrieve answer for the command %s, sent to %s",
                    commandName, link);
            logger.error(msg, e);
            needAdditionalValidation = true;
        } else {
            String msg = String.format("Failed to send command %s to %s", commandName, link);
            logger.error(msg, e);
            throw new IllegalStateException(msg, e);
        }
    }

    if (needAdditionalValidation) {
        AgentConnectStatusAnswer answer = getAgentConnectStatusAnswer(attache);
        if (answer == null) {
            logger.warn("Received empty agent connect status answer, reconnecting");
            _agent.getRequestHandler().submit(() -> _agent.reconnect(link, null, connectionTransfer));
            return;
        }
        Boolean lockAvailable = answer.isLockAvailable();
        Status status = answer.getHostStatus();
        if (Boolean.TRUE.equals(lockAvailable) && status != null && operationalStatuses.contains(status)) {
            logger.info("Host is in operational state {} on {}", status, link);
        } else if (Boolean.FALSE.equals(lockAvailable)) {
            logger.info("Host is locked and has state {} on {}", status, link);
        } else {
            // Reached when the lock is available but the state is not operational,
            // or when the lock state is unknown - the Host is not "locked" here.
            logger.info("Host is not in operational state (lock available: {}, state: {}) on {}, reconnecting",
                    lockAvailable, status, link);
            _agent.getRequestHandler().submit(() -> _agent.reconnect(link, null, connectionTransfer));
            return;
        }
    }

    if (serverResource instanceof ResourceStatusUpdater) {
        ((ResourceStatusUpdater) serverResource).registerStatusUpdater(_agent);
    }
}
/**
 * @return the agent this task works for; {@code AsyncSend#send} reads the agent id from it
 */
public Agent getAgent() {
return _agent;
}
}
/**
 * Mixin that sends commands to the Management Server through a {@link ServerAttache}
 * and returns the first answer as the requested type.
 */
interface AsyncSend {
    /**
     * @return the agent whose id is put into outgoing requests
     */
    Agent getAgent();

    /**
     * Sends the commands in one request and waits for the answers.
     * When the thread context carries a {@code logcontextid}, it is copied as {@code logid}
     * into every command that does not have one yet.
     *
     * @param attache                attache used to send the request
     * @param commands               commands to send; the first one names the request in messages
     * @param answerType             expected type of the first answer
     * @param asyncCommandTimeoutSec timeout handed to the attache, in seconds
     * @return the first answer, cast to {@code answerType}
     * @throws IOException              (a {@link ClosedChannelException}) when sending failed because
     *                                  the channel is closed
     * @throws RuntimeException         when waiting timed out (cause is the
     *                                  {@link OperationTimedoutException}) or sending failed otherwise
     * @throws IllegalArgumentException when no answer came back, the first answer is unsuccessful
     *                                  or it has an unexpected type
     */
    default <T> T send(ServerAttache attache, Command[] commands, Class<T> answerType,
                       int asyncCommandTimeoutSec) throws IOException {
        Optional.ofNullable(ThreadContext.get("logcontextid"))
                .filter(String.class::isInstance)
                .map(String.class::cast)
                .ifPresent(logId -> {
                    for (Command command : commands) {
                        if (command.getContextParam("logid") == null) {
                            command.setContextParam("logid", logId);
                        }
                    }
                });

        final Link link = attache.getLink();
        final String commandName = commands[0].getClass().getSimpleName();
        final Long id = Optional.ofNullable(getAgent().getId()).orElse(-1L);
        final Request request = new Request(id, -1, commands, true, false);

        final Answer[] answers;
        try {
            answers = attache.send(request, asyncCommandTimeoutSec);
        } catch (OperationTimedoutException e) {
            String msg = String.format("Failed to retrieve answer for command %s to %s", commandName, link);
            logger.error(msg, e);
            throw new RuntimeException(msg, e);
        } catch (CloudException e) {
            String msg = String.format("Failed to send command %s to %s", commandName, link);
            logger.error(msg, e);
            // A closed channel is reported as an IOException so that callers can reconnect.
            if (e.getCause() instanceof ClosedChannelException) {
                throw (ClosedChannelException) e.getCause();
            }
            throw new RuntimeException(msg, e);
        }

        String problem = null;
        Answer first = null;
        if (ArrayUtils.isEmpty(answers)) {
            problem = String.format("Received empty %s response from %s", commandName, link);
        } else {
            first = answers[0];
            String details = first.getDetails();
            if (!first.getResult()) {
                problem = String.format("Received unsuccessful %s response status from %s: %s", commandName, link,
                        details);
            } else {
                String answerName = first.getClass().getSimpleName();
                if (!answerType.isInstance(first)) {
                    problem = String.format("Received unexpected %s response type %s from %s: %s", commandName,
                            answerName, link, details);
                }
            }
        }
        if (problem != null) {
            logger.warn(problem);
            throw new IllegalArgumentException(problem);
        }
        return answerType.cast(first);
    }
}
}

View File

@ -54,6 +54,8 @@ public interface IAgentShell {
BackoffAlgorithm getBackoffAlgorithm();
void setBackoffAlgorithm(BackoffAlgorithm backoffAlgorithm);
int getPingRetries();
String getVersion();

View File

@ -0,0 +1,476 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.CloudException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.nio.Link;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.utils.reflectiontostringbuilderutils.ReflectionToStringBuilderUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* ServerAttache provides basic server communication commands to be implemented.
*
* @author mprokopchuk
*/
public class ServerAttache {
private static final Logger logger = LogManager.getLogger(ServerAttache.class);
private static final ScheduledExecutorService s_listenerExecutor = Executors.newScheduledThreadPool(10,
new NamedThreadFactory("ListenerTimer"));
/** Orders requests by ascending sequence number. */
protected static Comparator<Request> s_reqComparator = Comparator.comparingLong(Request::getSequence);
/**
 * Compares a {@link Request} (first argument) with a bare sequence number
 * ({@link Long}, second argument); used to look a request up by its sequence.
 */
protected static Comparator<Object> s_seqComparator =
        (request, sequence) -> Long.compare(((Request) request).getSequence(), (Long) sequence);
private static final Random s_rand = new SecureRandom();
protected String _name;
private Link _link;
protected ConcurrentHashMap<Long, ServerListener> _waitForList;
protected LinkedList<Request> _requests;
protected Long _currentSequence;
protected long _nextSequence;
protected ServerAttache(Link link) {
_name = link.getIpAddress();
_link = link;
_waitForList = new ConcurrentHashMap<>();
_requests = new LinkedList<>();
_nextSequence = Long.valueOf(s_rand.nextInt(Short.MAX_VALUE)) << 48;
}
/**
 * @return a short description built from the {@code _name} field only
 */
@Override
public String toString() {
    var selectedFields = ReflectionToStringBuilderUtils.reflectOnlySelectedFields(this, "_name");
    return String.format("ServerAttache %s", selectedFields);
}
/**
 * @return the next sequence number; every call advances the counter by one
 */
public synchronized long getNextSequence() {
    _nextSequence += 1;
    return _nextSequence;
}
/**
 * Queues a request, keeping the queue ordered by sequence number.
 *
 * @param req request that is not queued yet
 */
protected synchronized void addRequest(Request req) {
    int searchResult = findRequest(req);
    assert (searchResult < 0) : "How can we get index again? " + searchResult + ":" + req.toString();
    // A negative binary-search result encodes the insertion point as (-(point) - 1).
    int insertionPoint = -searchResult - 1;
    _requests.add(insertionPoint, req);
}
/**
 * Cancels the given request by its sequence number.
 *
 * @param req request to cancel
 */
protected void cancel(Request req) {
cancel(req.getSequence());
}
/**
 * Cancels everything known under a sequence number: the waiting listener (which is
 * notified through {@code processDisconnect()}) and the queued request, if any.
 *
 * @param seq sequence number to cancel
 */
protected synchronized void cancel(long seq) {
    logger.debug(log(seq, "Cancelling."));
    ServerListener waiting = _waitForList.remove(seq);
    if (waiting != null) {
        waiting.processDisconnect();
    }
    int position = findRequest(seq);
    if (position < 0) {
        return;
    }
    _requests.remove(position);
}
/**
 * Binary-searches the request queue for the given request, comparing by sequence number.
 *
 * @param req request to look for
 * @return the index of the request, or {@code (-(insertion point) - 1)} if it is not queued
 */
protected synchronized int findRequest(Request req) {
return Collections.binarySearch(_requests, req, s_reqComparator);
}
/**
 * Binary-searches the request queue for a request with the given sequence number.
 *
 * @param seq sequence number to look for
 * @return the index of the request, or {@code (-(insertion point) - 1)} if it is not queued
 */
protected synchronized int findRequest(long seq) {
return Collections.binarySearch(_requests, seq, s_seqComparator);
}
/**
 * Prefixes a log message with the attache name and the sequence number.
 *
 * @param seq sequence number the message is about
 * @param msg message text
 * @return {@code "Seq <name>-<seq>: <msg>"}
 */
protected String log(long seq, String msg) {
    return new StringBuilder("Seq ")
            .append(_name)
            .append("-")
            .append(seq)
            .append(": ")
            .append(msg)
            .toString();
}
/**
 * Registers a listener for the answers of a sequence number. Unless the listener's
 * timeout is {@code -1}, an {@link Alarm} is scheduled to fire after that many seconds.
 *
 * @param seq      sequence number the listener waits for
 * @param listener listener to register
 */
protected void registerListener(long seq, ServerListener listener) {
    if (logger.isTraceEnabled()) {
        logger.trace(log(seq, "Registering listener"));
    }
    boolean hasTimeout = listener.getTimeout() != -1;
    if (hasTimeout) {
        s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS);
    }
    _waitForList.put(seq, listener);
}
/**
 * Removes the listener registered for a sequence number.
 *
 * @param sequence sequence number
 * @return the removed listener, or {@code null} if none was registered
 */
protected ServerListener unregisterListener(long sequence) {
if (logger.isTraceEnabled()) {
logger.trace(log(sequence, "Unregistering listener"));
}
return _waitForList.remove(sequence);
}
/**
 * @param sequence sequence number
 * @return the listener registered for the sequence number, or {@code null} if there is none
 */
protected ServerListener getListener(long sequence) {
return _waitForList.get(sequence);
}
/**
 * @return the attache name, taken from the link's IP address at construction time
 */
public String getName() {
return _name;
}
/**
 * @return the number of requests currently queued for in-sequence sending
 */
// NOTE(review): reads the LinkedList without the monitor the mutating methods hold - confirm
// that an approximate value is acceptable for callers.
public int getQueueSize() {
return _requests.size();
}
/**
 * Dispatches the answers of a response to the listener registered for its sequence number.
 * <p>
 * Without a listener, the response counts as processed only when its first answer exists
 * and reports success. A non-recurring listener is unregistered after it was called.
 * For in-sequence responses the next queued request is always triggered, even on failure.
 *
 * @param seq  sequence number the response belongs to
 * @param resp response received from the Management Server
 * @return {@code true} if the answers were processed
 */
public boolean processAnswers(long seq, Response resp) {
    resp.logD("Processing: ", true);
    boolean processed = false;
    Answer[] answers = resp.getAnswers();
    try {
        ServerListener monitor = getListener(seq);
        if (monitor == null) {
            // Guard the array access: a response without answers must not blow up here.
            boolean firstAnswerSucceeded = answers != null && answers.length > 0
                    && answers[0] != null && answers[0].getResult();
            if (firstAnswerSucceeded) {
                processed = true;
            }
            logger.debug(log(seq, "Unable to find listener."));
        } else {
            processed = monitor.processAnswers(seq, answers);
            logger.trace(log(seq, (processed ? "" : " did not ") + " processed "));
            if (!monitor.isRecurring()) {
                unregisterListener(seq);
            }
        }
    } finally {
        // we should always trigger next command execution, even in failure cases - otherwise in exception case
        // all the remaining will be stuck in the sync queue forever
        if (resp.executeInSequence()) {
            sendNext(seq);
        }
    }
    return processed;
}
/**
 * Removes every registered listener and notifies each of them through
 * {@code processDisconnect()}.
 */
protected void cancelAllCommands() {
    Iterator<Map.Entry<Long, ServerListener>> pending = _waitForList.entrySet().iterator();
    while (pending.hasNext()) {
        Map.Entry<Long, ServerListener> entry = pending.next();
        // Remove first so that the listener is no longer reachable while it is being notified.
        pending.remove();
        ServerListener monitor = entry.getValue();
        logger.debug(log(entry.getKey(), "Sending disconnect to " + monitor.getClass()));
        monitor.processDisconnect();
    }
}
/**
 * Cancels all waiting listeners and drops all queued requests.
 */
public void cleanup() {
    cancelAllCommands();
    // _requests is a plain LinkedList that every other method touches under this monitor.
    synchronized (this) {
        _requests.clear();
    }
}
/**
 * Two attaches are equal when they have exactly the same class and the same name.
 *
 * @param obj object to compare with
 * @return {@code true} if {@code obj} is an attache of the same class with an equal name
 */
@Override
public boolean equals(Object obj) {
    if (this == obj) {
        return true;
    }
    // Different classes (or null) are never equal, so no ClassCastException can happen below.
    if (obj == null || this.getClass() != obj.getClass()) {
        return false;
    }
    ServerAttache that = (ServerAttache) obj;
    String otherName = that.getName();
    return _name == null ? otherName == null : _name.equals(otherName);
}

/**
 * Hash code derived from the name, consistent with {@link #equals(Object)}.
 *
 * @return hash code of the name, or {@code 0} when there is no name
 */
@Override
public int hashCode() {
    return _name == null ? 0 : _name.hashCode();
}
/**
 * Sends a request, or queues it when it must be executed in sequence and another
 * in-sequence request is still outstanding.
 * <p>
 * The listener (if any) is registered before anything is sent. When sending fails the
 * sequence is cancelled, which also notifies the listener through {@code processDisconnect()}.
 *
 * @param req      request to send; its sequence number must already be set
 * @param listener listener for the answers, may be {@code null}
 * @throws CloudException if the link is closed or sending failed; the cause of a closed
 *                        link is a {@link ClosedChannelException}
 */
public void send(Request req, ServerListener listener) throws CloudException {
long seq = req.getSequence();
if (listener != null) {
registerListener(seq, listener);
}
// The closed check, the queueing decision and the send must happen atomically.
synchronized (this) {
try {
if (isClosed()) {
throw new CloudException("The link to the server " + _name + " has been closed", new ClosedChannelException());
}
// Another in-sequence request is in flight: queue this one, sendNext() will pick it up.
if (req.executeInSequence() && _currentSequence != null) {
req.logD("Waiting for Seq " + _currentSequence + " Scheduling: ", true);
addRequest(req);
return;
}
// If we got to here either we're not supposed to set the _currentSequence or it is null already.
req.logD("Sending ", true);
send(req);
if (req.executeInSequence() && _currentSequence == null) {
_currentSequence = seq;
logger.trace(log(seq, " is current sequence"));
}
} catch (CloudException e) {
logger.info(log(seq, "Unable to send due to " + e.getMessage()), e);
cancel(seq);
throw e;
} catch (Exception e) {
// Anything unexpected is reported to the caller as a CloudException with the original cause.
logger.warn(log(seq, "Unable to send due to " + e.getMessage()), e);
cancel(seq);
throw new CloudException("Problem due to other exception " + e.getMessage(), e);
}
}
}
/**
 * Sends a request and blocks until its answers arrive or the wait times out.
 * <p>
 * A sequence number is assigned when the request has none. The method waits up to two
 * rounds of {@code wait}; the second round is only entered while this request is the
 * current in-sequence request (or there is none).
 *
 * @param req  request to send
 * @param wait time to wait per round (presumably seconds - the listener timeout derived
 *             from it is scheduled in seconds; TODO confirm the unit of {@code waitFor})
 * @return the answers received for the request
 * @throws OperationTimedoutException if no answer arrived in time, or waiting failed for another reason
 * @throws CloudException             if the request could not be sent
 */
public Answer[] send(Request req, int wait) throws CloudException {
if (req.getSequence() <= 0) {
req.setSequence(getNextSequence());
}
SynchronousListener sl = new SynchronousListener();
// The listener's own alarm is set slightly later than the wait used below.
sl.setTimeout(wait + 5);
send(req, sl);
long seq = req.getSequence();
try {
for (int i = 0; i < 2; i++) {
Answer[] answers = null;
try {
answers = sl.waitFor(wait);
} catch (InterruptedException e) {
// NOTE(review): the interrupt is only logged and the interrupt flag is not restored - confirm intended.
logger.debug(log(seq, "Interrupted"));
}
if (answers != null) {
new Response(req, answers).logD("Received: ", false);
return answers;
}
answers = sl.getAnswers(); // Try it again.
if (answers != null) {
new Response(req, answers).logD("Received after timeout: ", true);
return answers;
}
// Give up right away when a different request is the current in-sequence one.
Long current = _currentSequence;
if (current != null && seq != current) {
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Waited too long."));
}
throw new OperationTimedoutException(req.getCommands(), -1, seq, wait, false);
}
logger.debug(log(seq, "Waiting some more time because this is the current command"));
}
throw new OperationTimedoutException(req.getCommands(), -1, seq, wait * 2, true);
} catch (OperationTimedoutException e) {
logger.warn(log(seq, "Timed out on " + req));
cancel(seq);
// Unblock the in-sequence queue if this request was holding it.
Long current = _currentSequence;
if (req.executeInSequence() && (current != null && current == seq)) {
sendNext(seq);
}
throw e;
} catch (Exception e) {
logger.warn(log(seq, "Exception while waiting for answer"), e);
cancel(seq);
Long current = _currentSequence;
if (req.executeInSequence() && (current != null && current == seq)) {
sendNext(seq);
}
// NOTE(review): the original exception is logged but not attached as the cause of the timeout exception.
throw new OperationTimedoutException(req.getCommands(), -1, seq, wait, false);
} finally {
unregisterListener(seq);
}
}
/**
 * Finishes the current in-sequence request and sends the next queued one, if any.
 *
 * @param seq sequence number of the request that just finished (used for logging only)
 */
protected synchronized void sendNext(long seq) {
_currentSequence = null;
if (_requests.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "No more commands found"));
}
return;
}
Request req = _requests.pop();
if (logger.isDebugEnabled()) {
logger.debug(log(req.getSequence(), "Sending now. is current sequence."));
}
try {
send(req);
} catch (CloudException e) {
logger.debug(log(req.getSequence(), "Unable to send the next sequence"));
cancel(req.getSequence());
}
// NOTE(review): this also runs after a failed send, so the cancelled request stays the
// current sequence and later in-sequence requests keep being queued - confirm intended.
_currentSequence = req.getSequence();
}
/**
 * Hook for answers; this implementation intentionally does nothing.
 *
 * @param answers answers to process
 */
public void process(Answer[] answers) {
//do nothing
}
/**
 * Writes the request to the link without waiting for an answer.
 *
 * @param req request to send
 * @throws CloudException if the link is already gone or its channel is closed; the cause is a
 *                        {@link ClosedChannelException} in both cases (the general type is declared
 *                        here rather than {@link AgentUnavailableException})
 */
public synchronized void send(final Request req) throws CloudException {
    // After disconnect() the link is null; report that like any other closed channel
    // instead of failing with a NullPointerException.
    if (_link == null) {
        throw new CloudException("The link to the server " + _name + " has been closed",
                new ClosedChannelException());
    }
    try {
        _link.send(req.toBytes());
    } catch (ClosedChannelException e) {
        throw new CloudException("Channel is closed", e);
    }
}
/**
 * Processes a disconnect: closes and forgets the link, notifies every waiting listener
 * and drops all queued requests.
 */
public void disconnect() {
    synchronized (this) {
        logger.debug("Processing Disconnect.");
        if (_link != null) {
            _link.close();
            _link.terminated();
        }
        _link = null;
    }
    cancelAllCommands();
    // _requests is a plain LinkedList that every other method touches under this monitor.
    synchronized (this) {
        _requests.clear();
    }
}
/**
 * Is this attache closed for more commands?
 *
 * @return true if there is no link any more (it is cleared on disconnect), false otherwise.
 */
protected boolean isClosed() {
return _link == null;
}
/**
 * @return the link this attache sends through, or {@code null} after a disconnect
 */
public Link getLink() {
return _link;
}
/**
 * Sends the commands in one request, waits for the answers and returns the first one
 * as the requested type.
 * When the thread context carries a {@code logcontextid}, it is copied as {@code logid}
 * into every command that does not have one yet.
 *
 * @param agentId                agent id for the request; {@code -1} is used when {@code null}
 * @param commands               commands to send; the first one names the request in messages
 * @param answerType             expected type of the first answer
 * @param asyncCommandTimeoutSec timeout handed to {@code send(Request, int)}
 * @return the first answer, cast to {@code answerType}
 * @throws IOException              (a {@link ClosedChannelException}) when sending failed because
 *                                  the channel is closed
 * @throws RuntimeException         when waiting timed out (cause is the
 *                                  {@link OperationTimedoutException}) or sending failed otherwise
 * @throws IllegalArgumentException when no answer came back, the first answer is unsuccessful
 *                                  or it has an unexpected type
 */
public <T> T send(Long agentId, Command[] commands, Class<T> answerType, int asyncCommandTimeoutSec) throws IOException {
    Optional<String> contextLogId = Optional.ofNullable(ThreadContext.get("logcontextid"))
            .filter(String.class::isInstance)
            .map(String.class::cast);
    if (contextLogId.isPresent()) {
        String logId = contextLogId.get();
        for (Command command : commands) {
            if (command.getContextParam("logid") != null) {
                continue;
            }
            command.setContextParam("logid", logId);
        }
    }

    Link link = getLink();
    String commandName = commands[0].getClass().getSimpleName();
    Long id = Optional.ofNullable(agentId).orElse(-1L);
    Request request = new Request(id, -1, commands, true, false);

    Answer[] answers;
    try {
        answers = send(request, asyncCommandTimeoutSec);
    } catch (OperationTimedoutException e) {
        String msg = String.format("Failed to retrieve answer for command %s to %s", commandName, link);
        logger.error(msg, e);
        throw new RuntimeException(msg, e);
    } catch (CloudException e) {
        String msg = String.format("Failed to send command %s to %s", commandName, link);
        logger.error(msg, e);
        // A closed channel is surfaced as an IOException so that callers can reconnect.
        if (e.getCause() instanceof ClosedChannelException) {
            throw (ClosedChannelException) e.getCause();
        }
        throw new RuntimeException(msg, e);
    }

    String rejection;
    Answer first = null;
    if (ArrayUtils.isEmpty(answers)) {
        rejection = String.format("Received empty %s response from %s", commandName, link);
    } else {
        first = answers[0];
        String details = first.getDetails();
        if (!first.getResult()) {
            rejection = String.format("Received unsuccessful %s response status from %s: %s", commandName, link,
                    details);
        } else {
            String answerName = first.getClass().getSimpleName();
            rejection = answerType.isInstance(first) ? null
                    : String.format("Received unexpected %s response type %s from %s: %s", commandName,
                    answerName, link, details);
        }
    }
    if (rejection != null) {
        logger.warn(rejection);
        throw new IllegalArgumentException(rejection);
    }
    return answerType.cast(first);
}
/**
 * Timeout alarm for one sequence number: when it fires and a listener is still registered,
 * the sequence is cancelled and the listener is told about the timeout.
 */
protected class Alarm extends ManagedContextRunnable {
    long _seq;

    public Alarm(long seq) {
        _seq = seq;
    }

    @Override
    protected void runInContext() {
        try {
            ServerListener expired = unregisterListener(_seq);
            if (expired == null) {
                // Already answered or cancelled, nothing to time out.
                return;
            }
            cancel(_seq);
            expired.processTimeout(_seq);
        } catch (Exception e) {
            ServerAttache.logger.warn("Exception ", e);
        }
    }
}
}

View File

@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
/**
* There are several types of events that the Agent forwards.
* <p>
* 1. Commands sent by the Agent.
* 2. Answers sent by the Management Server.
*/
public interface ServerListener {
/**
* Called with the answers received for a previously sent request.
*
* @param seq sequence number returned by the send() method.
* @param answers answers to the commands.
* @return true if processed. false if not.
*/
boolean processAnswers(long seq, Answer[] answers);
/**
* This method is called by the ServerHandler when the Management Server has sent
* a command to the server.
* In order to process these commands, the Server Attache Listener must be registered for commands.
*
* @param seq sequence number of the command sent.
* @param commands commands that were sent.
* @return true if you processed the commands. false if not.
*/
boolean processCommands(long seq, Command[] commands);
/**
* This method is called by ServerHandler when an agent disconnects
* from the Management Server if the listener has been registered for host events.
* <p>
* If the Listener is passed to the send() method, this method is
* also called by ServerHandler if the agent disconnected.
*
* @return presumably true if the disconnect was processed, false if not
* (NOTE(review): the meaning of the return value is not stated here - confirm against callers).
*/
boolean processDisconnect();
/**
* If this Listener is passed to the send() method, this method
* is called by ServerHandler after processing an answer
* from the agent.
* Returning true means you're expecting more
* answers from the agent using the same sequence number.
*
* @return true if expecting more answers using the same sequence number.
*/
boolean isRecurring();
/**
* If the Listener is passed to the send() method, this method is
* called to determine how long to wait for the reply. The value
* is in seconds. -1 indicates to wait forever. 0 indicates to
* use the default timeout. If the timeout is
* reached, processTimeout on this same Listener is called.
* <p>
* The default implementation returns -1.
*
* @return timeout in seconds before processTimeout is called.
*/
default int getTimeout() {
return -1;
}
/**
* If the Listener is passed to the send() method, this method is
* called by the ServerHandler to process a command timeout.
* <p>
* The default implementation does nothing and returns false.
*
* @param seq sequence number returned by send().
* @return true if processed; false if not.
*/
default boolean processTimeout(long seq) {
return false;
}
}

View File

@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.utils.Profiler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * {@link ServerListener} that lets a caller block in {@link #waitFor(int)} until either
 * {@link #processAnswers(long, Answer[])} or {@link #processDisconnect()} has been invoked,
 * or the requested wait time has elapsed.
 * <p>
 * State shared between the waiting thread and the notifying thread is guarded by the
 * monitor of this instance.
 *
 * @author mprokopchuk
 */
public class SynchronousListener implements ServerListener {
    private static final Logger logger = LogManager.getLogger(SynchronousListener.class);

    protected Answer[] _answers;
    protected boolean _disconnected;
    /** Set once {@link #processAnswers(long, Answer[])} was called, even when it delivered {@code null}. */
    private boolean _answered;
    private int _timeout;

    /**
     * @return the answers delivered so far, or {@code null} if none were delivered.
     */
    public synchronized Answer[] getAnswers() {
        return _answers;
    }

    @Override
    public boolean isRecurring() {
        return false;
    }

    /**
     * @return {@code true} once {@link #processDisconnect()} has been called.
     */
    public synchronized boolean isDisconnected() {
        return _disconnected;
    }

    /**
     * Stores the answers and wakes up every thread blocked in {@link #waitFor(int)}.
     *
     * @return always {@code true}.
     */
    @Override
    public synchronized boolean processAnswers(long seq, Answer[] resp) {
        _answers = resp;
        _answered = true;
        notifyAll();
        return true;
    }

    @Override
    public boolean processCommands(long seq, Command[] commands) {
        return false;
    }

    /**
     * Marks the listener as disconnected and wakes up every thread blocked in {@link #waitFor(int)}.
     *
     * @return always {@code true}.
     */
    @Override
    public synchronized boolean processDisconnect() {
        logger.trace("Server disconnected. Will notify waiters");
        _disconnected = true;
        notifyAll();
        return true;
    }

    /**
     * Blocks until answers were delivered, a disconnect was signalled or the wait time elapsed.
     *
     * @param s maximum time to wait in seconds; a value {@code <= 0} waits without a time limit.
     * @return the delivered answers; {@code null} if the listener was already disconnected on entry,
     * got disconnected while waiting, or the wait time elapsed without an answer.
     * @throws InterruptedException if the waiting thread is interrupted.
     */
    public synchronized Answer[] waitFor(int s) throws InterruptedException {
        if (_disconnected) {
            return null;
        }
        if (_answers != null) {
            return _answers;
        }
        Profiler profiler = new Profiler();
        profiler.start();
        // Object.wait() may return without notification (spurious wakeup), so always re-check
        // the condition in a loop instead of trusting a single wait() call.
        if (s <= 0) {
            while (!isSignalled()) {
                wait();
            }
        } else {
            // long arithmetic: s * 1000 in int would overflow for large timeouts
            long remainingMs = s * 1000L;
            final long deadlineNanos = System.nanoTime() + remainingMs * 1_000_000L;
            while (!isSignalled() && remainingMs > 0) {
                wait(remainingMs);
                // round up so that we never give up before the deadline; wait(0) would block forever,
                // which is why the loop condition requires remainingMs > 0
                remainingMs = (deadlineNanos - System.nanoTime() + 999_999L) / 1_000_000L;
            }
        }
        profiler.stop();
        // guard against an empty answers array, which previously caused ArrayIndexOutOfBoundsException
        logger.trace("Synchronized command - sending completed, time: {}, answer: {}",
                profiler.getDurationInMillis(), _answers != null && _answers.length > 0 ? _answers[0] : "null");
        return _answers;
    }

    /**
     * @return {@code true} when there is no reason to keep waiting: an answer was delivered
     * (possibly {@code null}) or a disconnect was signalled. Must be called while holding the monitor.
     */
    private boolean isSignalled() {
        return _answered || _answers != null || _disconnected;
    }

    @Override
    public int getTimeout() {
        return _timeout;
    }

    /**
     * @param timeout value returned afterwards by {@link #getTimeout()}.
     */
    public void setTimeout(int timeout) {
        _timeout = timeout;
    }
}

View File

@ -888,10 +888,31 @@ public class AgentProperties{
/**
* Timeout for SSL handshake in seconds
* Data type: Integer.<br>
* Default value: <code>null</code>
* Default value: <code>30</code>
*/
public static final Property<Integer> SSL_HANDSHAKE_TIMEOUT = new Property<>("ssl.handshake.timeout", 30, Integer.class);
/**
* Timeout in seconds for asynchronous command to be sent and response received, see {@code wait} configuration key in Management Server.
* Data type: Integer.<br>
* Default value: <code>180</code>
*/
public static final Property<Integer> ASYNC_COMMAND_TIMEOUT_SEC = new Property<>("async.command.timeout.sec", 180, Integer.class);
/**
* Timeout in seconds for asynchronous startup command to be sent and response received, see {@code wait} configuration key in Management Server.
* Data type: Integer.<br>
* Default value: <code>300</code>
*/
public static final Property<Integer> ASYNC_STARTUP_COMMAND_TIMEOUT_SEC = new Property<>("async.startup.command.timeout.sec", 300, Integer.class);
/**
* Delay in seconds between host status checks on the agent side.
* Data type: Integer.<br>
* Default value: <code>15</code>
*/
public static final Property<Integer> AGENT_HOST_STATUS_CHECK_DELAY_SEC = new Property<>("agent.host.status.check.delay.sec", 15, Integer.class);
/**
* Timeout (in seconds) to wait for the incremental snapshot to complete.
* */

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@ -31,6 +32,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import javax.naming.ConfigurationException;
@ -67,21 +70,18 @@ public class AgentTest {
}
@Test
public void testGetLinkLogNullLinkReturnsEmptyString() {
Link link = null;
String result = agent.getLinkLog(link);
assertEquals("", result);
}
@Test
public void testGetLinkLogLinkWithTraceEnabledReturnsLinkLogWithHashCode() {
Link link = mock(Link.class);
InetSocketAddress socketAddress = new InetSocketAddress("192.168.1.100", 1111);
when(link.getSocketAddress()).thenReturn(socketAddress);
public void testGetLinkLogLinkWithTraceEnabledReturnsLinkLogWithHashCode() throws ReflectiveOperationException {
Link link = new Link(new InetSocketAddress("192.168.1.100", 8250), null);
// fix LOGGER access to inject mock
Field field = link.getClass().getDeclaredField("LOGGER");
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.set(null, logger);
when(logger.isTraceEnabled()).thenReturn(true);
String result = agent.getLinkLog(link);
System.out.println(result);
String result = link.toString();
assertTrue(result.startsWith(System.identityHashCode(link) + "-"));
assertTrue(result.contains("192.168.1.100"));
}
@ -115,7 +115,6 @@ public class AgentTest {
when(shell.getPingRetries()).thenReturn(3);
when(shell.getWorkers()).thenReturn(5);
agent.setupShutdownHookAndInitExecutors();
assertNotNull(agent.selfTaskExecutor);
assertNotNull(agent.outRequestHandler);
assertNotNull(agent.requestHandler);
}
@ -211,7 +210,10 @@ public class AgentTest {
/**
 * Closing a link that carries a {@link ServerAttache} attachment must query the attachment
 * and then close and terminate the link.
 */
@Test
public void testCloseAndTerminateLinkValidLinkCallsCloseAndTerminate() {
    final Link link = mock(Link.class);
    final ServerAttache serverAttache = new ServerAttache(link);
    when(link.attachment()).thenReturn(serverAttache);

    agent.closeAndTerminateLink(link);

    verify(link).attachment();
    verify(link).close();
    verify(link).terminated();
}
@ -219,14 +221,14 @@ public class AgentTest {
@Test
public void testStopAndCleanupConnectionConnectionIsNullDoesNothing() {
agent.connection = null;
agent.stopAndCleanupConnection(false);
agent.stopAndCleanupConnection();
}
@Test
public void testStopAndCleanupConnectionValidConnectionNoWaitStopsAndCleansUp() throws IOException {
NioConnection mockConnection = mock(NioConnection.class);
agent.connection = mockConnection;
agent.stopAndCleanupConnection(false);
agent.stopAndCleanupConnection();
verify(mockConnection).stop();
verify(mockConnection).cleanUp();
}
@ -236,9 +238,9 @@ public class AgentTest {
NioConnection mockConnection = mock(NioConnection.class);
agent.connection = mockConnection;
doThrow(new IOException("Cleanup failed")).when(mockConnection).cleanUp();
agent.stopAndCleanupConnection(false);
agent.stopAndCleanupConnection();
verify(mockConnection).stop();
verify(logger).warn(eq("Fail to clean up old connection. {}"), any(IOException.class));
verify(logger).warn(eq("Fail to clean up old connection"), isA(IOException.class));
}
@Test
@ -249,9 +251,47 @@ public class AgentTest {
agent.connection = mockConnection;
when(shell.getBackoffAlgorithm()).thenReturn(mockBackoff);
when(mockConnection.isStartup()).thenReturn(true, true, false);
agent.stopAndCleanupConnection(true);
verify(mockConnection).stop();
agent.stopAndCleanupConnection();
verify(mockConnection, times(3)).stop();
verify(mockConnection).cleanUp();
verify(mockBackoff, times(3)).waitBeforeRetry();
verify(mockBackoff, times(2)).waitBeforeRetry();
}
/**
 * A non-null preferred host is returned as is and the shell is never asked for the next host.
 */
@Test
public void testSelectReconnectionHostWithPreferredHost() {
    // the preferred host short-circuits the selection, so neither the link nor the shell needs stubbing
    final Link link = mock(Link.class);

    final String selectedHost = agent.selectReconnectionHost("preferred.host.com", link);

    assertEquals("preferred.host.com", selectedHost);
    verify(shell, times(0)).getNextHost();
}
/**
 * Without a preferred host the address of the link's socket is selected and the shell
 * is never asked for the next host.
 */
@Test
public void testSelectReconnectionHostWithNullPreferredHostUsesLinkAddress() {
    final Link link = mock(Link.class);
    final InetSocketAddress linkAddress = new InetSocketAddress("192.168.1.100", 8080);
    when(link.getSocketAddress()).thenReturn(linkAddress);

    // shell.getNextHost() is intentionally left unstubbed: the link address is expected to win
    final String selectedHost = agent.selectReconnectionHost(null, link);

    assertEquals("192.168.1.100", selectedHost);
    verify(shell, times(0)).getNextHost();
}
/**
 * With neither a preferred host nor a link, the host is taken from the shell exactly once.
 */
@Test
public void testSelectReconnectionHostWithNullLinkUsesShellNextHost() {
    when(shell.getNextHost()).thenReturn("fallback.host.com");

    final String selectedHost = agent.selectReconnectionHost(null, null);

    assertEquals("fallback.host.com", selectedHost);
    verify(shell, times(1)).getNextHost();
}
/**
 * A link that reports no socket address cannot provide a host, so the host is taken
 * from the shell exactly once.
 */
@Test
public void testSelectReconnectionHostWithNullSocketAddressUsesShellNextHost() {
    final Link link = mock(Link.class);
    when(link.getSocketAddress()).thenReturn(null);
    when(shell.getNextHost()).thenReturn("fallback.host.com");

    final String selectedHost = agent.selectReconnectionHost(null, link);

    assertEquals("fallback.host.com", selectedHost);
    verify(shell, times(1)).getNextHost();
}
}

View File

@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent;
import com.cloud.exception.CloudException;
import com.cloud.utils.nio.Link;
import org.apache.logging.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import javax.naming.ConfigurationException;
import static org.mockito.Mockito.mock;
/**
 * Unit test for {@link HostConnectProcess}, driven against a mocked {@link Agent} and {@link Link}.
 */
@RunWith(MockitoJUnitRunner.class)
public class HostConnectProcessTest {

    private Agent agent;
    private Logger logger;
    private Link link;
    private ServerAttache attache;
    private HostConnectProcess hostConnectProcess;
    // never assigned, i.e. the connect process is scheduled with connectionTransfer == false
    private boolean connectionTransfer;

    /**
     * Creates the mocks, builds the process under test on top of the mocked agent and
     * injects the mocked logger into the agent's {@code logger} field.
     */
    @Before
    public void setUp() throws ConfigurationException {
        this.agent = mock(Agent.class);
        this.logger = mock(Logger.class);
        this.link = mock(Link.class);
        this.attache = mock(ServerAttache.class);

        this.hostConnectProcess = new HostConnectProcess(this.agent);
        ReflectionTestUtils.setField(this.agent, "logger", this.logger);
    }

    /**
     * After scheduling a connect process it must be reported as being in progress.
     */
    @Test
    public void testScheduleConnectProcess() throws InterruptedException, CloudException {
        this.hostConnectProcess.scheduleConnectProcess(this.link, this.connectionTransfer);

        final boolean inProgress = this.hostConnectProcess.isInProgress();
        Assert.assertTrue(inProgress);
    }
}

View File

@ -0,0 +1,65 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.agent.api;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.utils.db.GlobalLock;
/**
* Answer for {@link AgentConnectStatusCommand}.
* <p>
* Plain data holder: carries a nullable lock flag and the host {@link Status} in addition to
* the fields inherited from {@link Answer}.
*
* @author mprokopchuk
*/
public class AgentConnectStatusAnswer extends Answer {
/**
* {@link Boolean#TRUE} means host has {@link GlobalLock#lock(int)} acquired, otherwise {@link Boolean#FALSE},
* and null if there is an error during executing {@link AgentConnectStatusCommand}.
* <p>
* NOTE(review): the field name reads as "the lock is available", while the sentence above reads as
* "the lock is held" - confirm against the code that populates this answer which sense is intended.
*/
private Boolean lockAvailable;
/**
* Represents value of {@link HostVO#getStatus()}.
*/
private Status hostStatus;
/**
* Creates an empty answer; {@code lockAvailable} and {@code hostStatus} stay {@code null} until set.
*/
public AgentConnectStatusAnswer() {
}
/**
* Creates an answer by delegating to the {@link Answer} constructor with the same arguments.
*
* @param command command this answer belongs to.
* @param success result flag passed to {@link Answer}.
* @param details details text passed to {@link Answer}.
*/
public AgentConnectStatusAnswer(Command command, boolean success, String details) {
super(command, success, details);
}
/**
* @return the lock flag, may be {@code null} (see the field documentation).
*/
public Boolean isLockAvailable() {
return lockAvailable;
}
/**
* @param lockAvailable the lock flag, may be {@code null}.
*/
public void setLockAvailable(Boolean lockAvailable) {
this.lockAvailable = lockAvailable;
}
/**
* @return the host status carried by this answer, or {@code null} if it was never set.
*/
public Status getHostStatus() {
return hostStatus;
}
/**
* @param hostStatus the host status to carry in this answer.
*/
public void setHostStatus(Status hostStatus) {
this.hostStatus = hostStatus;
}
}

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent.api;
/**
* Command to check status of {@link StartupCommand} from the Agent.
* <p>
* Plain data holder identifying the host by id, GUID and name; all three are {@code null} until set.
*
* @author mprokopchuk
*/
public class AgentConnectStatusCommand extends Command {
private String hostGuid;
private String hostName;
private Long hostId;
/**
* @return the host id, or {@code null} if it was not set.
*/
public Long getHostId() {
return hostId;
}
/**
* @param hostId the host id.
*/
public void setHostId(Long hostId) {
this.hostId = hostId;
}
/**
* @return the host GUID, or {@code null} if it was not set.
*/
public String getHostGuid() {
return hostGuid;
}
/**
* @param hostGuid the host GUID.
*/
public void setHostGuid(String hostGuid) {
this.hostGuid = hostGuid;
}
/**
* @return the host name, or {@code null} if it was not set.
*/
public String getHostName() {
return hostName;
}
/**
* @param hostName the host name.
*/
public void setHostName(String hostName) {
this.hostName = hostName;
}
/**
* @return always {@code false}.
*/
@Override
public boolean executeInSequence() {
return false;
}
}

View File

@ -19,13 +19,20 @@
package com.cloud.agent.api;
import java.util.HashMap;
import java.util.Map;
public class StartupAnswer extends Answer {
long hostId;
String hostName;
String hostUuid;
int pingInterval;
Integer agentHostStatusCheckDelaySec;
private Map<String, String> params;
/**
* No-argument constructor; initializes {@code params} to an empty, mutable map.
* Presumably intended for deserialization and subclasses - NOTE(review): confirm.
*/
protected StartupAnswer() {
params = new HashMap<>();
}
public StartupAnswer(StartupCommand cmd, long hostId, String hostUuid, String hostName, int pingInterval) {
@ -34,10 +41,12 @@ public class StartupAnswer extends Answer {
this.hostUuid = hostUuid;
this.hostName = hostName;
this.pingInterval = pingInterval;
params = new HashMap<>();
}
/**
* Creates an answer with result {@code false} and the given details, and initializes
* {@code params} to an empty, mutable map.
*
* @param cmd the startup command being answered.
* @param details details text passed to the {@link Answer} constructor.
*/
public StartupAnswer(StartupCommand cmd, String details) {
super(cmd, false, details);
params = new HashMap<>();
}
public long getHostId() {
@ -55,4 +64,20 @@ public class StartupAnswer extends Answer {
/**
* @return the ping interval carried by this answer (the unit is not visible in this class).
*/
public int getPingInterval() {
return pingInterval;
}
/**
* @return the parameter map held by this answer; the internal map itself is returned, not a copy.
*/
public Map<String, String> getParams() {
return params;
}
/**
* Replaces the parameter map with the given instance (no copy is made and {@code null} is not rejected).
*
* @param params the new parameter map.
*/
public void setParams(Map<String, String> params) {
this.params = params;
}
/**
* @return the agent host status check delay in seconds (per the field name), or {@code null} if it was not set.
*/
public Integer getAgentHostStatusCheckDelaySec() {
return agentHostStatusCheckDelaySec;
}
/**
* @param agentHostStatusCheckDelaySec the agent host status check delay in seconds (per the field name); may be {@code null}.
*/
public void setAgentHostStatusCheckDelaySec(Integer agentHostStatusCheckDelaySec) {
this.agentHostStatusCheckDelaySec = agentHostStatusCheckDelaySec;
}
}

View File

@ -404,7 +404,7 @@ public class Request {
try {
_cmds = s_gson.fromJson(_content, this instanceof Response ? Answer[].class : Command[].class);
} catch (RuntimeException e) {
LOGGER.error("Unable to deserialize from json: " + _content);
LOGGER.error("Unable to deserialize from json: " + _content, e);
throw e;
}
}

View File

@ -84,7 +84,8 @@ public interface ServerResource extends Manager {
void setAgentControl(IAgentControl agentControl);
default boolean isExitOnFailures() {
return true;
// true would cause unnecessary Agent service restart, don't want it by default
return false;
}
default boolean isAppendAgentNameToLogs() {

View File

@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.threadcontext;
import org.apache.logging.log4j.ThreadContext;
import java.util.HashMap;
import java.util.Map;
/**
 * Utility class, helps to propagate {@link ThreadContext} values from parent to child threads.
 *
 * @author mprokopchuk
 */
public class ThreadContextUtil {

    /**
     * Wraps a {@link Runnable} so that it runs with the {@link ThreadContext} map of the thread
     * that called this method.
     * <p>
     * The caller's context map is copied when this method is invoked. When the returned runnable
     * is executed, the executing thread's own context map is copied aside, replaced by the
     * caller's copy for the duration of {@code delegate.run()}, and put back afterwards, also when
     * the delegate throws.
     *
     * @param delegate the runnable to execute with the propagated context.
     * @return a runnable that installs the captured context around {@code delegate}.
     */
    public static Runnable wrapThreadContext(Runnable delegate) {
        final Map<String, String> capturedContext = copyCurrentContext();
        return () -> {
            final Map<String, String> previousContext = copyCurrentContext();
            try {
                replaceCurrentContext(capturedContext);
                delegate.run();
            } finally {
                replaceCurrentContext(previousContext);
            }
        };
    }

    /**
     * Copies the context map of the current thread.
     *
     * @return a mutable copy of the current context map, or {@code null} if there is none.
     */
    private static Map<String, String> copyCurrentContext() {
        final Map<String, String> current = ThreadContext.getContext();
        if (current == null) {
            return null;
        }
        return new HashMap<>(current);
    }

    /**
     * Clears the context map of the current thread and fills it with the given entries.
     *
     * @param replacement entries to install; {@code null} leaves the context map empty.
     */
    private static void replaceCurrentContext(Map<String, String> replacement) {
        ThreadContext.clearMap();
        if (replacement != null) {
            replacement.forEach(ThreadContext::put);
        }
    }
}

View File

@ -193,7 +193,7 @@ public abstract class AgentAttache {
if (_maintenance) {
for (final Command cmd : cmds) {
if (Arrays.binarySearch(s_commandsAllowedInMaintenanceMode, cmd.getClass().toString()) < 0 && !cmd.isBypassHostMaintenance()) {
throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString() + " because agent " + _name + " is in maintenance mode", _id);
throw new AgentUnavailableException("Unable to send " + cmd.getClass() + " because agent " + _name + " is in maintenance mode", _id);
}
}
}
@ -201,7 +201,7 @@ public abstract class AgentAttache {
if (_status == Status.Connecting) {
for (final Command cmd : cmds) {
if (Arrays.binarySearch(s_commandsNotAllowedInConnectingMode, cmd.getClass().toString()) >= 0) {
throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString() + " because agent " + _name + " is in connecting mode", _id);
throw new AgentUnavailableException("Unable to send " + cmd.getClass() + " because agent " + _name + " is in connecting mode", _id);
}
}
}
@ -396,13 +396,13 @@ public abstract class AgentAttache {
logger.trace(LOG_SEQ_FORMATTED_STRING, seq, " is current sequence");
}
} catch (AgentUnavailableException e) {
logger.info(LOG_SEQ_FORMATTED_STRING, seq, "Unable to send due to " + e.getMessage());
logger.info(LOG_SEQ_FORMATTED_STRING, seq, "Unable to send due to " + e.getMessage(), e);
cancel(seq);
throw e;
} catch (Exception e) {
logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Unable to send due to " + e.getMessage(), e);
cancel(seq);
throw new AgentUnavailableException("Problem due to other exception " + e.getMessage(), _id);
throw new AgentUnavailableException("Problem due to other exception " + e.getMessage(), _id, e);
}
}
}

View File

@ -121,6 +121,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
protected HashMap<String, SocketChannel> _peers;
protected HashMap<String, SSLEngine> _sslEngines;
private final Timer _timer = new Timer("ClusteredAgentManager Timer");
/**
* State flag to ensure Agent load-balancing performed only when Management Server started (once)
* or when Management Server goes back from {@link ManagementServerHost.State#Maintenance}
* (or {@link ManagementServerHost.State#PreparingForMaintenance}).
*/
boolean _agentLbHappened = false;
private int _mshostCounter = 0;
@ -285,8 +290,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
_agents.put(host.getId(), attache);
}
if (old != null) {
logger.debug("Remove stale agent attache from current management server");
removeAgent(old, Status.Removed);
logger.debug("Remove stale agent attache from current management server {}", _nodeId);
// just remove agent but do not deinitialize
removeAgent(old.getId(), attache);
}
return attache;
}
@ -350,6 +356,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
/**
* Overrides {@link Event#AgentDisconnected} logic and falls back to
* {@link AgentManagerImpl#executeUserRequest(long, Event)} for other event types ({@link Event#ShutdownRequested}).
*
* @param hostId Host Id
* @param event {@link Event}
* @return {@link Boolean#TRUE} if the request was successful or anything has been done (mostly useless, as it
* does not reflect the resource state change)
* @throws AgentUnavailableException
*/
@Override
public boolean executeUserRequest(final long hostId, final Event event) throws AgentUnavailableException {
if (event == Event.AgentDisconnected) {
@ -369,6 +385,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
// preload value before validating attache for forward to avoid race condition in case of configuration retrieval slowness
boolean performFullDisconnectOnAgentDisconnectEventBroadcast = PerformFullDisconnectOnAgentDisconnectEventBroadcast.value();
// don't process disconnect if the disconnect came for the host via delayed cluster notification,
// but the host has already reconnected to the current management server
if (!attache.forForward()) {
@ -378,7 +396,19 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return true;
}
return super.handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, false, true);
// If we received an AgentDisconnected event and we are here,
// it means the current server holds a forward attachment for an already disconnected host.
// We could either run full disconnect again (if the feature flag is "true"),
// or skip that and just deregister attache.
if (performFullDisconnectOnAgentDisconnectEventBroadcast) {
logger.debug("Processing {} event for the forward attache of host [id: {}, uuid: {}, name: {}]",
Event.AgentDisconnected, hostId, attache.getUuid(), attache.getName());
return super.handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, false, true);
} else {
logger.debug("Processing {} event (deregistering agent only) for the forward attache of host [id: {}, uuid: {}, name: {}]",
Event.AgentDisconnected, hostId, attache.getUuid(), attache.getName());
return super.handleDeregisterAttache(attache, Event.AgentDisconnected);
}
}
return true;
@ -400,7 +430,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
public void notifyNodesInCluster(final AgentAttache attache) {
logger.debug("Notifying other nodes of to disconnect");
// this code will send ChangeAgentCommand to all instances in the cluster
logger.debug("Notifying other nodes of to disconnect agent {} ({})", attache.getId(), attache.getName());
final Command[] cmds = new Command[]{new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)};
_clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds));
}
@ -581,7 +612,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
AgentAttache agent = findAttache(hostId);
if (agent == null || !agent.forForward()) {
if (isHostOwnerSwitched(host)) {
logger.debug("{} has switched to another management server, need to update agent map with a forwarding agent attache", host);
logger.debug("Host {} has switched (from {}) to another management server ({}), " +
"need to update agent map with a forwarding agent attache", host, _nodeId, host.getManagementServerId());
agent = createAttache(host);
}
}
@ -754,15 +786,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public void onManagementNodeIsolated() {
}
@Override
public void removeAgent(final AgentAttache attache, final Status nextState) {
if (attache == null) {
return;
}
super.removeAgent(attache, nextState);
}
@Override
public boolean executeRebalanceRequest(final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event) throws AgentUnavailableException, OperationTimedoutException {
return executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event, false);
@ -873,11 +896,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId);
final Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId);
if (answer == null) {
logger.warn("Failed to get host {} from management server {}", host, node);
logger.warn("Failed to get host {} from management server {} to {}", host, node, _nodeId);
result = false;
} else {
logger.debug("Succeeded to get host {} from management server {} to {}", host, node, _nodeId);
}
} catch (final Exception ex) {
logger.warn("Failed to get host {} from management server {}", host, node, ex);
logger.warn("Failed to get host {} from management server {} to {}", host, node, _nodeId, ex);
result = false;
} finally {
if (transfer != null) {
@ -910,13 +935,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
final Command[] cmds = commands.toCommands();
try {
logger.debug("Forwarding {} to {}", cmds[0].toString(), peer);
logger.debug("Forwarding host {} from {} to {} as part of {} - cmd: {}, peer: {}",
agentId, currentOwnerId, futureOwnerId, event.name(), cmds[0].toString(), peer);
final String peerName = Long.toString(peer);
final String cmdStr = _gson.toJson(cmds);
final String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true);
return _gson.fromJson(ansStr, Answer[].class);
} catch (final Exception e) {
logger.warn("Caught exception while talking to {}", currentOwnerId, e);
logger.warn("Caught exception during forwarding host {} from {} to {} as part of {}",
agentId, currentOwnerId, futureOwnerId, event.name(), e);
return null;
}
}
@ -1269,7 +1296,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
logger.debug("Intercepting command for agent change: agent {} event: {}", cmd.getAgentId(), cmd.getEvent());
boolean result;
try {
result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
result = executeUserRequest(cmd.getAgentId(), cmd.getEvent());
logger.debug("Result is {}", result);
} catch (final AgentUnavailableException e) {
@ -1520,7 +1547,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
public void onManagementServerCancelPreparingForMaintenance() {
logger.debug("Management server cancel preparing for maintenance");
super.onManagementServerPreparingForMaintenance();
super.onManagementServerCancelPreparingForMaintenance();
// needed for the case when Management Server in Preparing For Maintenance but didn't go to Maintenance state
// (where this variable will be reset)
@ -1550,14 +1577,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
public boolean executeAgentUserRequest(final long agentId, final Event event) throws AgentUnavailableException {
return executeUserRequest(agentId, event);
}
public boolean rebalanceAgent(final long agentId, final Event event, final long currentOwnerId, final long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
return executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
}
public boolean rebalanceAgent(final long agentId, final Event event, final long currentOwnerId, final long futureOwnerId, boolean isConnectionTransfer) throws AgentUnavailableException, OperationTimedoutException {
return executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event, isConnectionTransfer);
}
@ -1566,6 +1585,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return EnableLB.value();
}
/**
* Returns Agent Rebalancing Task.
* Runs on timer, but expects to be executed only when Management Server started (once)
* or when Management Server goes back from {@link ManagementServerHost.State#Maintenance}
* (or {@link ManagementServerHost.State#PreparingForMaintenance}).
*/
private Runnable getAgentRebalanceScanTask() {
return new ManagedContextRunnable() {
@Override

View File

@ -21,6 +21,7 @@ import com.cloud.agent.api.Answer;
import com.cloud.agent.api.ReadyCommand;
import com.cloud.agent.api.StartupCommand;
import com.cloud.agent.api.StartupRoutingCommand;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.ConnectionException;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
@ -28,15 +29,30 @@ import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.utils.Pair;
import com.cloud.utils.nio.Link;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Set;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
public class AgentManagerImplTest {
private static final Long HOST_ID = 1L;
private static final String HOST_UUID = UUID.randomUUID().toString();
private static final String HOST_NAME = "test-host-name";
private static final Link LINK = new Link(null, null);
private static final Status.Event EVENT = Status.Event.AgentDisconnected;
private HostDao hostDao;
private Listener storagePoolMonitor;
private AgentAttache attache;
@ -47,9 +63,10 @@ public class AgentManagerImplTest {
@Before
public void setUp() throws Exception {
host = new HostVO("some-Uuid");
FieldUtils.writeField(host, "id", HOST_ID, true);
host.setDataCenterId(1L);
cmds = new StartupCommand[]{new StartupRoutingCommand()};
attache = new ConnectedAgentAttache(null, 1L, "uuid", "kvm-attache", Hypervisor.HypervisorType.KVM, null, false);
attache = createClusterAttache(false);
hostDao = Mockito.mock(HostDao.class);
storagePoolMonitor = Mockito.mock(Listener.class);
@ -59,13 +76,18 @@ public class AgentManagerImplTest {
mgr._hostMonitors.add(new Pair<>(0, storagePoolMonitor));
}
/**
 * Builds a {@link ClusteredAgentAttache} for the test host.
 *
 * @param forForward when {@code true} the attache is created with a {@code null} link,
 *                   otherwise it is created with the shared test {@code LINK}
 * @return the newly created attache
 */
private AgentAttache createClusterAttache(boolean forForward) {
    final Link attacheLink;
    if (forForward) {
        attacheLink = null;
    } else {
        attacheLink = LINK;
    }
    return new ClusteredAgentAttache(mgr, HOST_ID, HOST_UUID, HOST_NAME, Hypervisor.HypervisorType.KVM, attacheLink, false);
}
@Test
public void testNotifyMonitorsOfConnectionNormal() throws ConnectionException {
Mockito.when(hostDao.findById(Mockito.anyLong())).thenReturn(host);
Mockito.doNothing().when(storagePoolMonitor).processConnect(Mockito.eq(host), Mockito.eq(cmds[0]), Mockito.eq(false));
when(hostDao.findById(Mockito.anyLong())).thenReturn(host);
Mockito.doNothing().when(storagePoolMonitor).processConnect(eq(host), eq(cmds[0]), eq(false));
Mockito.doReturn(true).when(mgr).handleDisconnectWithoutInvestigation(Mockito.any(attache.getClass()), Mockito.any(Status.Event.class), Mockito.anyBoolean(), Mockito.anyBoolean());
Mockito.doReturn(Mockito.mock(Answer.class)).when(mgr).easySend(Mockito.anyLong(), Mockito.any(ReadyCommand.class));
Mockito.doReturn(true).when(mgr).agentStatusTransitTo(Mockito.eq(host), Mockito.eq(Status.Event.Ready), Mockito.anyLong());
Mockito.doReturn(true).when(mgr).agentStatusTransitTo(eq(host), eq(Status.Event.Ready), Mockito.anyLong());
final AgentAttache agentAttache = mgr.notifyMonitorsOfConnection(attache, cmds, false);
Assert.assertTrue(agentAttache.isReady()); // Agent is in UP state
@ -74,8 +96,8 @@ public class AgentManagerImplTest {
@Test
public void testNotifyMonitorsOfConnectionWhenStoragePoolConnectionHostFailure() throws ConnectionException {
ConnectionException connectionException = new ConnectionException(true, "storage pool could not be connected on host");
Mockito.when(hostDao.findById(Mockito.anyLong())).thenReturn(host);
Mockito.doThrow(connectionException).when(storagePoolMonitor).processConnect(Mockito.eq(host), Mockito.eq(cmds[0]), Mockito.eq(false));
when(hostDao.findById(Mockito.anyLong())).thenReturn(host);
Mockito.doThrow(connectionException).when(storagePoolMonitor).processConnect(eq(host), eq(cmds[0]), eq(false));
Mockito.doReturn(true).when(mgr).handleDisconnectWithoutInvestigation(Mockito.any(attache.getClass()), Mockito.any(Status.Event.class), Mockito.anyBoolean(), Mockito.anyBoolean());
try {
mgr.notifyMonitorsOfConnection(attache, cmds, false);
@ -83,7 +105,7 @@ public class AgentManagerImplTest {
} catch (ConnectionException e) {
Assert.assertEquals(e.getMessage(), connectionException.getMessage());
}
Mockito.verify(mgr, Mockito.times(1)).handleDisconnectWithoutInvestigation(Mockito.any(attache.getClass()), Mockito.eq(Status.Event.AgentDisconnected), Mockito.eq(true), Mockito.eq(true));
Mockito.verify(mgr, Mockito.times(1)).handleDisconnectWithoutInvestigation(Mockito.any(attache.getClass()), eq(Status.Event.AgentDisconnected), eq(true), eq(true));
}
@Test
@ -106,6 +128,128 @@ public class AgentManagerImplTest {
Assert.assertEquals(50, result);
}
/**
 * Checks the key name, default value and category of the {@code AliveHostStatuses} config key.
 */
@Test
public void testAliveHostStatusesConfigKey() {
    final ConfigKey<String> aliveHostStatuses = mgr.AliveHostStatuses;

    Assert.assertNotNull("Config key should not be null", aliveHostStatuses);

    final String keyName = aliveHostStatuses.key();
    Assert.assertEquals("Config key should have correct key name", "alive.host.statuses", keyName);

    final String defaultValue = aliveHostStatuses.defaultValue();
    Assert.assertEquals("Config key should have correct default value", "Up,Creating,Connecting,Rebalancing", defaultValue);

    final String category = aliveHostStatuses.category();
    Assert.assertEquals("Config key should have correct category", "Advanced", category);
}
/**
 * Checks that {@code getConfigKeys()} of a fresh {@link AgentManagerImpl} exposes a key named
 * {@code alive.host.statuses}.
 */
@Test
public void testGetConfigKeysIncludesAliveHostStatuses() {
    final ConfigKey<?>[] keys = new AgentManagerImpl().getConfigKeys();

    // stop scanning as soon as the key has been seen
    boolean present = false;
    for (int i = 0; i < keys.length && !present; i++) {
        present = "alive.host.statuses".equals(keys[i].key());
    }

    Assert.assertTrue("AliveHostStatuses should be included in getConfigKeys()", present);
}
/**
 * Deregisters a link-less attache while the host is in {@code Alert} status.
 * The test passes only if {@link AgentUnavailableException} is thrown; the final
 * {@code getAttache} lookup is presumably the call expected to throw it.
 */
@Test(expected = AgentUnavailableException.class)
public void testHandleDeregisterAttacheInAlertState() throws Exception {
    // arrange: host in Alert status, attache created without a link
    FieldUtils.writeField(host, "status", Status.Alert, true);
    attache = createClusterAttache(true);
    when(hostDao.findById(eq(HOST_ID))).thenReturn(host);

    // act + assert: deregistration must report success
    Assert.assertTrue(mgr.handleDeregisterAttache(attache, EVENT));

    mgr.getAttache(HOST_ID);
}
/**
 * Deregisters a link-less attache while another attache, created through
 * {@code createAttacheForConnect}, is already present for the same host,
 * and checks that the already present attache is still the one returned afterwards.
 */
@Test
public void testHandleDeregisterAttacheInAlertStateAnotherAttacheIsPresent() throws Exception {
    // arrange: host in Alert status, attache created without a link
    FieldUtils.writeField(host, "status", Status.Alert, true);
    attache = createClusterAttache(true);
    when(hostDao.findById(eq(HOST_ID))).thenReturn(host);
    final AgentAttache existingAttache = mgr.createAttacheForConnect(host, LINK);

    // act
    final boolean deregistered = mgr.handleDeregisterAttache(attache, EVENT);

    // assert
    Assert.assertTrue(deregistered);
    // ensure pre-existing attache remains in place
    Assert.assertEquals(existingAttache, mgr.getAttache(HOST_ID));
}
/**
 * Deregisters a link-less attache while the host is in {@code Up} status and expects success.
 */
@Test
public void testHandleDeregisterAttacheInUpState() throws IllegalAccessException {
    // arrange: host in Up status, attache created without a link
    FieldUtils.writeField(host, "status", Status.Up, true);
    attache = createClusterAttache(true);
    when(hostDao.findById(eq(HOST_ID))).thenReturn(host);

    // act + assert
    Assert.assertTrue(mgr.handleDeregisterAttache(attache, EVENT));
}
/**
 * Deregisters a link-less attache when the host lookup returns {@code null} and expects success.
 */
@Test
public void testHandleDeregisterAttacheHostDoesNotExist() {
    // arrange: the DAO does not know the host
    attache = createClusterAttache(true);
    when(hostDao.findById(eq(HOST_ID))).thenReturn(null);

    // act + assert
    Assert.assertTrue(mgr.handleDeregisterAttache(attache, EVENT));
}
/**
 * Passes an attache that was created with a link and expects {@code handleDeregisterAttache}
 * to report {@code false}.
 */
@Test
public void testHandleDeregisterAttacheNotForward() {
    attache = createClusterAttache(false);

    Assert.assertFalse(mgr.handleDeregisterAttache(attache, EVENT));
}
/**
 * Checks that the alive host statuses set holds only {@code Up} when the config value is {@code "Up"}.
 */
@Test
public void testGetAliveHostStatusesDefaultConfiguration() throws Exception {
    // replace the config key with a mock that reports a single status
    ConfigKey<String> aliveStatusesKey = Mockito.mock(ConfigKey.class);
    when(aliveStatusesKey.value()).thenReturn("Up");
    FieldUtils.writeField(mgr, "AliveHostStatuses", aliveStatusesKey, true);

    // both methods are looked up reflectively and made accessible
    Method initializer = AgentManagerImpl.class.getDeclaredMethod("initializeAliveHostStatuses");
    initializer.setAccessible(true);
    Method getter = AgentManagerImpl.class.getDeclaredMethod("getAliveHostStatuses");
    getter.setAccessible(true);

    initializer.invoke(mgr);
    @SuppressWarnings("unchecked")
    Set<Status> aliveStatuses = (Set<Status>) getter.invoke(mgr);

    Assert.assertEquals(1, aliveStatuses.size());
    Assert.assertTrue(aliveStatuses.contains(Status.Up));
}
/**
 * Checks that all five configured statuses end up in the alive host statuses set
 * when the config value is {@code "Up,Alert,Connecting,Creating,Rebalancing"}.
 */
@Test
public void testGetAliveHostStatusesExpandedConfiguration() throws Exception {
    // replace the config key with a mock that reports five statuses
    ConfigKey<String> aliveStatusesKey = Mockito.mock(ConfigKey.class);
    when(aliveStatusesKey.value()).thenReturn("Up,Alert,Connecting,Creating,Rebalancing");
    FieldUtils.writeField(mgr, "AliveHostStatuses", aliveStatusesKey, true);

    // both methods are looked up reflectively and made accessible
    Method initializer = AgentManagerImpl.class.getDeclaredMethod("initializeAliveHostStatuses");
    initializer.setAccessible(true);
    Method getter = AgentManagerImpl.class.getDeclaredMethod("getAliveHostStatuses");
    getter.setAccessible(true);

    initializer.invoke(mgr);
    @SuppressWarnings("unchecked")
    Set<Status> aliveStatuses = (Set<Status>) getter.invoke(mgr);

    Assert.assertEquals(5, aliveStatuses.size());
    Assert.assertTrue(aliveStatuses.contains(Status.Up));
    Assert.assertTrue(aliveStatuses.contains(Status.Alert));
    Assert.assertTrue(aliveStatuses.contains(Status.Connecting));
    Assert.assertTrue(aliveStatuses.contains(Status.Creating));
    Assert.assertTrue(aliveStatuses.contains(Status.Rebalancing));
}
@Test
public void testGetHostSshPortWithHostNull() {
int hostSshPort = mgr.getHostSshPort(null);
@ -115,7 +259,7 @@ public class AgentManagerImplTest {
@Test
public void testGetHostSshPortWithNonKVMHost() {
HostVO host = Mockito.mock(HostVO.class);
Mockito.when(host.getHypervisorType()).thenReturn(Hypervisor.HypervisorType.XenServer);
when(host.getHypervisorType()).thenReturn(Hypervisor.HypervisorType.XenServer);
int hostSshPort = mgr.getHostSshPort(host);
Assert.assertEquals(22, hostSshPort);
}
@ -123,8 +267,8 @@ public class AgentManagerImplTest {
@Test
public void testGetHostSshPortWithKVMHostDefaultPort() {
HostVO host = Mockito.mock(HostVO.class);
Mockito.when(host.getHypervisorType()).thenReturn(Hypervisor.HypervisorType.KVM);
Mockito.when(host.getClusterId()).thenReturn(1L);
when(host.getHypervisorType()).thenReturn(Hypervisor.HypervisorType.KVM);
when(host.getClusterId()).thenReturn(1L);
int hostSshPort = mgr.getHostSshPort(host);
Assert.assertEquals(22, hostSshPort);
}
@ -132,8 +276,8 @@ public class AgentManagerImplTest {
@Test
public void testGetHostSshPortWithKVMHostCustomPort() {
HostVO host = Mockito.mock(HostVO.class);
Mockito.when(host.getHypervisorType()).thenReturn(Hypervisor.HypervisorType.KVM);
Mockito.when(host.getDetail(Host.HOST_SSH_PORT)).thenReturn(String.valueOf(3922));
when(host.getHypervisorType()).thenReturn(Hypervisor.HypervisorType.KVM);
when(host.getDetail(Host.HOST_SSH_PORT)).thenReturn(String.valueOf(3922));
int hostSshPort = mgr.getHostSshPort(host);
Assert.assertEquals(3922, hostSshPort);
}

View File

@ -18,16 +18,24 @@
package com.cloud.agent.manager;
import com.cloud.configuration.ManagementServiceConfiguration;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.ha.HighAvailabilityManagerImpl;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.resource.ResourceManagerImpl;
import com.cloud.utils.db.GlobalLock;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
@ -35,29 +43,86 @@ import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ClusteredAgentManagerImplTest {
private static final Long HOST_ID = 1L;
private static final Status.Event EVENT = Status.Event.AgentDisconnected;
private HostDao _hostDao;
@Mock
ManagementServiceConfiguration _mgmtServiceConf;
ManagementServiceConfiguration mgmtServiceConf;
@Spy
private ClusteredAgentManagerImpl mgr = new ClusteredAgentManagerImpl();
@Mock
private HostDao hostDao;
@Mock
private ConfigKey<Boolean> PerformFullDisconnectOnAgentDisconnectEventBroadcast;
private GlobalLock hostJoinLockMock;
private HostVO host;
MockedStatic<GlobalLock> globalLockMocked;
@Before
public void setUp() throws Exception {
_hostDao = mock(HostDao.class);
host = new HostVO("some-Uuid");
FieldUtils.writeField(host, "id", HOST_ID, true);
mgr._hostDao = hostDao;
mgr.PerformFullDisconnectOnAgentDisconnectEventBroadcast = PerformFullDisconnectOnAgentDisconnectEventBroadcast;
mgr.createAttache(host);
globalLockMocked = Mockito.mockStatic(GlobalLock.class);
hostJoinLockMock = mock(GlobalLock.class);
}
/**
 * Closes the static {@link GlobalLock} mock opened in {@code setUp()} so that it does not
 * stay registered after the test has finished.
 *
 * @throws Exception declared for the JUnit lifecycle signature
 */
@After
public void tearDown() throws Exception {
    // setUp() assigns the static mock only near its end; if an earlier step failed the field is
    // still null, and closing it blindly would add a secondary NullPointerException to the report
    if (globalLockMocked != null) {
        globalLockMocked.close();
    }
}
/**
 * With PerformFullDisconnectOnAgentDisconnectEventBroadcast stubbed to TRUE, executeUserRequest for an
 * AgentDisconnected event is expected to return true and to lock the mocked host lock exactly once.
 */
@Test
public void testFullDisconnectPerformed() throws AgentUnavailableException {
when(PerformFullDisconnectOnAgentDisconnectEventBroadcast.value()).thenReturn(Boolean.TRUE);
// every GlobalLock.getInternLock(...) lookup hands out the lock mock, whose lock(...) succeeds
globalLockMocked.when(() -> GlobalLock.getInternLock(anyString())).thenReturn(hostJoinLockMock);
when(hostJoinLockMock.lock(anyInt())).thenReturn(Boolean.TRUE);
boolean result = mgr.executeUserRequest(HOST_ID, EVENT);
Assert.assertTrue(result);
// NOTE(review): this bare call is not wrapped in globalLockMocked.verify(...), so it looks like it
// only invokes the static mock and verifies nothing - confirm the intent
GlobalLock.getInternLock(anyString());
verify(hostJoinLockMock, times(1)).lock(anyInt());
}
/**
 * With PerformFullDisconnectOnAgentDisconnectEventBroadcast stubbed to FALSE, executeUserRequest for an
 * AgentDisconnected event is expected to return true without locking the mocked host lock.
 */
@Test
public void testDeregisterOnlyPerformed() throws AgentUnavailableException {
// lenient(): these stubs may stay unused without failing the strict-stubs runner
lenient().when(PerformFullDisconnectOnAgentDisconnectEventBroadcast.value()).thenReturn(Boolean.FALSE);
lenient().when(hostJoinLockMock.lock(anyInt())).thenReturn(Boolean.TRUE);
boolean result = mgr.executeUserRequest(HOST_ID, EVENT);
Assert.assertTrue(result);
// NOTE(review): this bare call is not wrapped in globalLockMocked.verify(...), so it looks like it
// only invokes the static mock and verifies nothing - confirm the intent
GlobalLock.getInternLock(anyString());
// we do not expect any lock to be called as lock happens only in handleDisconnectWithoutInvestigation
// NOTE(review): GlobalLock.getInternLock is not stubbed to return hostJoinLockMock in this test, so the
// never() check below presumably cannot fail - confirm
verify(hostJoinLockMock, never()).lock(anyInt());
}
@Test
public void scanDirectAgentToLoadNoHostsTest() {
ClusteredAgentManagerImpl clusteredAgentManagerImpl = mock(ClusteredAgentManagerImpl.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl._hostDao = hostDao;
clusteredAgentManagerImpl.scanDirectAgentToLoad();
verify(clusteredAgentManagerImpl, never()).findAttache(anyLong());
verify(clusteredAgentManagerImpl, never()).loadDirectlyConnectedHost(any(), anyBoolean());
@ -68,15 +133,14 @@ public class ClusteredAgentManagerImplTest {
// Arrange
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(ClusteredAgentManagerImpl.class);
HostVO hostVO = mock(HostVO.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
clusteredAgentManagerImpl._hostDao = hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = mgmtServiceConf;
clusteredAgentManagerImpl._resourceMgr = mock(ResourceManagerImpl.class);
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(hostVO.getId()).thenReturn(1L);
List hosts = new ArrayList<>();
List<HostVO> hosts = new ArrayList<>();
hosts.add(hostVO);
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
AgentAttache agentAttache = mock(AgentAttache.class);
when(hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
doReturn(Boolean.TRUE).when(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
clusteredAgentManagerImpl.scanDirectAgentToLoad();
verify(clusteredAgentManagerImpl).loadDirectlyConnectedHost(hostVO, false);
@ -86,13 +150,13 @@ public class ClusteredAgentManagerImplTest {
public void scanDirectAgentToLoadHostWithForwardAttacheTest() {
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(ClusteredAgentManagerImpl.class);
HostVO hostVO = mock(HostVO.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
clusteredAgentManagerImpl._hostDao = hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = mgmtServiceConf;
when(mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(hostVO.getId()).thenReturn(1L);
List hosts = new ArrayList<>();
List<HostVO> hosts = new ArrayList<>();
hosts.add(hostVO);
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
when(hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
AgentAttache agentAttache = mock(AgentAttache.class);
when(agentAttache.forForward()).thenReturn(Boolean.TRUE);
when(clusteredAgentManagerImpl.findAttache(1L)).thenReturn(agentAttache);
@ -106,14 +170,14 @@ public class ClusteredAgentManagerImplTest {
// Arrange
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(new ClusteredAgentManagerImpl());
HostVO hostVO = mock(HostVO.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
clusteredAgentManagerImpl._hostDao = hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = mgmtServiceConf;
clusteredAgentManagerImpl._haMgr = mock(HighAvailabilityManagerImpl.class);
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(hostVO.getId()).thenReturn(0L);
List hosts = new ArrayList<>();
List<HostVO> hosts = new ArrayList<>();
hosts.add(hostVO);
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
when(hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
AgentAttache agentAttache = mock(AgentAttache.class);
when(agentAttache.forForward()).thenReturn(Boolean.FALSE);
@ -130,15 +194,15 @@ public class ClusteredAgentManagerImplTest {
public void scanDirectAgentToLoadHostWithNonForwardAttacheAndDisconnectedTest() {
ClusteredAgentManagerImpl clusteredAgentManagerImpl = Mockito.spy(ClusteredAgentManagerImpl.class);
HostVO hostVO = mock(HostVO.class);
clusteredAgentManagerImpl._hostDao = _hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = _mgmtServiceConf;
clusteredAgentManagerImpl._hostDao = hostDao;
clusteredAgentManagerImpl.mgmtServiceConf = mgmtServiceConf;
clusteredAgentManagerImpl._haMgr = mock(HighAvailabilityManagerImpl.class);
clusteredAgentManagerImpl._resourceMgr = mock(ResourceManagerImpl.class);
when(_mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(mgmtServiceConf.getTimeout()).thenReturn(16000L);
when(hostVO.getId()).thenReturn(0L);
List hosts = new ArrayList<>();
List<HostVO> hosts = new ArrayList<>();
hosts.add(hostVO);
when(_hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
when(hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
AgentAttache agentAttache = mock(AgentAttache.class);
when(agentAttache.forForward()).thenReturn(Boolean.FALSE);
when(clusteredAgentManagerImpl.findAttache(0L)).thenReturn(agentAttache);

View File

@ -0,0 +1,28 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- in cloud
-- IDEMPOTENT_ADD_INDEX(in_index_name, in_table_name, in_index_definition)
-- Builds and executes, through a prepared statement, the DDL
--   ALTER TABLE <in_table_name> ADD INDEX <in_index_name> <in_index_definition>, ALGORITHM=INPLACE, LOCK=NONE
-- The CONTINUE handler for error 1061 has an empty body, so that error is ignored and execution goes on
-- NOTE(review): 1061 is presumably the MySQL "duplicate key name" error, which would make repeated calls harmless - confirm against the MySQL error reference
-- NOTE(review): the three arguments are concatenated into the DDL without quoting, so callers must pass trusted values only
-- NOTE(review): the procedure body is written on a single line, presumably because the upgrade script runner splits statements on a line-ending semicolon - confirm before reformatting
DROP PROCEDURE IF EXISTS `cloud`.`IDEMPOTENT_ADD_INDEX`;
CREATE PROCEDURE `cloud`.`IDEMPOTENT_ADD_INDEX` (
IN in_index_name VARCHAR(200)
,IN in_table_name VARCHAR(200)
,IN in_index_definition VARCHAR(1000)
)
BEGIN
DECLARE CONTINUE HANDLER FOR 1061 BEGIN END; SET @ddl = CONCAT('ALTER TABLE ', in_table_name, ' ADD INDEX ', in_index_name, ' ', in_index_definition, ', ALGORITHM=INPLACE, LOCK=NONE'); PREPARE stmt FROM @ddl; EXECUTE stmt; DEALLOCATE PREPARE stmt; END;

View File

@ -131,3 +131,17 @@ CREATE TABLE IF NOT EXISTS `cloud_usage`.`quota_tariff_usage` (
-- Add the 'keep_mac_address_on_public_nic' column to the 'cloud.networks' and 'cloud.vpc' tables
CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.networks', 'keep_mac_address_on_public_nic', 'TINYINT(1) NOT NULL DEFAULT 1');
CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.vpc', 'keep_mac_address_on_public_nic', 'TINYINT(1) NOT NULL DEFAULT 1');
-- These indexes were derived from the top 20 resource-consuming DB queries
-- Arguments of each call: index name, table name, index definition
-- NOTE(review): several definitions start with the id column, e.g. (id, removed); if id is already the primary key of those tables these indexes presumably add little - confirm with EXPLAIN before keeping them
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_mshost_removed_state','cloud.mshost', '(removed, state)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_host_id_removed','cloud.host', '(id, removed)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_networks_id_removed','cloud.networks', '(id, removed)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_alert_type_data_center_id_archived_pod_id','cloud.alert', '(type, data_center_id, archived, pod_id)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_configuration_name','cloud.configuration', '(name)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_account_id_removed','cloud.account', '(id, removed)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_async_job_id_removed','cloud.async_job', '(id, removed)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_user_id_api_key_removed','cloud.user', '(id, api_key, removed)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_resource_limit_type_domain_id_tag','cloud.resource_limit', '(type, domain_id, tag)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_vm_instance_id_removed','cloud.vm_instance', '(id, removed)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_vm_host_state_removed','cloud.vm_instance', '(host_id, state, removed)');
CALL `cloud`.`IDEMPOTENT_ADD_INDEX`('idx_vm_instance_name_removed','cloud.vm_instance', '(instance_name, removed)');

View File

@ -19,12 +19,14 @@ package com.cloud.cluster;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.Optional;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.RequestLine;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpRequestHandler;
@ -73,10 +75,11 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
@SuppressWarnings("deprecation")
private void parseRequest(HttpRequest request) throws IOException {
String body = null;
if (request instanceof HttpEntityEnclosingRequest) {
final HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest)request;
final String body = EntityUtils.toString(entityRequest.getEntity());
body = EntityUtils.toString(entityRequest.getEntity());
if (body != null) {
final String[] paramArray = body.split("&");
if (paramArray != null) {
@ -97,6 +100,7 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
}
}
}
logRequest(request, body);
}
private void writeResponse(HttpResponse response, int statusCode, String content) {
@ -113,6 +117,14 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
response.setEntity(body);
}
/**
 * Writes one debug line holding the HTTP method, the request URI and the request body.
 * A {@code null} request or request line is logged as {@code null} values instead of failing.
 *
 * @param request     incoming HTTP request, may be {@code null}
 * @param requestBody request body text, may be {@code null}
 */
private void logRequest(HttpRequest request, String requestBody) {
    final RequestLine line = Optional.ofNullable(request).map(HttpRequest::getRequestLine).orElse(null);
    final String httpMethod = line == null ? null : line.getMethod();
    final String requestUri = line == null ? null : line.getUri();
    logger.debug("{} {} {}", httpMethod, requestUri, requestBody);
}
protected void handleRequest(HttpRequest req, HttpResponse response) {
final String method = (String)req.getParams().getParameter("method");

View File

@ -18,6 +18,7 @@ package com.cloud.cluster;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.rmi.RemoteException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
@ -98,9 +99,10 @@ public class ClusterServiceServletImpl implements ClusterService {
try {
method.setEntity(new UrlEncodedFormEntity(postParameters, HttpUtils.UTF_8));
} catch (UnsupportedEncodingException e) {
logger.error("Failed to encode request POST parameters", e);
String msg = "Failed to encode request POST parameters: " + postParameters;
logger.error(msg, e);
logPostParametersForFailedEncoding(postParameters);
throw new RemoteException("Failed to encode request POST parameters", e);
throw new RemoteException(msg, e);
}
return executePostMethod(client, method);
@ -126,9 +128,10 @@ public class ClusterServiceServletImpl implements ClusterService {
try {
method.setEntity(new UrlEncodedFormEntity(postParameters, HttpUtils.UTF_8));
} catch (UnsupportedEncodingException e) {
logger.error("Failed to encode ping request POST parameters", e);
String msg = "Failed to encode ping request POST parameters: " + postParameters;
logger.error(msg, e);
logPostParametersForFailedEncoding(postParameters);
throw new RemoteException("Failed to encode ping request POST parameters", e);
throw new RemoteException(msg, e);
}
final String returnVal = executePostMethod(client, method);
@ -137,6 +140,12 @@ public class ClusterServiceServletImpl implements ClusterService {
private String executePostMethod(final CloseableHttpClient client, final HttpPost method) {
String result = null;
String request = null;
try {
request = EntityUtils.toString(method.getEntity(), Charset.defaultCharset());
} catch (Exception e) {
logger.warn("Failed to retrieve request entity for POST {}", serviceUrl, e);
}
try {
final Profiler profiler = new Profiler();
profiler.start();
@ -146,15 +155,15 @@ public class ClusterServiceServletImpl implements ClusterService {
result = EntityUtils.toString(httpResponse.getEntity());
profiler.stop();
if (logger.isDebugEnabled()) {
logger.debug("POST " + serviceUrl + " response :" + result + ", responding time: " + profiler.getDurationInMillis() + " ms");
logger.debug("POST " + serviceUrl + " request: " + request + ", response :" + result + ", responding time: " + profiler.getDurationInMillis() + " ms");
}
} else {
profiler.stop();
logger.error("Invalid response code : " + response + ", from : " + serviceUrl + ", method : " + method.getParams().getParameter("method") + " responding time: " +
logger.error("Invalid response code : " + response + ", from : " + serviceUrl + " request: " + request + ", method : " + method.getParams().getParameter("method") + " responding time: " +
profiler.getDurationInMillis());
}
} catch (IOException e) {
logger.error("Exception from : " + serviceUrl + ", method : " + method.getParams().getParameter("method") + ", exception :", e);
logger.error("Exception from : " + serviceUrl + " request: " + request + ", method : " + method.getParams().getParameter("method") + ", exception :", e);
} finally {
method.releaseConnection();
}

View File

@ -29,6 +29,11 @@ public interface ManagementServerHostDao extends GenericDao<ManagementServerHost
@Override
boolean remove(Long id);
/**
 * Returns all management servers (including down and removed).
 *
 * @return management server host records, removed ones included
 */
List<ManagementServerHostVO> findAllIncludingRemoved();
ManagementServerHostVO findByMsid(long msid);
int increaseAlertCount(long id);

View File

@ -64,6 +64,11 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
}
}
/**
 * Lists management server hosts through listIncludingRemovedBy, using criteria created from ActiveSearch.
 * NOTE(review): ActiveSearch is defined outside this view - confirm it carries no conditions
 * (e.g. on removed or on last update time) that would contradict "all, including removed".
 */
@Override
public List<ManagementServerHostVO> findAllIncludingRemoved() {
return listIncludingRemovedBy(ActiveSearch.create());
}
@Override
public ManagementServerHostVO findByMsid(long msid) {
SearchCriteria<ManagementServerHostVO> sc = MsIdSearch.create();

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.config;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Utility class that helps with configuration key manipulation.
 *
 * @author mprokopchuk
 */
public final class ConfigKeyUtil {
    /**
     * Split by {@code ;} with optional space symbols (space, tab, new line, etc.) before and after.
     */
    private static final Pattern ENTRY_SEPARATOR_PATTERN = Pattern.compile("\\s*;\\s*");

    /**
     * Split by {@code =} with optional space symbols (space, tab, new line, etc.) before and after.
     */
    private static final Pattern KEY_VALUE_SEPARATOR_PATTERN = Pattern.compile("\\s*=\\s*");

    private ConfigKeyUtil() {
    }

    /**
     * Convert configuration value of format {@code key1=value1;key2=value2;...} to {@code Map<String, String>}.
     * <ul>
     * <li>Whitespace around {@code ;} and {@code =} and around the whole value is ignored.</li>
     * <li>Only the first {@code =} of an entry separates key and value, so a value may contain {@code =}.</li>
     * <li>Entries without {@code =} are skipped.</li>
     * <li>When a key occurs more than once the last occurrence wins.</li>
     * <li>Entries keep the order in which their keys first appear.</li>
     * </ul>
     * The map returned for a {@code null} or empty value is immutable, callers should treat the result as read-only.
     *
     * @param configValue configuration value string, may be {@code null}
     * @return configuration values map, never {@code null}
     */
    public static Map<String, String> toMap(String configValue) {
        if (configValue == null || configValue.isEmpty()) {
            return Map.of();
        }
        // the separator patterns only absorb whitespace next to ';' and '=', so the outer
        // whitespace has to be removed explicitly or it ends up in the first key / last value
        String[] entries = ENTRY_SEPARATOR_PATTERN.split(configValue.trim());
        return Arrays.stream(entries)
                .map(entry -> KEY_VALUE_SEPARATOR_PATTERN.split(entry, 2))
                .filter(keyValue -> keyValue.length == 2)
                // merge function: a repeated key must not abort parsing with IllegalStateException
                .collect(Collectors.toMap(keyValue -> keyValue[0], keyValue -> keyValue[1],
                        (earlier, later) -> later, LinkedHashMap::new));
    }
}

View File

@ -51,11 +51,16 @@ public class DbUtil {
private static Map<String, Connection> s_connectionForGlobalLocks = new HashMap<String, Connection>();
/**
* @param name lock name
* @param forLock true to add lock and false to remove
* @return {@link Connection}
*/
public static Connection getConnectionForGlobalLocks(String name, boolean forLock) {
synchronized (s_connectionForGlobalLocks) {
if (forLock) {
if (s_connectionForGlobalLocks.get(name) != null) {
LOGGER.error("Sanity check failed, global lock name " + name + " is already in use");
LOGGER.error("Sanity check failed, global lock name {} is already in use", name);
assert (false);
}
@ -64,15 +69,18 @@ public class DbUtil {
try {
connection.setAutoCommit(true);
} catch (SQLException e) {
closeAutoCloseable(connection, "error closing connection for global locks");
closeAutoCloseable(connection, String.format("error closing connection for global lock %s", name));
return null;
}
LOGGER.debug("Storing connection for global lock {}", name);
s_connectionForGlobalLocks.put(name, connection);
return connection;
}
return null;
} else {
// remove connection from references map, expecting it will be disposed later (and lock removed)
Connection connection = s_connectionForGlobalLocks.get(name);
LOGGER.debug("Removing DB connection for global lock {}", name);
s_connectionForGlobalLocks.remove(name);
return connection;
}
@ -80,9 +88,7 @@ public class DbUtil {
}
public static void removeConnectionForGlobalLocks(String name) {
synchronized (s_connectionForGlobalLocks) {
s_connectionForGlobalLocks.remove(name);
}
getConnectionForGlobalLocks(name, false);
}
public static String getColumnName(Field field, AttributeOverride[] overrides) {
@ -199,7 +205,7 @@ public class DbUtil {
public static boolean getGlobalLock(String name, int timeoutSeconds) {
Connection conn = getConnectionForGlobalLocks(name, true);
if (conn == null) {
LOGGER.error("Unable to acquire DB connection for global lock system");
LOGGER.error("Unable to acquire DB connection for global lock {}", name);
return false;
}
@ -210,24 +216,54 @@ public class DbUtil {
try (ResultSet rs = pstmt.executeQuery();) {
if (rs != null && rs.first()) {
if (rs.getInt(1) > 0) {
LOGGER.debug("GET_LOCK() for global lock {} succeeded", name);
return true;
} else {
if (LOGGER.isDebugEnabled())
LOGGER.debug("GET_LOCK() timed out on lock : " + name);
LOGGER.debug("GET_LOCK() for global lock {} is timed out", name);
}
}
}
} catch (SQLException e) {
LOGGER.error("GET_LOCK() throws exception ", e);
} catch (Throwable e) {
LOGGER.error("GET_LOCK() throws exception ", e);
LOGGER.error("GET_LOCK() for global lock {} throws exception", name, e);
}
removeConnectionForGlobalLocks(name);
closeAutoCloseable(conn, "connection for global lock");
closeAutoCloseable(conn, String.format("connection for global lock %s", name));
return false;
}
/**
 * Asks the database whether the named lock is free by running {@code SELECT COALESCE(IS_FREE_LOCK(?),0)}
 * on a standalone connection, which is closed before returning.
 *
 * @param name lock name
 * @return {@code true} if the query returned a value greater than zero; {@code false} otherwise, including
 * the cases when no connection could be obtained or the query failed
 */
public static boolean isFreeLock(String name) {
    final boolean defaultResult = false;
    final Connection connection = TransactionLegacy.getStandaloneConnection();
    if (connection == null) {
        LOGGER.error("Unable to acquire DB connection for IS_FREE_LOCK('{}'), returning default " +
                "value {}", name, defaultResult);
        return defaultResult;
    }

    try {
        connection.setAutoCommit(true);
    } catch (SQLException e) {
        closeAutoCloseable(connection, String.format("error closing connection for IS_FREE_LOCK('%s'), returning " +
                "default value %s", name, defaultResult));
        return defaultResult;
    }

    boolean free = defaultResult;
    try {
        free = queryIsFreeLock(connection, name);
        LOGGER.debug("IS_FREE_LOCK('{}') returned: {}", name, free);
    } catch (Throwable e) {
        LOGGER.error("IS_FREE_LOCK('{}') threw exception {}", name, e.getClass(), e);
    } finally {
        closeAutoCloseable(connection, String.format("connection for IS_FREE_LOCK('%s')", name));
    }
    return free;
}

/**
 * Runs the IS_FREE_LOCK query on the given connection; an empty result set counts as "not free".
 */
private static boolean queryIsFreeLock(Connection connection, String name) throws SQLException {
    try (PreparedStatement statement = connection.prepareStatement("SELECT COALESCE(IS_FREE_LOCK(?),0)")) {
        statement.setString(1, name);
        try (ResultSet rows = statement.executeQuery()) {
            return rows.next() && rows.getInt(1) > 0;
        }
    }
}
/**
 * Returns the entity bean type reported by the given DAO.
 *
 * @param dao DAO to ask
 * @return whatever {@code dao.getEntityBeanType()} returns
 */
public static Class<?> getEntityBeanType(GenericDao<?, Long> dao) {
    return dao.getEntityBeanType();
}
@ -235,7 +271,7 @@ public class DbUtil {
public static boolean releaseGlobalLock(String name) {
try (Connection conn = getConnectionForGlobalLocks(name, false);) {
if (conn == null) {
LOGGER.error("Unable to acquire DB connection for global lock system");
LOGGER.error("Unable to obtain DB connection to release global lock {}", name);
assert (false);
return false;
}
@ -244,15 +280,14 @@ public class DbUtil {
pstmt.setString(1, name);
try (ResultSet rs = pstmt.executeQuery();) {
if (rs != null && rs.first()) {
LOGGER.debug("RELEASE_LOCK() for global lock {} succeeded", name);
return rs.getInt(1) > 0;
}
LOGGER.error("releaseGlobalLock:RELEASE_LOCK() returns unexpected result");
LOGGER.error("RELEASE_LOCK() for global lock {} returns unexpected result", name);
}
}
} catch (SQLException e) {
LOGGER.error("RELEASE_LOCK() throws exception ", e);
} catch (Throwable e) {
LOGGER.error("RELEASE_LOCK() throws exception ", e);
LOGGER.error("RELEASE_LOCK() for global lock {} throws exception", name, e);
}
return false;
}

View File

@ -16,11 +16,9 @@
// under the License.
package com.cloud.utils.db;
import static java.lang.String.format;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.Optional;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@ -45,21 +43,58 @@ import com.cloud.utils.Profiler;
* </p>
*/
public class GlobalLock {
protected Logger logger = LogManager.getLogger(getClass());
protected final static Logger logger = LogManager.getLogger(GlobalLock.class);
private String name;
private int lockCount = 0;
private Thread ownerThread = null;
private int referenceCount = 0;
private long holdingStartTick = 0;
/**
* DB lock count.
* Increments on {@link GlobalLock#lock(int)} and decrements on {@link GlobalLock#unlock()}.
* Upon {@link GlobalLock#unlock()}, if {@link GlobalLock#lockCount} is less than 1, then lock removed from DB
*/
private int lockCount;
private static Map<String, GlobalLock> s_lockMap = new HashMap<String, GlobalLock>();
/**
* Internal (in-memory) lock count.
* Increments on {@link GlobalLock#addRef()} and indirectly on {@link GlobalLock#getInternLock(String)} and
* decrements on {@link GlobalLock#releaseRef()}, {@link GlobalLock#unlock()} and on {@link GlobalLock#lock(int)}
* if DB lock is unsuccessful
*/
private int referenceCount;
/**
* Thread that owns lock. If lock called from different thread, it will be waiting for the owner to unlock it
* within requested timeout. If owner thread call {@link GlobalLock#lock(int)} again, then
* {@link GlobalLock#lockCount} will be incremented.
* If {@link GlobalLock#unlock()} called by owner thread, or DB lock will be unsuccessful, then owner thread will be
* nullified.
*/
private Thread ownerThread;
/**
* Variable to hold lock duration in milliseconds. Used for information only.
*/
private long holdingStartTick;
/**
* Holds all created locks.
*/
private static Map<String, GlobalLock> s_lockMap = new HashMap<>();
/**
* Create lock.
*
* @param name lock name
*/
private GlobalLock(String name) {
this.name = name;
}
/**
* Increment reference count to lock.
*
* @return reference count
*/
public int addRef() {
synchronized (this) {
referenceCount++;
@ -67,54 +102,118 @@ public class GlobalLock {
}
}
/**
* Decrement reference count to lock.
*
* @return reference count
*/
public int releaseRef() {
int refCount;
boolean needToRemove = false;
synchronized (this) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing reference for internal lock {}, reference count: {}, lock count: {}",
name, referenceCount, lockCount);
}
referenceCount--;
refCount = referenceCount;
if (referenceCount < 0)
logger.warn("Unmatched Global lock " + name + " reference usage detected, check your code!");
if (referenceCount == 0)
if (referenceCount < 0) {
logger.warn("Unmatched internal lock {} reference usage detected (reference count: {}, " +
"lock count: {}), check your code!", name, referenceCount, lockCount);
} else if (referenceCount < 1) {
needToRemove = true;
}
if (needToRemove)
releaseInternLock(name);
return refCount;
}
public static GlobalLock getInternLock(String name) {
synchronized (s_lockMap) {
if (s_lockMap.containsKey(name)) {
GlobalLock lock = s_lockMap.get(name);
lock.addRef();
return lock;
} else {
GlobalLock lock = new GlobalLock(name);
lock.addRef();
s_lockMap.put(name, lock);
return lock;
}
}
if (needToRemove) {
if (logger.isDebugEnabled()) {
logger.debug("Need to release internal lock {}", name);
}
releaseInternLock(name);
}
if (logger.isDebugEnabled()) {
logger.debug("Released reference for lock {}, reference count: {}", name, referenceCount);
}
return referenceCount;
}
/**
 * Checks whether the named lock is free by delegating to {@code DbUtil.isFreeLock(name)}.
 * The outcome is logged at debug level even when the delegate throws.
 *
 * @param name lock name
 * @return value returned by {@code DbUtil.isFreeLock(name)}
 */
public static boolean isLockAvailable(String name) {
    logger.debug("Checking lock present for {}", name);
    boolean free = false;
    try {
        free = DbUtil.isFreeLock(name);
        return free;
    } finally {
        logger.debug("Result of checking lock present for {}: {}", name, free);
    }
}
/**
 * Returns the in-memory lock object registered under the given name, registering a new one when there is
 * none yet, and adds a reference to it. Nothing is locked in the DB by this call.
 *
 * @param name lock name
 * @return lock object with its reference count incremented
 */
public static GlobalLock getInternLock(String name) {
    synchronized (s_lockMap) {
        GlobalLock internLock = s_lockMap.get(name);
        if (internLock != null) {
            logger.debug("Internal lock {} already exists with reference count {} and lock count {}",
                    name, internLock.referenceCount, internLock.lockCount);
        } else {
            internLock = new GlobalLock(name);
            logger.debug("Internal lock {} does not exist, adding", name);
            s_lockMap.put(name, internLock);
        }

        internLock.addRef();
        logger.debug("Added reference to internal lock {}, reference count {}, lock count {}",
                name, internLock.referenceCount, internLock.lockCount);
        return internLock;
    }
}
/**
* Unregister internal lock (in memory) object. Does not remove any lock from DB.
*
* @param name lock name
*/
private void releaseInternLock(String name) {
synchronized (s_lockMap) {
GlobalLock lock = s_lockMap.get(name);
if (lock != null) {
if (lock.referenceCount == 0)
if (lock.referenceCount == 0) {
if (logger.isDebugEnabled()) {
logger.debug("Released internal lock {}", name);
}
s_lockMap.remove(name);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Not releasing internal lock {} as it has references count: {}, lock count: {}",
name, lock.referenceCount, lock.lockCount);
}
}
} else {
logger.warn("Releasing " + name + ", but it is already released.");
logger.warn("Internal lock {} already released", name);
}
}
}
/**
* Acquire or join existing DB lock.
*
* @param timeoutSeconds time in seconds during which lock needs to be obtained (it is not the lock duration)
* @return true if lock successfully obtained
*/
public boolean lock(int timeoutSeconds) {
int remainingMilliSeconds = timeoutSeconds * 1000;
Profiler profiler = new Profiler();
@ -122,50 +221,70 @@ public class GlobalLock {
try {
while (true) {
synchronized (this) {
if (ownerThread != null && ownerThread == Thread.currentThread()) {
logger.warn("Global lock re-entrance detected");
if (ownerThread == Thread.currentThread()) {
// argument order must follow the placeholders: lock name first, then the owner thread name
logger.warn("Global lock {} re-entrance detected, owner thread: {}, reference count: {}, " +
        "lock count: {}", name, getThreadName(ownerThread), referenceCount, lockCount);
// if it is re-entrance, then we may have more lock counts than needed?
lockCount++;
if (logger.isTraceEnabled())
logger.trace("lock " + name + " is acquired, lock count :" + lockCount);
if (logger.isDebugEnabled()) {
logger.debug("Global lock {} joined, reference count: {}, lock count: {}",
name, referenceCount, lockCount);
}
return true;
}
if (ownerThread != null) {
} else if (ownerThread != null) {
profiler.start();
try {
wait((timeoutSeconds) * 1000L);
logger.debug("Waiting {} seconds to acquire global lock {}", timeoutSeconds, name);
wait(timeoutSeconds * 1000L);
} catch (InterruptedException e) {
interrupted = true;
}
profiler.stop();
remainingMilliSeconds -= profiler.getDurationInMillis();
if (remainingMilliSeconds < 0)
if (remainingMilliSeconds < 0) {
logger.warn("Timeout of {} seconds to acquire global lock {} has been reached, " +
"owner thread {}, reference count: {}, lock count: {}", timeoutSeconds, name, getThreadName(ownerThread), referenceCount, lockCount);
return false;
}
continue;
} else {
// take ownership temporarily to prevent others enter into stage of acquiring DB lock
ownerThread = Thread.currentThread();
// XXX: do we need it here (???)
addRef();
if (logger.isDebugEnabled()) {
logger.debug("Taking ownership on global lock {} to acquire, owner thread: {}, "
+ "reference count: {}, lock count: {}", name, getThreadName(ownerThread), referenceCount, lockCount);
}
}
}
if (DbUtil.getGlobalLock(name, remainingMilliSeconds / 1000)) {
int remainingSeconds = remainingMilliSeconds / 1000;
if (logger.isDebugEnabled()) {
logger.debug("Acquiring global lock {} in DB within remaining {} seconds", name, remainingSeconds);
}
if (DbUtil.getGlobalLock(name, remainingSeconds)) {
synchronized (this) {
lockCount++;
holdingStartTick = System.currentTimeMillis();
if (logger.isTraceEnabled())
logger.trace("lock " + name + " is acquired, lock count :" + lockCount);
if (logger.isDebugEnabled()) {
logger.debug("Global lock {} acquired, reference count: {}, lock count: {}",
name, referenceCount, lockCount);
}
return true;
}
} else {
synchronized (this) {
ownerThread = null;
releaseRef();
if (logger.isDebugEnabled()) {
logger.debug("Failed to acquire global lock in DB {}, reference count: {}, " +
"lock count: {}", name, referenceCount, lockCount);
}
return false;
}
}
@ -177,16 +296,31 @@ public class GlobalLock {
}
}
/**
 * Returns the name of the given thread for log messages.
 *
 * @param thread thread, may be {@code null}
 * @return thread name, or {@code N/A} when the thread or its name is {@code null}
 */
private String getThreadName(Thread thread) {
    String threadName = thread == null ? null : thread.getName();
    return threadName == null ? "N/A" : threadName;
}
/**
* Decrements lock count, decrements lock reference if no more locks left, and remove lock if no more references
* left. Does the job only if lock owned by current thread.
*
* @return true if lock is owned by current thread
*/
public boolean unlock() {
synchronized (this) {
if (ownerThread != null && ownerThread == Thread.currentThread()) {
if (logger.isDebugEnabled()) {
logger.debug("Unlock {}", name);
}
if (ownerThread == Thread.currentThread()) {
lockCount--;
if (lockCount == 0) {
// lock() raises the count to 1 on the first acquisition, so the decrement above brings it to 0 when the
// last holder unlocks; comparing with "< 0" would skip the release for every balanced lock()/unlock() pair
if (lockCount < 1) {
ownerThread = null;
DbUtil.releaseGlobalLock(name);
boolean result = DbUtil.releaseGlobalLock(name);
if (logger.isTraceEnabled())
logger.trace("lock " + name + " is returned to free state, total holding time :" + (System.currentTimeMillis() - holdingStartTick));
if (logger.isDebugEnabled()) {
logger.debug("Global lock {} is returned to {} state, total holding time {} seconds",
name, result ? "successful" : "unsuccessful", (System.currentTimeMillis() - holdingStartTick) / 1000);
}
holdingStartTick = 0;
// release holding position in intern map when we released the DB connection
@ -194,8 +328,9 @@ public class GlobalLock {
notifyAll();
}
if (logger.isTraceEnabled())
logger.trace("lock " + name + " is released, lock count :" + lockCount);
if (logger.isDebugEnabled()) {
logger.debug("lock {} released, lock count: {}", name, lockCount);
}
return true;
}
return false;
@ -205,36 +340,4 @@ public class GlobalLock {
/**
 * @return name this lock was created with
 */
public String getName() {
    return name;
}
public <T> T executeWithLock(final String operationId, final int lockAcquisitionTimeout, final Callable<T> operation) throws Exception {
final GlobalLock lock = GlobalLock.getInternLock(operationId);
try {
if (!lock.lock(lockAcquisitionTimeout)) {
if (logger.isDebugEnabled()) {
logger.debug(format("Failed to acquire lock for operation id %1$s", operationId));
}
return null;
}
return operation.call();
} finally {
if (lock != null) {
lock.unlock();
}
}
}
public <T> T executeWithNoWaitLock(final String operationId, final Callable<T> operation) throws Exception {
return executeWithLock(operationId, 0, operation);
}
}

View File

@ -446,6 +446,7 @@ public class IndirectAgentLBServiceImpl extends ComponentLifecycleBase implement
break;
}
// FIXME: it is fire and forget, Management Server will never know if task failed
migrateAgentsExecutorService.submit(new MigrateAgentConnectionTask(fromMsId, hostId, dc.getId(), orderedHostIdList, avoidMsList, lbCheckInterval, lbAlgorithm, lbAlgorithmChanged));
}
@ -465,6 +466,7 @@ public class IndirectAgentLBServiceImpl extends ComponentLifecycleBase implement
logger.debug(String.format("Force shutdown migrate non-routing agents service as it did not shutdown in the desired time due to: %s", e.getMessage()));
}
// FIXME: This task can fail only if it is timed out, otherwise it is always succeeds no matter what is the migration result.
return true;
}
@ -595,7 +597,9 @@ public class IndirectAgentLBServiceImpl extends ComponentLifecycleBase implement
msList = getManagementServerList(hostId, dcId, orderedHostIdList, lbAlgorithm);
}
// ask Host to reconnect to another Management Server
final MigrateAgentConnectionCommand cmd = new MigrateAgentConnectionCommand(msList, avoidMsList, lbAlgorithm, lbCheckInterval);
// timeout 1 minute (FIXME: should it be configurable?)
cmd.setWait(60);
final Answer answer = agentManager.easySend(hostId, cmd); //may not receive answer when the agent disconnects immediately and try reconnecting to other ms host
if (answer == null) {

View File

@ -22,12 +22,15 @@ package com.cloud.utils;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.time.format.DateTimeFormatter;
@ -346,4 +349,34 @@ public class DateUtil {
/**
 * Returns the number of hours in the calendar month the given date falls into.
 *
 * @param date date whose month is inspected
 * @return number of days of that month multiplied by 24
 */
public static int getHoursInCurrentMonth(Date date) {
    // Date#getYear() is 1900-based and Date#getMonth() is 0-based; YearMonth needs the real year,
    // otherwise the leap-year rule is applied to the wrong year (e.g. February 2000)
    return YearMonth.of(date.getYear() + 1900, date.getMonth() + 1).lengthOfMonth() * 24;
}
/**
 * Formats duration in milliseconds to string {@code X min, Y sec, Z ms}.
 * <p>
 * Parts that are zero are left out (e.g. {@code 60000} gives {@code 1 min}); a zero duration gives
 * {@code 0 ms}. A negative duration is formatted by its magnitude and prefixed with {@code -}.
 *
 * @param durationMs duration in milliseconds
 * @return formatted string
 */
public static String formatMillis(long durationMs) {
    // split the magnitude: a negative Duration keeps floored seconds plus a positive nano part,
    // which would produce misleading parts (e.g. -1500 ms would be reported as "500 ms")
    Duration remaining = Duration.ofMillis(durationMs).abs();
    long minutes = remaining.toMinutes();
    remaining = remaining.minusMinutes(minutes);
    long seconds = remaining.toSeconds();
    long millis = remaining.minusSeconds(seconds).toMillis();

    List<String> parts = new ArrayList<>(3);
    if (minutes > 0) {
        parts.add(minutes + " min");
    }
    if (seconds > 0) {
        parts.add(seconds + " sec");
    }
    if (millis > 0) {
        parts.add(millis + " ms");
    }
    if (parts.isEmpty()) {
        return "0 ms";
    }
    String formatted = String.join(", ", parts);
    return durationMs < 0 ? "-" + formatted : formatted;
}
}

View File

@ -20,6 +20,8 @@
package com.cloud.utils;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -106,4 +108,29 @@ public class LogUtils {
return errorMsg;
}
}
/**
 * Generates address entry for log in format of {@code IP_ADDRESS/HOST_NAME:PORT}.
 * <p>
 * When the address cannot be resolved, or is {@code null} or empty, the address is returned as given
 * ({@code ADDRESS:PORT}). The {@code :PORT} suffix is left out when the port is {@code null}.
 *
 * @param address IP address or Host name
 * @param port port, may be {@code null}
 * @return address entry for log
 */
public static String getHostLog(String address, Integer port) {
    // InetAddress.getByName() maps null and "" to the loopback address; resolving them would log
    // 127.0.0.1/localhost for an address that is actually missing
    if (address != null && !address.isEmpty()) {
        try {
            InetAddress inetAddress = InetAddress.getByName(address);
            String hostName = inetAddress.getHostName();
            String ipAddress = inetAddress.getHostAddress();
            if (port == null) {
                return String.format("%s/%s", ipAddress, hostName);
            }
            return String.format("%s/%s:%s", ipAddress, hostName, port);
        } catch (UnknownHostException e) {
            LOGGER.warn("Failed to resolve name for address {}", address, e);
        }
    }
    if (port == null) {
        return address;
    }
    return String.format("%s:%s", address, port);
}
}

View File

@ -21,6 +21,8 @@ package com.cloud.utils.backoff;
import com.cloud.utils.component.Adapter;
import java.util.Map;
/**
* BackoffAlgorithm implements multiple BackoffAlgorithm.
*/
@ -35,4 +37,5 @@ public interface BackoffAlgorithm extends Adapter {
*/
void reset();
Map<String, String> getConfiguration();
}

View File

@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.utils.backoff;
import com.cloud.utils.backoff.impl.ConstantTimeBackoff;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.naming.ConfigurationException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
/**
 * Backoff implementation factory.
 *
 * @author mprokopchuk
 */
public interface BackoffFactory {
    Logger logger = LogManager.getLogger(BackoffFactory.class);

    /**
     * Property name for the implementation class (that extends {@link BackoffAlgorithm}) to be used either
     * by {@code agent.properties} file or by configuration key.
     */
    String BACKOFF_IMPLEMENTATION_KEY = "backoff.implementation";

    /**
     * Default backoff implementation class name ({@link ConstantTimeBackoff}).
     */
    String DEFAULT_BACKOFF_IMPLEMENTATION = ConstantTimeBackoff.class.getName();

    /**
     * Creates default {@link BackoffAlgorithm} implementation object ({@link ConstantTimeBackoff}).
     * The given properties are passed to the implementation; only the implementation class is forced
     * to the default one. The given object itself is not modified.
     *
     * @param properties configuration properties
     * @return {@link BackoffAlgorithm} implementation object
     */
    static BackoffAlgorithm createDefault(Properties properties) {
        // the original properties become the defaults of the copy; create(Properties) reads them
        // through stringPropertyNames()/getProperty(), which do see defaults
        Properties newProperties = new Properties(properties);
        newProperties.setProperty(BACKOFF_IMPLEMENTATION_KEY, DEFAULT_BACKOFF_IMPLEMENTATION);
        return create(newProperties);
    }

    /**
     * Creates {@link BackoffAlgorithm} implementation object, falls back to the default implementation
     * ({@link #DEFAULT_BACKOFF_IMPLEMENTATION}) when {@link #BACKOFF_IMPLEMENTATION_KEY} is not set.
     * Only entries with string key and string value are used, including those inherited from the
     * defaults of the given properties.
     *
     * @param properties configuration properties
     * @return {@link BackoffAlgorithm} implementation object
     */
    static BackoffAlgorithm create(Properties properties) {
        // entrySet() does not contain the entries of the defaults list, stringPropertyNames() does
        Map<String, String> params = properties.stringPropertyNames().stream()
                .collect(Collectors.toMap(key -> key, properties::getProperty));
        return create(params);
    }

    /**
     * Creates {@link BackoffAlgorithm} implementation object.
     *
     * @param params configuration parameters map
     * @return {@link BackoffAlgorithm} implementation object
     * @throws RuntimeException if the implementation cannot be instantiated or configured
     */
    static BackoffAlgorithm create(Map<String, String> params) {
        String className = params.getOrDefault(BACKOFF_IMPLEMENTATION_KEY, DEFAULT_BACKOFF_IMPLEMENTATION);
        BackoffAlgorithm backoff;
        try {
            backoff = (BackoffAlgorithm) Class.forName(className).getDeclaredConstructor().newInstance();
            backoff.configure("Configuration", new HashMap<>(params));
        } catch (ReflectiveOperationException | ClassCastException e) {
            // ClassCastException: the configured class exists but is not a BackoffAlgorithm
            String msg = String.format("Failed to create backoff implementation for %s", className);
            logger.warn(msg, e);
            throw new RuntimeException(msg, e);
        } catch (ConfigurationException e) {
            String msg = String.format("Failed to configure backoff implementation for %s with parameters %s",
                    className, params);
            logger.warn(msg, e);
            throw new RuntimeException(msg, e);
        }
        return backoff;
    }
}

View File

@ -20,11 +20,13 @@
package com.cloud.utils.backoff.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.backoff.BackoffAlgorithm;
import com.cloud.utils.backoff.BackoffFactory;
import com.cloud.utils.component.AdapterBase;
/**
@ -38,7 +40,17 @@ import com.cloud.utils.component.AdapterBase;
* }
**/
public class ConstantTimeBackoff extends AdapterBase implements BackoffAlgorithm, ConstantTimeBackoffMBean {
long _time;
/**
* Property name for the delay between retries to be used either by {@code agent.properties} file or by configuration key.
*/
public static final String DELAY_SEC_CONFIG_KEY = "backoff.seconds";
/**
* Default value for the delay between retries for the property {@link ConstantTimeBackoff#DELAY_SEC_CONFIG_KEY}.
*/
public static final int DELAY_SEC_DEFAULT = 5;
private long _time;
private final Map<String, Thread> _asleep = new ConcurrentHashMap<String, Thread>();
@Override
@ -60,9 +72,17 @@ public class ConstantTimeBackoff extends AdapterBase implements BackoffAlgorithm
public void reset() {
}
/**
 * Describes this backoff as configuration entries: the implementation class name and the delay
 * ({@code _time} divided by 1000).
 *
 * @return new mutable map with the two entries
 */
@Override
public Map<String, String> getConfiguration() {
    final String delaySeconds = String.valueOf(_time / 1000);
    final Map<String, String> settings = new HashMap<>();
    settings.put(DELAY_SEC_CONFIG_KEY, delaySeconds);
    settings.put(BackoffFactory.BACKOFF_IMPLEMENTATION_KEY, getClass().getName());
    return settings;
}
@Override
public boolean configure(String name, Map<String, Object> params) {
_time = NumbersUtil.parseLong((String)params.get("seconds"), 5) * 1000;
_time = NumbersUtil.parseLong((String) params.get(DELAY_SEC_CONFIG_KEY), DELAY_SEC_DEFAULT) * 1000;
return true;
}

View File

@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.utils.backoff.impl;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.backoff.BackoffAlgorithm;
import com.cloud.utils.backoff.BackoffFactory;
import com.cloud.utils.component.AdapterBase;
import java.security.SecureRandom;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Exponential backoff with up/down cycling.
 * Delay grows exponentially until a maximum, then decreases back to base, then repeats.
 * Every wait is extended by a random jitter that is less than half of the current delay.
 *
 * @author mprokopchuk
 */
public class ExponentialWithJitterBackoff extends AdapterBase implements BackoffAlgorithm,
        ExponentialWithJitterBackoffMBean {
    /**
     * Property name for the minimal delay to be used either by {@code agent.properties} file or by configuration key.
     */
    public static final String MIN_DELAY_MS_CONFIG_KEY = "backoff.min_delay_ms";

    /**
     * Property name for the maximal delay to be used either by {@code agent.properties} file or by configuration key.
     */
    public static final String MAX_DELAY_MS_CONFIG_KEY = "backoff.max_delay_ms";

    /**
     * Default value for minimal delay for the property {@link ExponentialWithJitterBackoff#MIN_DELAY_MS_CONFIG_KEY}.
     */
    public static final int MIN_DELAY_MS_DEFAULT = 5_000;

    /**
     * Default value for maximal delay for the property {@link ExponentialWithJitterBackoff#MAX_DELAY_MS_CONFIG_KEY}.
     */
    public static final int MAX_DELAY_MS_DEFAULT = 15_000;

    /** Threads currently sleeping in {@link #waitBeforeRetry()}, keyed by thread name. */
    private final Map<String, Thread> asleep = new ConcurrentHashMap<>();
    private final Random random = new SecureRandom();
    private int minDelayMs;
    private int maxDelayMs;
    /** Highest exponent used for the delay, derived from the ratio of maximal to minimal delay. */
    private int maxAttempts;
    /** Current exponent: the delay is {@code minDelayMs * 2^attemptNumber}, capped by {@code maxDelayMs}. */
    private int attemptNumber;
    /** Direction of the cycle: {@code true} while the delay grows. */
    private boolean increasing;

    /**
     * Sleeps for {@link #getTimeToWait()} milliseconds and then advances the cycle to the next delay.
     * If the sleep is interrupted (e.g. by {@link #wakeup(String)}), the method returns early and the
     * interrupt status of the thread is restored.
     */
    @Override
    public void waitBeforeRetry() {
        boolean interrupted = false;
        long waitMs = getTimeToWait();
        Thread current = Thread.currentThread();
        try {
            asleep.put(current.getName(), current);
            logger.debug(String.format("Going to sleep for %s", DateUtil.formatMillis(waitMs)));
            Thread.sleep(waitMs);
            logger.debug(String.format("Sleep done for %s", DateUtil.formatMillis(waitMs)));
        } catch (InterruptedException e) {
            // remember the interrupt, it is re-asserted in the finally block below
            interrupted = true;
            logger.info(String.format("Thread %s interrupted while waiting for retry", current.getName()), e);
        } finally {
            asleep.remove(current.getName());
            calculateNextAttempt();
            if (interrupted) {
                current.interrupt();
            }
        }
    }

    /**
     * Calculates next attempt and direction.
     */
    private void calculateNextAttempt() {
        if (increasing) {
            int nextAttemptNumber = attemptNumber + 1;
            increasing = getNextDelay() <= maxDelayMs && nextAttemptNumber <= maxAttempts;
            if (increasing) {
                attemptNumber = nextAttemptNumber;
            }
        } else {
            int nextAttemptNumber = Math.max(attemptNumber - 1, 0);
            increasing = nextAttemptNumber == 0;
            if (!increasing) {
                attemptNumber = nextAttemptNumber;
            }
        }
    }

    /**
     * Restarts from the minimal delay; the current direction of the cycle is kept.
     */
    @Override
    public void reset() {
        attemptNumber = 0;
    }

    /**
     * @return new map with the effective minimal delay, maximal delay and implementation class name
     */
    @Override
    public Map<String, String> getConfiguration() {
        Map<String, String> configuration = new HashMap<>();
        configuration.put(MIN_DELAY_MS_CONFIG_KEY, String.valueOf(minDelayMs));
        configuration.put(MAX_DELAY_MS_CONFIG_KEY, String.valueOf(maxDelayMs));
        configuration.put(BackoffFactory.BACKOFF_IMPLEMENTATION_KEY, this.getClass().getName());
        return configuration;
    }

    /**
     * Reads {@link #MIN_DELAY_MS_CONFIG_KEY} and {@link #MAX_DELAY_MS_CONFIG_KEY} from the parameters and
     * starts the cycle at a random position and direction. A non-positive minimal delay is replaced by
     * {@link #MIN_DELAY_MS_DEFAULT}; a maximal delay below the minimal one is raised to the minimal delay.
     *
     * @param name adapter name (not used here)
     * @param params configuration parameters, delay values are expected as strings
     * @return always {@code true}
     */
    @Override
    public boolean configure(String name, Map<String, Object> params) {
        minDelayMs = NumbersUtil.parseInt((String) params.get(MIN_DELAY_MS_CONFIG_KEY), MIN_DELAY_MS_DEFAULT);
        maxDelayMs = NumbersUtil.parseInt((String) params.get(MAX_DELAY_MS_CONFIG_KEY), MAX_DELAY_MS_DEFAULT);
        // the exponent calculation below needs 0 < minDelayMs <= maxDelayMs, otherwise the logarithm is
        // negative/undefined and Random#nextInt(int) is called with a non-positive bound
        if (minDelayMs <= 0) {
            logger.warn(String.format("Invalid %s value %s, using default %s", MIN_DELAY_MS_CONFIG_KEY,
                    minDelayMs, MIN_DELAY_MS_DEFAULT));
            minDelayMs = MIN_DELAY_MS_DEFAULT;
        }
        if (maxDelayMs < minDelayMs) {
            logger.warn(String.format("Invalid %s value %s, it is less than %s value %s, using %s",
                    MAX_DELAY_MS_CONFIG_KEY, maxDelayMs, MIN_DELAY_MS_CONFIG_KEY, minDelayMs, minDelayMs));
            maxDelayMs = minDelayMs;
        }
        // number of doublings that fit between the minimal and the maximal delay
        maxAttempts = (int) Math.round(Math.log((double) maxDelayMs / minDelayMs) / Math.log(2));
        // random starting point and direction of the cycle
        attemptNumber = random.nextInt(maxAttempts + 1);
        increasing = random.nextBoolean();
        return true;
    }

    /**
     * @return names of the threads currently sleeping in {@link #waitBeforeRetry()}
     */
    @Override
    public Collection<String> getWaiters() {
        return asleep.keySet();
    }

    /**
     * Interrupts the sleeping thread registered under the given name.
     *
     * @param threadName name of the thread to wake up
     * @return {@code true} if such thread was found and interrupted
     */
    @Override
    public boolean wakeup(String threadName) {
        Thread th = asleep.get(threadName);
        if (th != null) {
            th.interrupt();
            return true;
        }
        return false;
    }

    /**
     * @return delay in milliseconds for the current attempt, without jitter and capped by the maximal delay
     */
    private long getNextDelay() {
        return (long) Math.min(minDelayMs * Math.pow(2, attemptNumber), maxDelayMs);
    }

    /**
     * @return delay in milliseconds for the current attempt plus a random jitter below half of that delay
     */
    @Override
    public long getTimeToWait() {
        long delay = getNextDelay();
        // Random#nextInt(int) rejects a non-positive bound, which happens for delays below 2 ms
        int jitterBound = (int) (delay / 2);
        int jitter = jitterBound > 0 ? random.nextInt(jitterBound) : 0;
        return delay + jitter;
    }

    /**
     * Ignored: the delay is derived from the configured minimal and maximal delays.
     */
    @Override
    public void setTimeToWait(long seconds) {
        // ignore
    }
}

View File

@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.utils.backoff.impl;
import com.cloud.utils.mgmt.ManagementBean;
import java.util.Collection;
/**
 * Management interface of the exponential backoff with jitter.
 */
public interface ExponentialWithJitterBackoffMBean extends ManagementBean {
    /**
     * @return time to wait before the next retry
     * (NOTE(review): the implementation in this package computes milliseconds — confirm the intended unit)
     */
    long getTimeToWait();

    /**
     * @param seconds requested time to wait
     * (NOTE(review): the implementation in this package ignores this value — confirm)
     */
    void setTimeToWait(long seconds);

    /**
     * @return names of the waiting threads
     */
    Collection<String> getWaiters();

    /**
     * @param threadName name of the waiting thread to wake up
     * @return whether the wake up was performed
     */
    boolean wakeup(String threadName);
}

View File

@ -69,6 +69,7 @@ public class CSExceptionErrorCode {
ExceptionErrorCodeMap.put("com.cloud.exception.StorageUnavailableException", 4385);
ExceptionErrorCodeMap.put("com.cloud.exception.UnsupportedServiceException", 4390);
ExceptionErrorCodeMap.put("com.cloud.exception.VirtualMachineMigrationException", 4395);
ExceptionErrorCodeMap.put("com.cloud.exception.ConnectionException", 4435);
ExceptionErrorCodeMap.put("com.cloud.async.AsyncCommandQueued", 4540);
ExceptionErrorCodeMap.put("com.cloud.exception.RequestLimitException", 4545);
ExceptionErrorCodeMap.put("com.cloud.exception.StorageConflictException", 4550);

View File

@ -178,6 +178,9 @@ public class NetUtils {
}
public static String resolveToIp(final String host) {
if (host == null) {
return null;
}
try {
final InetAddress addr = InetAddress.getByName(host);
return addr.getHostAddress();

View File

@ -19,7 +19,7 @@
package com.cloud.utils.nio;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
/**
* WorkerFactory creates and selects workers.
@ -32,6 +32,6 @@ public interface HandlerFactory {
default int getNewConnectionsCount() {
return 0;
}
default void registerNewConnection(SocketAddress address) {}
default void unregisterNewConnection(SocketAddress address) {}
default void registerNewConnection(InetSocketAddress address) {}
default void unregisterNewConnection(InetSocketAddress address) {}
}

View File

@ -31,6 +31,7 @@ import java.nio.channels.SocketChannel;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@ -63,6 +64,7 @@ public class Link {
private final InetSocketAddress _addr;
private final NioConnection _connection;
private SelectionKey _key;
private Integer _localPort;
private final ConcurrentLinkedQueue<ByteBuffer[]> _writeQueue;
private ByteBuffer _readBuffer;
private ByteBuffer _plaintextBuffer;
@ -76,11 +78,8 @@ public class Link {
_addr = addr;
_connection = connection;
_readBuffer = ByteBuffer.allocate(2048);
_attach = null;
_key = null;
_writeQueue = new ConcurrentLinkedQueue<ByteBuffer[]>();
_readHeader = true;
_gotFollowingPacket = false;
}
public Link(Link link) {
@ -98,6 +97,26 @@ public class Link {
/**
 * Associates this link with the given selection key and refreshes the cached local port.
 * <p>
 * The local port is taken from the key's channel when that channel is a {@link SocketChannel}
 * bound to an {@link InetSocketAddress}; in every other case (null key, other channel type,
 * address lookup failing with an {@link IOException}) the cached port is reset to {@code null}.
 * Any other unexpected exception is swallowed on purpose and leaves the previously cached
 * port untouched, because the port is informational only.
 *
 * @param key the selection key to bind to this link; may be {@code null}
 */
public void setKey(SelectionKey key) {
    synchronized (this) {
        _key = key;
        try {
            Integer port = null;
            final Object channel = (key == null) ? null : key.channel();
            if (channel instanceof SocketChannel) {
                Object localAddress;
                try {
                    localAddress = ((SocketChannel) channel).getLocalAddress();
                } catch (IOException e) {
                    // channel closed or address unavailable: treat as "no local port"
                    localAddress = null;
                }
                if (localAddress instanceof InetSocketAddress) {
                    port = ((InetSocketAddress) localAddress).getPort();
                }
            }
            _localPort = port;
        } catch (Exception e) {
            // do nothing
        }
    }
}
@ -316,6 +335,9 @@ public class Link {
}
synchronized (this) {
if (_key == null) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("SelectionKey is null for {}", _addr);
}
throw new ClosedChannelException();
}
_connection.change(SelectionKey.OP_WRITE, _key, null);
@ -354,16 +376,28 @@ public class Link {
return _addr;
}
/**
 * Returns the local port cached by the most recent {@code setKey} call.
 *
 * @return the local port, or {@code null} when no key has been set yet or the
 *         port could not be determined from the key's channel
 */
public Integer getLocalPort() {
return _localPort;
}
/**
 * Returns the string form of the {@code InetAddress} held by this link's socket address.
 * <p>
 * NOTE(review): {@code InetAddress.toString()} is documented to produce
 * "hostname/literal-IP", so the result is not necessarily a bare IP string, and
 * {@code InetSocketAddress.getAddress()} returns {@code null} for an unresolved address,
 * which would make this call throw a {@code NullPointerException} - confirm callers cope.
 *
 * @return the textual representation of the link's address
 */
public String getIpAddress() {
return _addr.getAddress().toString();
}
/**
 * Marks this link as terminated by dropping its selection key.
 * <p>
 * Once the key is cleared, {@code isTerminated()} reports {@code true} and
 * {@code schedule(Task)} fails with a {@code ClosedChannelException}.
 */
public synchronized void terminated() {
    // The guard and the log call now use the same level; previously the message was
    // logged at DEBUG but only when TRACE was enabled.
    if (LOGGER.isTraceEnabled()) {
        LOGGER.trace("Terminating connection to {}", _addr);
    }
    _key = null;
}
/**
 * Tells whether this link has been terminated, i.e. whether it currently has no selection key.
 * <p>
 * The check is performed under the link's monitor, the same lock that {@code setKey} and
 * {@code terminated()} hold when they write the key, so a termination performed on another
 * thread is guaranteed to be visible here.
 *
 * @return {@code true} when no selection key is attached to this link
 */
public synchronized boolean isTerminated() {
    return _key == null;
}
public synchronized void schedule(Task task) throws ClosedChannelException {
if (_key == null) {
LOGGER.warn("Failed to schedule task as SelectionKey is null for {}", _addr);
throw new ClosedChannelException();
}
_connection.scheduleTask(task);
@ -504,8 +538,8 @@ public class Link {
result = sslEngine.unwrap(peerNetData, peerAppData);
peerNetData.compact();
} catch (final SSLException sslException) {
LOGGER.error(String.format("SSL error caught during unwrap data: %s, for local address=%s, remote address=%s. The client may have invalid ca-certificates.",
sslException.getMessage(), socketChannel.getLocalAddress(), socketChannel.getRemoteAddress()));
LOGGER.error("SSL error caught during unwrap data: {}, for local address={}, remote address={}. The client may have invalid ca-certificates.",
sslException.getMessage(), socketChannel.getLocalAddress(), socketChannel.getRemoteAddress(), sslException);
sslEngine.closeOutbound();
return new HandshakeHolder(peerAppData, peerNetData, false);
}
@ -683,4 +717,16 @@ public class Link {
}
}
/**
 * Builds a compact description of this link for log messages, in the form
 * {@code [identityHash-][localPort-> ]socketAddress}.
 * <p>
 * The identity hash prefix is included only while TRACE logging is enabled, and the
 * local port part only when a local port is known.
 *
 * @return the textual description of this link
 */
@Override
public String toString() {
    final StringBuilder str = new StringBuilder();
    if (LOGGER.isTraceEnabled()) {
        // helps to tell apart distinct Link instances pointing at the same address
        str.append(System.identityHashCode(this)).append("-");
    }
    final Integer localPort = getLocalPort();
    if (localPort != null) {
        str.append(localPort).append("-> ");
    }
    str.append(getSocketAddress());
    return str.toString();
}
}

View File

@ -21,16 +21,27 @@ package com.cloud.utils.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import com.cloud.utils.LogUtils;
import org.apache.cloudstack.utils.security.SSLUtils;
/**
* NIO Client for the {@code com.cloud.agent.Agent}.
*/
public class NioClient extends NioConnection {
protected String host;
@ -43,57 +54,65 @@ public class NioClient extends NioConnection {
this.host = host;
}
/**
 * Closes the client socket channel if one exists and is still open.
 * A failure to close is logged and not propagated.
 */
protected void closeChannel() {
    if (clientConnection == null || !clientConnection.isOpen()) {
        // nothing to close
        return;
    }
    try {
        clientConnection.close();
    } catch (IOException e) {
        logger.error("Failed to close SocketChannel", e);
    }
}
@Override
protected void init() throws IOException {
Task task;
String hostLog = host + ":" + _port;
String hostLog = LogUtils.getHostLog(host, _port);
try {
logger.info("Connecting to {}", hostLog);
_selector = Selector.open();
clientConnection = SocketChannel.open();
clientConnection.socket().setKeepAlive(true);
final InetSocketAddress serverAddress = new InetSocketAddress(host, _port);
clientConnection.connect(serverAddress);
logger.info("Connected to {}", hostLog);
boolean isConnected = clientConnection.connect(serverAddress);
logger.info("Connected to {}: {}", hostLog, isConnected);
clientConnection.configureBlocking(false);
logger.debug("Initializing client SSL context");
final SSLContext sslContext = Link.initClientSSLContext();
logger.debug("Initialized client SSL context");
logger.debug("Creating SSL Engine for {}", hostLog);
SSLEngine sslEngine = sslContext.createSSLEngine(host, _port);
sslEngine.setUseClientMode(true);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
logger.debug("Created SSL Engine for {}", hostLog);
String[] enabledProtocols = SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols());
logger.debug("Enabled SSL Engine protocols for {}: {}", hostLog, Arrays.asList(enabledProtocols));
sslEngine.setEnabledProtocols(enabledProtocols);
Integer sshHandshakeTimeout = getSslHandshakeTimeout();
logger.debug("Begin SSL Handshake for {} with timeout {}s (default 30s)", hostLog, sshHandshakeTimeout);
sslEngine.beginHandshake();
if (!Link.doHandshake(clientConnection, sslEngine, getSslHandshakeTimeout())) {
if (!Link.doHandshake(clientConnection, sslEngine, sshHandshakeTimeout)) {
throw new IOException(String.format("SSL Handshake failed while connecting to host: %s", hostLog));
}
logger.info("SSL: Handshake done");
logger.info("SSL Handshake done for {}", hostLog);
final Link link = new Link(serverAddress, this);
link.setSSLEngine(sslEngine);
final SelectionKey key = clientConnection.register(_selector, SelectionKey.OP_READ);
link.setKey(key);
key.attach(link);
// Notice we've already connected due to the handshake, so let's get the
// remaining task done
logger.info("Creating task {} for {}", Task.Type.CONNECT, hostLog);
// Notice we've already connected due to the handshake, so let's get the remaining task done
task = _factory.create(Task.Type.CONNECT, link, null);
} catch (final GeneralSecurityException e) {
closeChannel();
throw new IOException("Failed to initialise security", e);
} catch (final IOException e) {
closeChannel();
logger.error("IOException while connecting to {}", hostLog, e);
logger.info("Created task {} for {}", Task.Type.CONNECT, hostLog);
} catch (GeneralSecurityException e) {
cleanUp();
throw new IOException(String.format("Exception while connecting to %s", hostLog), e);
} catch (Exception e) {
cleanUp();
throw e;
}
if (task != null) {
logger.info("Submit task {} for {}", task.getType(), hostLog);
_executor.submit(task);
logger.info("Submitted task {} for {}", task.getType(), hostLog);
} else {
logger.info("Task is null, nothing to submit for {}", hostLog);
}
}
@ -109,11 +128,85 @@ public class NioClient extends NioConnection {
@Override
public void cleanUp() throws IOException {
super.cleanUp();
if (clientConnection != null && clientConnection.isOpen()) {
clientConnection.close();
}
logger.info("NioClient connection closed");
Optional<Selector> selectorOptional = Optional.ofNullable(_selector);
selectorOptional
.filter(Selector::isOpen)
.map(selector -> {
Set<SelectionKey> keys;
try {
logger.trace("Getting keys from Selector");
keys = selector.keys();
logger.trace("Got {} keys from Selector", keys.size());
} catch (ClosedSelectorException e) {
logger.trace("Failed to get keys from Selector", e);
keys = Set.of();
}
return keys;
})
.orElseGet(Set::of)
.forEach(key -> {
try {
if (key.isValid()) {
logger.trace("Cancelling SelectionKey");
key.cancel();
logger.trace("Cancelled SelectionKey");
} else {
logger.trace("SelectionKey already cancelled");
}
} catch (CancelledKeyException e) {
logger.trace("Failed to cancel SelectionKey", e);
}
Optional.ofNullable(key.channel())
.filter(SelectableChannel::isOpen)
.ifPresent(channel -> {
try {
logger.trace("Closing SelectableChannel");
channel.close();
logger.trace("Closed SelectableChannel");
} catch (IOException e) {
logger.trace("Failed to close SelectableChannel", e);
}
});
});
selectorOptional.ifPresent(selector -> {
try {
logger.trace("Closing Selector");
selector.close();
logger.trace("Closed Selector");
} catch (IOException e) {
logger.trace("Failed to close Selector", e);
}
});
// socket channel should be closed here already, but just in case
Optional.ofNullable(clientConnection)
.filter(SocketChannel::isOpen)
.ifPresent(socketChannel -> {
SocketAddress address;
try {
address = socketChannel.getRemoteAddress();
} catch (IOException e) {
logger.trace("Failed to get SocketAddress from SocketChannel", e);
address = null;
}
try {
socketChannel.shutdownOutput();
} catch (IOException e) {
logger.trace("Failed to shutdown output in SocketChannel", e);
}
try {
socketChannel.shutdownInput();
} catch (IOException e) {
logger.trace("Failed to shutdown input in SocketChannel", e);
}
try {
socketChannel.close();
} catch (IOException e) {
logger.trace("Failed to close SocketChannel", e);
}
logger.info("NioClient connection to {} closed", address);
});
}
public String getHost() {

View File

@ -36,6 +36,7 @@ import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@ -70,8 +71,8 @@ public abstract class NioConnection implements Callable<Boolean> {
protected ExecutorService _threadExecutor;
protected Future<Boolean> _futureTask;
protected boolean _isRunning;
protected boolean _isStartup;
protected volatile boolean _isRunning;
protected volatile boolean _isStartup;
protected int _port;
protected int _workers;
protected List<ChangeRequest> _todos;
@ -80,66 +81,72 @@ public abstract class NioConnection implements Callable<Boolean> {
protected ExecutorService _executor;
protected ExecutorService _sslHandshakeExecutor;
protected CAService caService;
protected Set<SocketChannel> socketChannels = ConcurrentHashMap.newKeySet();
protected Integer sslHandshakeTimeout = null;
protected Set<SocketChannel> socketChannels;
protected Integer sslHandshakeTimeout;
private final int factoryMaxNewConnectionsCount;
protected boolean blockNewConnections;
protected volatile boolean blockNewConnections;
public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) {
socketChannels = ConcurrentHashMap.newKeySet();
_name = name;
_isRunning = false;
blockNewConnections = false;
_selector = null;
_port = port;
_workers = workers;
_factory = factory;
this.factoryMaxNewConnectionsCount = factory.getMaxConcurrentNewConnectionsCount();
initWorkersExecutor();
initSSLHandshakeExecutor();
}
public void setCAService(final CAService caService) {
public void setCAService(CAService caService) {
this.caService = caService;
}
public void start() throws NioConnectionException {
_todos = new ArrayList<>();
if (_executor == null || _executor.isShutdown() || _executor.isTerminated()) {
initWorkersExecutor();
}
if (_sslHandshakeExecutor == null || _sslHandshakeExecutor.isShutdown() || _sslHandshakeExecutor.isTerminated()) {
initSSLHandshakeExecutor();
}
try {
init();
} catch (final ConnectException e) {
logger.warn("Unable to connect to remote: is there a server running on port {}?", _port, e);
throw new NioConnectionException(e.getMessage(), e);
} catch (final IOException e) {
logger.error("Unable to initialize the threads.", e);
throw new NioConnectionException(e.getMessage(), e);
String msg = String.format("Unable to connect to remote: is there a server running on port %s: %s", _port, e.getMessage());
throw new NioConnectionException(msg, e);
} catch (final Exception e) {
logger.error("Unable to initialize the threads due to unknown exception.", e);
throw new NioConnectionException(e.getMessage(), e);
String msg = String.format("Unable to initialize the threads: %s", e.getMessage());
throw new NioConnectionException(msg, e);
}
_isStartup = true;
if (_executor.isShutdown()) {
initWorkersExecutor();
}
if (_sslHandshakeExecutor.isShutdown()) {
initSSLHandshakeExecutor();
}
_threadExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(this._name + "-NioConnectionHandler"));
_isRunning = true;
// in case start() called after stop()
blockNewConnections = false;
_futureTask = _threadExecutor.submit(this);
}
public void stop() {
_executor.shutdown();
_sslHandshakeExecutor.shutdown();
_isRunning = false;
blockNewConnections = true;
if (_executor != null) {
if (logger.isTraceEnabled()) {
logger.trace("Shutting down handler tasks");
}
_executor.shutdownNow();
}
if (_sslHandshakeExecutor != null) {
if (logger.isTraceEnabled()) {
logger.trace("Shutting down SSL Handshake executor");
}
_sslHandshakeExecutor.shutdownNow();
}
if (_threadExecutor != null) {
_futureTask.cancel(false);
_threadExecutor.shutdown();
_threadExecutor.shutdownNow();
}
_isRunning = false;
blockNewConnections = true;
}
private void initWorkersExecutor() {
@ -174,44 +181,46 @@ public abstract class NioConnection implements Callable<Boolean> {
_selector.select(50);
// Someone is ready for I/O, get the ready keys
final Set<SelectionKey> readyKeys = _selector.selectedKeys();
final Iterator<SelectionKey> i = readyKeys.iterator();
final Set<SelectionKey> keys = _selector.selectedKeys();
logger.trace("Keys Processing: {}", readyKeys.size());
// Walk through the ready keys collection.
while (i.hasNext()) {
final SelectionKey sk = i.next();
i.remove();
for (final Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
final SelectionKey key = it.next();
it.remove();
if (!sk.isValid()) {
logger.trace("Selection Key is invalid: {}", sk);
final Link link = (Link)sk.attachment();
// isValid() means key is not cancelled and channel is not closed
if (!key.isValid()) {
logger.trace("Selection Key is invalid: {}", key);
final Link link = (Link)key.attachment();
if (link != null) {
link.terminated();
} else {
closeConnection(sk);
closeConnection(key);
}
} else if (sk.isReadable()) {
read(sk);
} else if (sk.isWritable()) {
write(sk);
} else if (sk.isAcceptable()) {
accept(sk);
} else if (sk.isConnectable()) {
connect(sk);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
// this is for NIO Server only to accept new connection
} else if (key.isAcceptable()) {
accept(key);
// this is for NIO Client only to finish connect
} else if (key.isConnectable()) {
connect(key);
}
}
logger.trace("Keys Done Processing.");
processTodos();
// and we cannot use existing selector anymore
} catch (final ClosedSelectorException e) {
/*
* Exception occurred when calling java.nio.channels.Selector.selectedKeys() method. It means the connection has not yet been established. Let's continue trying
* Exception occurred when calling java.nio.channels.Selector.selectedKeys() method.
* It means the connection has not yet been established. Let's continue trying.
* We do not log it here otherwise we will fill the disk with messages.
*/
} catch (final IOException e) {
logger.error("Agent will die due to this IOException!", e);
} catch (final Exception e) {
_isStartup = false;
logger.error("Agent will die due to this Exception", e);
throw new NioConnectionException(e.getMessage(), e);
}
}
@ -229,7 +238,18 @@ public abstract class NioConnection implements Callable<Boolean> {
if (!blockNewConnections) {
return false;
}
logger.warn("Rejecting new connection as the server is blocked from accepting new connections");
try {
socketChannel.shutdownOutput();
} catch (IOException ignore) {
// ignore exception
}
try {
socketChannel.shutdownInput();
} catch (IOException ignore) {
// ignore exception
}
logger.warn("Rejecting new connection from {} as the server is blocked from accepting new connections", socketChannel.getRemoteAddress());
socketChannel.close();
_selector.wakeup();
return true;
@ -239,9 +259,19 @@ public abstract class NioConnection implements Callable<Boolean> {
if (factoryMaxNewConnectionsCount <= 0 || _factory.getNewConnectionsCount() < factoryMaxNewConnectionsCount) {
return false;
}
try {
socketChannel.shutdownOutput();
} catch (IOException ignore) {
// ignore exception
}
try {
socketChannel.shutdownInput();
} catch (IOException ignore) {
// ignore exception
}
// Reject new connection if the server is busy
logger.warn("{} Rejecting new connection. {} active connections currently",
SERVER_BUSY_MESSAGE, factoryMaxNewConnectionsCount);
logger.warn("{} Rejecting new connection from {}. {} active connections currently",
SERVER_BUSY_MESSAGE, socketChannel.getRemoteAddress(), factoryMaxNewConnectionsCount);
socketChannel.close();
_selector.wakeup();
return true;
@ -264,9 +294,12 @@ public abstract class NioConnection implements Callable<Boolean> {
final NioConnection nioConnection = this;
_sslHandshakeExecutor.submit(() -> {
final InetSocketAddress socketAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
_factory.registerNewConnection(socketAddress);
_selector.wakeup();
try {
if (logger.isTraceEnabled()) {
logger.trace("Registering new connection for {}", socketAddress.getAddress());
}
_factory.registerNewConnection(socketAddress);
_selector.wakeup();
final SSLEngine sslEngine = Link.initServerSSLEngine(caService, socketChannel.getRemoteAddress().toString());
sslEngine.setUseClientMode(false);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
@ -274,20 +307,26 @@ public abstract class NioConnection implements Callable<Boolean> {
if (!Link.doHandshake(socketChannel, sslEngine, getSslHandshakeTimeout())) {
throw new IOException("SSL handshake timed out with " + socketAddress);
}
logger.trace("SSL: Handshake done");
logger.trace("SSL Handshake done for {}", socketAddress);
final Link link = new Link(socketAddress, nioConnection);
link.setSSLEngine(sslEngine);
link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link));
final Task task = _factory.create(Task.Type.CONNECT, link, null);
registerLink(socketAddress, link);
_executor.submit(task);
} catch (final GeneralSecurityException | IOException e) {
} catch (final GeneralSecurityException | IOException | RuntimeException e) {
_factory.unregisterNewConnection(socketAddress);
logger.trace("Connection closed with {} due to failure: {}", socket.getRemoteSocketAddress(), e.getMessage());
logger.trace("Connection closed with {} due to failure: {}", socket.getRemoteSocketAddress(), e.getMessage(), e);
closeAutoCloseable(socket, "accepting socket");
closeAutoCloseable(socketChannel, "accepting socketChannel");
} finally {
_selector.wakeup();
try {
_selector.wakeup();
} catch (final Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to wakeup selector for {}", socketAddress, e);
}
}
}
});
} catch (final RejectedExecutionException e) {
@ -295,15 +334,21 @@ public abstract class NioConnection implements Callable<Boolean> {
closeAutoCloseable(socket, "Rejecting connection - accepting socket");
closeAutoCloseable(socketChannel, "Rejecting connection - accepting socketChannel");
} finally {
_selector.wakeup();
try {
_selector.wakeup();
} catch (final Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to wakeup selector for {}", socket, e);
}
}
}
}
protected void terminate(final SelectionKey key, String msg) {
protected void terminate(final SelectionKey key, final Exception cause) {
final Link link = (Link)key.attachment();
closeConnection(key);
if (link != null) {
logger.trace("Will terminate connection due to: {}", msg);
logger.trace("Will terminate connection to {} due to: {}", link.getSocketAddress(), cause.getLocalizedMessage(), cause);
link.terminated();
final Task task = _factory.create(Task.Type.DISCONNECT, link, null);
unregisterLink(link.getSocketAddress());
@ -311,7 +356,7 @@ public abstract class NioConnection implements Callable<Boolean> {
try {
_executor.submit(task);
} catch (final Exception e) {
logger.warn("Exception occurred when submitting the task", e);
logger.warn("Exception occurred when submitting the task for {}", link.getSocketAddress(), e);
}
}
}
@ -320,7 +365,7 @@ public abstract class NioConnection implements Callable<Boolean> {
final Link link = (Link)key.attachment();
try {
final SocketChannel socketChannel = (SocketChannel)key.channel();
logger.trace("Reading from: {}", socketChannel.socket().toString());
logger.trace("Reading from: {}", socketChannel.socket());
final byte[] data = link.read(socketChannel);
if (data == null) {
logger.trace("Packet is incomplete. Waiting for more.");
@ -331,11 +376,11 @@ public abstract class NioConnection implements Callable<Boolean> {
try {
_executor.submit(task);
} catch (final Exception e) {
logger.warn("Exception occurred when submitting the task", e);
logger.warn("Exception occurred when submitting the task for {}", link.getSocketAddress(), e);
}
} catch (final Exception e) {
logDebug(e, key, 1);
terminate(key, e.getMessage());
terminate(key, e);
}
}
@ -349,7 +394,9 @@ public abstract class NioConnection implements Callable<Boolean> {
}
}
logger.trace("Location " + loc + ": Socket " + socket + " closed on read. Probably -1 returned.");
if (logger.isTraceEnabled()) {
logger.trace("Location {}: Socket {} closed on read. Probably -1 returned.", loc, socket, e);
}
}
}
@ -363,7 +410,7 @@ public abstract class NioConnection implements Callable<Boolean> {
}
}
logger.debug("Location " + loc + ": Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage());
logger.debug("Location " + loc + ": Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage(), e);
}
}
@ -383,55 +430,55 @@ public abstract class NioConnection implements Callable<Boolean> {
SelectionKey key;
for (final ChangeRequest todo : todos) {
switch (todo.type) {
case ChangeRequest.CHANGEOPS:
try {
key = (SelectionKey)todo.key;
if (key != null && key.isValid()) {
case ChangeRequest.CHANGEOPS:
try {
key = (SelectionKey)todo.key;
if (key != null && key.isValid()) {
if (todo.att != null) {
key.attach(todo.att);
final Link link = (Link)todo.att;
link.setKey(key);
}
key.interestOps(todo.ops);
}
} catch (final CancelledKeyException e) {
logger.debug("key has been cancelled", e);
}
break;
case ChangeRequest.REGISTER:
try {
key = ((SocketChannel)todo.key).register(_selector, todo.ops, todo.att);
if (todo.att != null) {
key.attach(todo.att);
final Link link = (Link)todo.att;
link.setKey(key);
}
key.interestOps(todo.ops);
} catch (final ClosedChannelException e) {
logger.warn("Couldn't register socket: {}", todo.key);
try {
((SocketChannel)todo.key).close();
} catch (final IOException ignore) {
logger.info("[ignored] socket channel");
} finally {
final Link link = (Link)todo.att;
link.terminated();
}
}
} catch (final CancelledKeyException e) {
logger.debug("key has been cancelled");
break;
case ChangeRequest.CLOSE:
logger.trace("Trying to close {}", todo.key);
key = (SelectionKey)todo.key;
closeConnection(key);
if (key != null) {
final Link link = (Link)key.attachment();
if (link != null) {
link.terminated();
}
}
break;
default:
logger.warn("Shouldn't be here");
throw new RuntimeException("Shouldn't be here");
}
break;
case ChangeRequest.REGISTER:
try {
key = ((SocketChannel)todo.key).register(_selector, todo.ops, todo.att);
if (todo.att != null) {
final Link link = (Link)todo.att;
link.setKey(key);
}
} catch (final ClosedChannelException e) {
logger.warn("Couldn't register socket: {}", todo.key);
try {
((SocketChannel)todo.key).close();
} catch (final IOException ignore) {
logger.info("[ignored] socket channel");
} finally {
final Link link = (Link)todo.att;
link.terminated();
}
}
break;
case ChangeRequest.CLOSE:
logger.trace("Trying to close {}", todo.key);
key = (SelectionKey)todo.key;
closeConnection(key);
if (key != null) {
final Link link = (Link)key.attachment();
if (link != null) {
link.terminated();
}
}
break;
default:
logger.warn("Shouldn't be here");
throw new RuntimeException("Shouldn't be here");
}
}
logger.trace("Todos Done processing");
}
@ -459,7 +506,7 @@ public abstract class NioConnection implements Callable<Boolean> {
}
} catch (final IOException e) {
logTrace(e, key, 2);
terminate(key, e.getMessage());
terminate(key, e);
}
}
@ -467,14 +514,18 @@ public abstract class NioConnection implements Callable<Boolean> {
try {
_executor.submit(task);
} catch (final Exception e) {
logger.warn("Exception occurred when submitting the task", e);
InetSocketAddress socketAddress = Optional.ofNullable(task)
.map(Task::getLink)
.map(Link::getSocketAddress)
.orElse(null);
logger.warn("Exception occurred when submitting the task for {}", socketAddress, e);
}
}
protected void write(final SelectionKey key) throws IOException {
final Link link = (Link)key.attachment();
try {
logger.trace("Writing to {}", link.getSocketAddress().toString());
logger.trace("Writing to {}", link.getSocketAddress());
final boolean close = link.write((SocketChannel)key.channel());
if (close) {
closeConnection(key);
@ -484,7 +535,7 @@ public abstract class NioConnection implements Callable<Boolean> {
}
} catch (final Exception e) {
logDebug(e, key, 3);
terminate(key, e.getMessage());
terminate(key, e);
}
}
@ -506,16 +557,11 @@ public abstract class NioConnection implements Callable<Boolean> {
// 2. Cancel the key (safe to call even if already cancelled)
key.cancel();
if (channel == null) {
logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key);
return;
}
// 3. Try to close the channel if we obtained it
if (channel != null) {
closeChannel(channel);
} else {
logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key);
logger.trace("Channel was null, invalid, or not a SocketChannel for key: {}", key);
}
}

View File

@ -23,12 +23,15 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.cloudstack.framework.ca.CAService;
/**
* NIO Server for the {@code com.cloud.agent.manager.AgentManagerImpl}.
*/
public class NioServer extends NioConnection {
protected InetSocketAddress localAddress;
@ -41,7 +44,6 @@ public class NioServer extends NioConnection {
super(name, port, workers, factory);
setCAService(caService);
setSslHandshakeTimeout(sslHandShakeTimeout);
localAddress = null;
links = new ConcurrentHashMap<>(1024);
}
@ -51,7 +53,7 @@ public class NioServer extends NioConnection {
@Override
protected void init() throws IOException {
_selector = SelectorProvider.provider().openSelector();
_selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
@ -59,7 +61,7 @@ public class NioServer extends NioConnection {
localAddress = new InetSocketAddress(_port);
serverSocket.socket().bind(localAddress);
serverSocket.register(_selector, SelectionKey.OP_ACCEPT, null);
serverSocket.register(_selector, SelectionKey.OP_ACCEPT);
logger.info("NioServer started and listening on {}", serverSocket.socket().getLocalSocketAddress());
}

View File

@ -75,7 +75,7 @@ public class ConstantTimeBackoffTest {
public void configureWithValue() {
final ConstantTimeBackoff backoff = new ConstantTimeBackoff();
HashMap<String, Object> params = new HashMap<String, Object>();
params.put("seconds", "100");
params.put("backoff.seconds", "100");
backoff.configure("foo", params);
Assert.assertEquals(100000, backoff.getTimeToWait());
}

View File

@ -100,7 +100,7 @@ public class NioTest {
testBytes = new byte[1000000];
randomGenerator.nextBytes(testBytes);
server = new NioServer("NioTestServer", 0, 1, new NioTestServer(), null, null);
server = new NioServer("NioTestServer", 0, 1, new NioTestServer(), null, null);
try {
server.start();
} catch (final NioConnectionException e) {
@ -245,8 +245,7 @@ public class NioTest {
logger.info("Sending data to server");
task.getLink().send(getTestBytes());
} catch (ClosedChannelException e) {
logger.error(e.getMessage());
e.printStackTrace();
logger.error(e.getMessage(), e);
}
} else if (task.getType() == Task.Type.DATA) {
logger.info("Client: Received DATA task");