mirror of https://github.com/apache/cloudstack.git
Remove duplicate field in constructor
This commit is contained in:
parent
d1d8009c68
commit
8a13f44b44
|
|
@ -191,12 +191,12 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
protected final ConfigKey<Float> DirectAgentThreadCap = new ConfigKey<Float>("Advanced", Float.class, "direct.agent.thread.cap", "1",
|
||||
"Percentage (as a value between 0 and 1) of direct.agent.pool.size to be used as upper thread cap for a single direct agent to process requests", false);
|
||||
protected final ConfigKey<Boolean> CheckTxnBeforeSending = new ConfigKey<Boolean>(
|
||||
"Developer",
|
||||
Boolean.class,
|
||||
"check.txn.before.sending.agent.commands",
|
||||
"false",
|
||||
"This parameter allows developers to enable a check to see if a transaction wraps commands that are sent to the resource. This is not to be enabled on production systems.",
|
||||
true);
|
||||
"Developer",
|
||||
Boolean.class,
|
||||
"check.txn.before.sending.agent.commands",
|
||||
"false",
|
||||
"This parameter allows developers to enable a check to see if a transaction wraps commands that are sent to the resource. This is not to be enabled on production systems.",
|
||||
true);
|
||||
|
||||
@Override
|
||||
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
|
||||
|
|
@ -405,8 +405,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
if (CheckTxnBeforeSending.value()) {
|
||||
if (!noDbTxn()) {
|
||||
throw new CloudRuntimeException("We do not allow transactions to be wrapped around commands sent to be executed on remote agents. "
|
||||
+ "We cannot predict how long it takes a command to complete. "
|
||||
+ "The transaction may be rolled back because the connection took too long.");
|
||||
+ "We cannot predict how long it takes a command to complete. "
|
||||
+ "The transaction may be rolled back because the connection took too long.");
|
||||
}
|
||||
} else {
|
||||
assert noDbTxn() : "I know, I know. Why are we so strict as to not allow txn across an agent call? ... Why are we so cruel ... Why are we such a dictator .... Too bad... Sorry...but NO AGENT COMMANDS WRAPPED WITHIN DB TRANSACTIONS!";
|
||||
|
|
@ -707,12 +707,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
}
|
||||
|
||||
protected AgentAttache createAttacheForDirectConnect(Host host, ServerResource resource) throws ConnectionException {
|
||||
// if (resource instanceof DummySecondaryStorageResource || resource instanceof KvmDummyResourceBase) {
|
||||
// return new DummyAttache(this, host.getId(), false);
|
||||
// }
|
||||
|
||||
s_logger.debug("create DirectAgentAttache for " + host.getId());
|
||||
DirectAgentAttache attache = new DirectAgentAttache(this, host.getId(), host.getName(), resource, host.isInMaintenanceStates(), this);
|
||||
DirectAgentAttache attache = new DirectAgentAttache(this, host.getId(), host.getName(), resource, host.isInMaintenanceStates());
|
||||
|
||||
AgentAttache old = null;
|
||||
synchronized (_agents) {
|
||||
|
|
@ -1166,7 +1162,6 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
|||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
if (cmd instanceof PingRoutingCommand) {
|
||||
final PingRoutingCommand ping = (PingRoutingCommand)cmd;
|
||||
logD = false;
|
||||
s_logger.debug("Ping from " + hostId);
|
||||
s_logger.trace("SeqA " + hostId + "-" + request.getSequence() + ": Processing " + request);
|
||||
|
|
|
|||
|
|
@ -129,13 +129,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
}
|
||||
|
||||
protected final ConfigKey<Boolean> EnableLB = new ConfigKey<Boolean>(Boolean.class, "agent.lb.enabled", "Advanced", "false",
|
||||
"Enable agent load balancing between management server nodes", true);
|
||||
"Enable agent load balancing between management server nodes", true);
|
||||
protected final ConfigKey<Double> ConnectedAgentThreshold = new ConfigKey<Double>(Double.class, "agent.load.threshold", "Advanced", "0.7",
|
||||
"What percentage of the agents can be held by one management server before load balancing happens", true);
|
||||
"What percentage of the agents can be held by one management server before load balancing happens", true);
|
||||
protected final ConfigKey<Integer> LoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16",
|
||||
"How many agents to connect to in each round", true);
|
||||
"How many agents to connect to in each round", true);
|
||||
protected final ConfigKey<Integer> ScanInterval = new ConfigKey<Integer>(Integer.class, "direct.agent.scan.interval", "Advanced", "90",
|
||||
"Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000);
|
||||
"Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000);
|
||||
|
||||
@Override
|
||||
public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
|
||||
|
|
@ -280,11 +280,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
@Override
|
||||
protected AgentAttache createAttacheForDirectConnect(Host host, ServerResource resource) {
|
||||
// if (resource instanceof DummySecondaryStorageResource) {
|
||||
// return new DummyAttache(this, host.getId(), false);
|
||||
// }
|
||||
s_logger.debug("create ClusteredDirectAgentAttache for " + host.getId());
|
||||
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getName(), _nodeId, resource, host.isInMaintenanceStates(), this);
|
||||
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getName(), _nodeId, resource, host.isInMaintenanceStates());
|
||||
AgentAttache old = null;
|
||||
synchronized (_agents) {
|
||||
old = _agents.get(host.getId());
|
||||
|
|
@ -338,7 +335,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
if (transferVO != null) {
|
||||
if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) {
|
||||
s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id=" + hostId + " as the host is being connected to " +
|
||||
_nodeId);
|
||||
_nodeId);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
@ -348,7 +345,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
// but the host has already reconnected to the current management server
|
||||
if (!attache.forForward()) {
|
||||
s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id=" + hostId +
|
||||
" as the host is directly connected to the current management server " + _nodeId);
|
||||
" as the host is directly connected to the current management server " + _nodeId);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -394,17 +391,17 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
protected static void logT(byte[] bytes, final String msg) {
|
||||
s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
|
||||
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
}
|
||||
|
||||
protected static void logD(byte[] bytes, final String msg) {
|
||||
s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
|
||||
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
}
|
||||
|
||||
protected static void logI(byte[] bytes, final String msg) {
|
||||
s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
|
||||
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
|
||||
}
|
||||
|
||||
public boolean routeToPeer(String peer, byte[] bytes) {
|
||||
|
|
@ -439,7 +436,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
} catch (ClassNotFoundException | UnsupportedVersionException ex) {
|
||||
// Request.parse thrown exception when we try to log it, log as much as we can
|
||||
logI(bytes, "Unable to route to peer due to" + e.getMessage()
|
||||
+ ". Also caught exception when parsing request: " + ex.getMessage());
|
||||
+ ". Also caught exception when parsing request: " + ex.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -738,7 +735,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
@Override
|
||||
public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException,
|
||||
OperationTimedoutException {
|
||||
OperationTimedoutException {
|
||||
boolean result = false;
|
||||
if (event == Event.RequestAgentRebalance) {
|
||||
return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId);
|
||||
|
|
@ -805,7 +802,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
} else {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is " + allMS.size() +
|
||||
"; number of managed agents is " + allManagedAgents.size());
|
||||
"; number of managed agents is " + allManagedAgents.size());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
@ -959,7 +956,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
// no need to do anything with the real attache as we haven't modified it yet
|
||||
Date cutTime = DateUtil.currentGMTTime();
|
||||
HostTransferMapVO transferMap =
|
||||
_hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut));
|
||||
_hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut));
|
||||
|
||||
if (transferMap == null) {
|
||||
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host");
|
||||
|
|
@ -978,7 +975,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
|
||||
if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
|
||||
s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms +
|
||||
", skipping rebalance for the host");
|
||||
", skipping rebalance for the host");
|
||||
iterator.remove();
|
||||
_hostTransferDao.completeAgentTransfer(hostId);
|
||||
continue;
|
||||
|
|
@ -995,7 +992,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
} else {
|
||||
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() +
|
||||
" and listener queue size is " + attache.getNonRecurringListenersSize());
|
||||
" and listener queue size is " + attache.getNonRecurringListenersSize());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
@ -1062,7 +1059,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
if (result) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
|
||||
" as a part of rebalance process");
|
||||
" as a part of rebalance process");
|
||||
}
|
||||
result = loadDirectlyConnectedHost(host, true);
|
||||
} else {
|
||||
|
|
@ -1071,16 +1068,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
} catch (Exception ex) {
|
||||
s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
|
||||
" as a part of rebalance process due to:", ex);
|
||||
" as a part of rebalance process due to:", ex);
|
||||
result = false;
|
||||
}
|
||||
|
||||
if (result) {
|
||||
s_logger.debug("Successfully loaded directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
|
||||
" as a part of rebalance process");
|
||||
" as a part of rebalance process");
|
||||
} else {
|
||||
s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
|
||||
" as a part of rebalance process");
|
||||
" as a part of rebalance process");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1112,7 +1109,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
Request requestToTransfer = forwardAttache.getRequestToTransfer();
|
||||
while (requestToTransfer != null) {
|
||||
s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " +
|
||||
_nodeId + " to " + futureOwnerId);
|
||||
_nodeId + " to " + futureOwnerId);
|
||||
boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes());
|
||||
if (!routeResult) {
|
||||
logD(requestToTransfer.getBytes(), "Failed to route request to peer");
|
||||
|
|
@ -1166,7 +1163,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing");
|
||||
} else {
|
||||
s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " +
|
||||
attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
|
||||
attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
@ -1223,7 +1220,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
// Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan
|
||||
// happens at fixed intervals anyways. So handling any exceptions that may be thrown
|
||||
s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName() +
|
||||
", ignoring as regular host scan happens at fixed interval anyways", e);
|
||||
", ignoring as regular host scan happens at fixed interval anyways", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -1340,14 +1337,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + " in " +
|
||||
(System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
|
||||
(System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
|
||||
}
|
||||
|
||||
return jsonReturn;
|
||||
} else {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + " in " +
|
||||
(System.currentTimeMillis() - startTick) + " ms, return null result");
|
||||
(System.currentTimeMillis() - startTick) + " ms, return null result");
|
||||
}
|
||||
}
|
||||
} catch (AgentUnavailableException e) {
|
||||
|
|
@ -1398,12 +1395,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
double load = managedHostsCount / allHostsCount;
|
||||
if (load >= ConnectedAgentThreshold.value()) {
|
||||
s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " +
|
||||
ConnectedAgentThreshold.value());
|
||||
ConnectedAgentThreshold.value());
|
||||
scheduleRebalanceAgents();
|
||||
_agentLbHappened = true;
|
||||
} else {
|
||||
s_logger.debug("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " +
|
||||
ConnectedAgentThreshold.value());
|
||||
ConnectedAgentThreshold.value());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,13 +24,10 @@ import com.cloud.resource.ServerResource;
|
|||
import com.cloud.utils.exception.CloudRuntimeException;
|
||||
|
||||
public class ClusteredDirectAgentAttache extends DirectAgentAttache implements Routable {
|
||||
private final ClusteredAgentManagerImpl _mgr;
|
||||
private final long _nodeId;
|
||||
|
||||
public ClusteredDirectAgentAttache(AgentManagerImpl agentMgr, long id, String name, long mgmtId, ServerResource resource, boolean maintenance,
|
||||
ClusteredAgentManagerImpl mgr) {
|
||||
super(agentMgr, id, name, resource, maintenance, mgr);
|
||||
_mgr = mgr;
|
||||
public ClusteredDirectAgentAttache(ClusteredAgentManagerImpl agentMgr, long id, String name, long mgmtId, ServerResource resource, boolean maintenance) {
|
||||
super(agentMgr, id, name, resource, maintenance);
|
||||
_nodeId = mgmtId;
|
||||
}
|
||||
|
||||
|
|
@ -56,7 +53,7 @@ public class ClusteredDirectAgentAttache extends DirectAgentAttache implements R
|
|||
public boolean processAnswers(long seq, Response response) {
|
||||
long mgmtId = response.getManagementServerId();
|
||||
if (mgmtId != -1 && mgmtId != _nodeId) {
|
||||
_mgr.routeToPeer(Long.toString(mgmtId), response.getBytes());
|
||||
((ClusteredAgentManagerImpl)_agentMgr).routeToPeer(Long.toString(mgmtId), response.getBytes());
|
||||
if (response.executeInSequence()) {
|
||||
sendNext(response.getSequence());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,16 +47,14 @@ public class DirectAgentAttache extends AgentAttache {
|
|||
"Interval to wait before retrying a host ping while waiting for check results", true);
|
||||
ServerResource _resource;
|
||||
List<ScheduledFuture<?>> _futures = new ArrayList<ScheduledFuture<?>>();
|
||||
AgentManagerImpl _mgr;
|
||||
long _seq = 0;
|
||||
LinkedList<Task> tasks = new LinkedList<Task>();
|
||||
AtomicInteger _outstandingTaskCount;
|
||||
AtomicInteger _outstandingCronTaskCount;
|
||||
|
||||
public DirectAgentAttache(AgentManagerImpl agentMgr, long id, String name, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) {
|
||||
public DirectAgentAttache(AgentManagerImpl agentMgr, long id, String name, ServerResource resource, boolean maintenance) {
|
||||
super(agentMgr, id, name, maintenance);
|
||||
_resource = resource;
|
||||
_mgr = mgr;
|
||||
_outstandingTaskCount = new AtomicInteger(0);
|
||||
_outstandingCronTaskCount = new AtomicInteger(0);
|
||||
}
|
||||
|
|
@ -186,7 +184,7 @@ public class DirectAgentAttache extends AgentAttache {
|
|||
s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(_id, -1, cmd, false).toString());
|
||||
}
|
||||
|
||||
_mgr.handleCommands(DirectAgentAttache.this, seq, new Command[] {cmd});
|
||||
_agentMgr.handleCommands(DirectAgentAttache.this, seq, new Command[] {cmd});
|
||||
} else {
|
||||
s_logger.debug("Unable to send ping because agent is disconnected " + _id + "(" + _name + ")");
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue