Add weight to command to help future load management, fix issues found in system vm pooling

This commit is contained in:
Kelven Yang 2011-04-01 18:48:00 -07:00
parent 1e180de60b
commit 2759eefcf5
11 changed files with 44 additions and 25 deletions

View File

@ -230,4 +230,6 @@ public interface AgentManager extends Manager {
boolean isHostNativeHAEnabled(long hostId);
Answer sendTo(Long dcId, HypervisorType type, Command cmd);
void notifyAnswersFromAttache(long agentId, long seq, Answer[] answers);
}

View File

@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CheckHealthCommand;
@ -98,6 +99,8 @@ public abstract class AgentAttache {
protected Status _status = Status.Connecting;
protected boolean _maintenance;
protected AgentManager _agentMgr;
public final static String[] s_commandsAllowedInMaintenanceMode =
new String[] { MaintainCommand.class.toString(), MigrateCommand.class.toString(), StopCommand.class.toString(), CheckVirtualMachineCommand.class.toString(), PingTestCommand.class.toString(), CheckHealthCommand.class.toString(), ReadyCommand.class.toString(), ShutdownCommand.class.toString() };
protected final static String[] s_commandsNotAllowedInConnectingMode =
@ -108,12 +111,13 @@ public abstract class AgentAttache {
}
protected AgentAttache(final long id, boolean maintenance) {
protected AgentAttache(AgentManager agentMgr, final long id, boolean maintenance) {
_id = id;
_waitForList = new ConcurrentHashMap<Long, Listener>();
_currentSequence = null;
_maintenance = maintenance;
_requests = new LinkedList<Request>();
_agentMgr = agentMgr;
}
public synchronized void setMaintenanceMode(final boolean value) {
@ -263,6 +267,8 @@ public abstract class AgentAttache {
if (resp.executeInSequence()) {
sendNext(seq);
}
_agentMgr.notifyAnswersFromAttache(_id, seq, answers);
return processed;
}
@ -384,6 +390,8 @@ public abstract class AgentAttache {
new Response(req, answers).log(_id, "Received after timeout: ");
// s_logger.debug(log(seq, "Received after timeout: " + new Response(req, answers).toString()));
}
_agentMgr.notifyAnswersFromAttache(_id, seq, answers);
return answers;
}

View File

@ -484,6 +484,12 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
}
}
}
public void notifyAnswersFromAttache(long agentId, long seq, Answer[] answers) {
for (Pair<Integer, Listener> listener : _cmdMonitors) {
listener.second().processCommands(agentId, seq, answers);
}
}
public AgentAttache findAttache(long hostId) {
return _agents.get(hostId);
@ -2656,7 +2662,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
protected AgentAttache createAttache(long id, HostVO server, Link link) {
s_logger.debug("create ConnectedAgentAttache for " + id);
final AgentAttache attache = new ConnectedAgentAttache(id, link,
final AgentAttache attache = new ConnectedAgentAttache(this, id, link,
server.getStatus() == Status.Maintenance
|| server.getStatus() == Status.ErrorInMaintenance
|| server.getStatus() == Status.PrepareForMaintenance);
@ -2676,10 +2682,10 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
ServerResource resource) {
if (resource instanceof DummySecondaryStorageResource
|| resource instanceof KvmDummyResourceBase) {
return new DummyAttache(id, false);
return new DummyAttache(this, id, false);
}
s_logger.debug("create DirectAgentAttache for " + id);
final DirectAgentAttache attache = new DirectAgentAttache(id, resource,
final DirectAgentAttache attache = new DirectAgentAttache(this, id, resource,
server.getStatus() == Status.Maintenance
|| server.getStatus() == Status.ErrorInMaintenance
|| server.getStatus() == Status.PrepareForMaintenance,

View File

@ -12,6 +12,7 @@ import java.nio.channels.SocketChannel;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
import com.cloud.agent.transport.Request;
import com.cloud.exception.AgentUnavailableException;
@ -28,13 +29,13 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout
s_clusteredAgentMgr = agentMgr;
}
public ClusteredAgentAttache(long id) {
super(id, null, false);
public ClusteredAgentAttache(AgentManager agentMgr, long id) {
super(agentMgr, id, null, false);
_forward = true;
}
public ClusteredAgentAttache(long id, Link link, boolean maintenance) {
super(id, link, maintenance);
public ClusteredAgentAttache(AgentManager agentMgr, long id, Link link, boolean maintenance) {
super(agentMgr, id, link, maintenance);
_forward = link == null;
}

View File

@ -186,7 +186,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
protected AgentAttache createAttache(long id) {
s_logger.debug("create forwarding ClusteredAgentAttache for " + id);
final AgentAttache attache = new ClusteredAgentAttache(id);
final AgentAttache attache = new ClusteredAgentAttache(this, id);
AgentAttache old = null;
synchronized(_agents) {
old = _agents.get(id);
@ -201,7 +201,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
protected AgentAttache createAttache(long id, HostVO server, Link link) {
s_logger.debug("create ClusteredAgentAttache for " + id);
final AgentAttache attache = new ClusteredAgentAttache(id, link, server.getStatus() == Status.Maintenance || server.getStatus() == Status.ErrorInMaintenance || server.getStatus() == Status.PrepareForMaintenance);
final AgentAttache attache = new ClusteredAgentAttache(this, id, link, server.getStatus() == Status.Maintenance || server.getStatus() == Status.ErrorInMaintenance || server.getStatus() == Status.PrepareForMaintenance);
link.attach(attache);
AgentAttache old = null;
synchronized(_agents) {
@ -217,10 +217,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
protected AgentAttache createAttache(long id, HostVO server, ServerResource resource) {
if (resource instanceof DummySecondaryStorageResource) {
return new DummyAttache(id, false);
return new DummyAttache(this, id, false);
}
s_logger.debug("create ClusteredDirectAgentAttache for " + id);
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(id, _nodeId, resource, server.getStatus() == Status.Maintenance
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, id, _nodeId, resource, server.getStatus() == Status.Maintenance
|| server.getStatus() == Status.ErrorInMaintenance || server.getStatus() == Status.PrepareForMaintenance, this);
AgentAttache old = null;
synchronized (_agents) {

View File

@ -17,7 +17,7 @@
*/
package com.cloud.agent.manager;
import com.cloud.agent.manager.DirectAgentAttache;
import com.cloud.agent.AgentManager;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.exception.AgentUnavailableException;
@ -29,8 +29,8 @@ public class ClusteredDirectAgentAttache extends DirectAgentAttache implements R
private final ClusteredAgentManagerImpl _mgr;
private final long _nodeId;
public ClusteredDirectAgentAttache(long id, long mgmtId, ServerResource resource, boolean maintenance, ClusteredAgentManagerImpl mgr) {
super(id, resource, maintenance, mgr);
public ClusteredDirectAgentAttache(AgentManager agentMgr, long id, long mgmtId, ServerResource resource, boolean maintenance, ClusteredAgentManagerImpl mgr) {
super(agentMgr, id, resource, maintenance, mgr);
_mgr = mgr;
_nodeId = mgmtId;
}

View File

@ -22,6 +22,7 @@ import java.nio.channels.ClosedChannelException;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.transport.Request;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Status;
@ -35,8 +36,8 @@ public class ConnectedAgentAttache extends AgentAttache {
protected Link _link;
public ConnectedAgentAttache(final long id, final Link link, boolean maintenance) {
super(id, maintenance);
public ConnectedAgentAttache(AgentManager agentMgr, final long id, final Link link, boolean maintenance) {
super(agentMgr, id, maintenance);
_link = link;
}

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.CronCommand;
@ -48,8 +49,8 @@ public class DirectAgentAttache extends AgentAttache {
AgentManagerImpl _mgr;
long _seq = 0;
public DirectAgentAttache(long id, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) {
super(id, maintenance);
public DirectAgentAttache(AgentManager agentMgr, long id, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) {
super(agentMgr, id, maintenance);
_resource = resource;
_mgr = mgr;
}

View File

@ -17,6 +17,7 @@
*/
package com.cloud.agent.manager;
import com.cloud.agent.AgentManager;
import com.cloud.agent.transport.Request;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Status;
@ -24,8 +25,8 @@ import com.cloud.host.Status;
public class DummyAttache extends AgentAttache {
public DummyAttache(long id, boolean maintenance) {
super(id, maintenance);
public DummyAttache(AgentManager agentMgr, long id, boolean maintenance) {
super(agentMgr, id, maintenance);
}

View File

@ -216,11 +216,9 @@ public class SecondaryStorageVmDaoImpl extends GenericDaoBase<SecondaryStorageVm
try {
String sql;
if(role == null)
sql = "SELECT s.id, count(l.id) as count FROM secondary_storage_vm s, vm_instance v, cmd_exec_log l " +
"WHERE s.id=v.id AND v.state='Running' AND v.data_center_id=? AND v.id=l.instance_id GROUP BY s.id ORDER BY count";
sql = "SELECT s.id, count(l.id) as count FROM secondary_storage_vm s INNER JOIN vm_instance v ON s.id=v.id LEFT JOIN cmd_exec_log l ON s.id=l.instance_id WHERE v.state='Running' AND v.data_center_id=? GROUP BY s.id ORDER BY count";
else
sql = "SELECT s.id, count(l.id) as count FROM secondary_storage_vm s, vm_instance v, cmd_exec_log l " +
"WHERE s.id=v.id AND v.state='Running' AND v.data_center_id=? AND s.role=? AND v.id=l.instance_id GROUP BY s.id ORDER BY count";
sql = "SELECT s.id, count(l.id) as count FROM secondary_storage_vm s INNER JOIN vm_instance v ON s.id=v.id LEFT JOIN cmd_exec_log l ON s.id=l.instance_id WHERE v.state='Running' AND v.data_center_id=? AND s.role=? GROUP BY s.id ORDER BY count";
pstmt = txn.prepareAutoCloseStatement(sql);

View File

@ -1487,6 +1487,7 @@ CREATE TABLE `cloud`.`cmd_exec_log` (
`host_id` bigint unsigned NOT NULL COMMENT 'host id of the system VM agent that command is sent to',
`instance_id` bigint unsigned NOT NULL COMMENT 'instance id of the system VM that command is executed on',
`command_name` varchar(255) NOT NULL COMMENT 'command name',
`weight` integer NOT NULL DEFAULT 1 COMMENT 'command weight in consideration of the load factor added to host that is executing the command',
`created` datetime NOT NULL COMMENT 'date created',
PRIMARY KEY (`id`),
CONSTRAINT `fk_cmd_exec_log_ref__inst_id` FOREIGN KEY (`instance_id`) REFERENCES `vm_instance`(`id`) ON DELETE CASCADE