advanced startup command

This commit is contained in:
Edison Su 2012-01-20 11:54:23 -08:00
parent 6297820981
commit bced9a6e48
9 changed files with 276 additions and 111 deletions

View File

@ -0,0 +1,17 @@
resource=com.cloud.agent.resource.DummyResource
host.mac.address=06\:95\:a4\:00\:03\:ee
guest.network.device=cloudbr0
type=Routing
local.storage.uuid=8c348eee-ea56-4e4f-9397-00f33340927a
port=8250
host=10.216.132.110
pod=1
LibvirtComputingResource.id=1
cluster=1
public.network.device=cloudbr0
private.network.device=cloudbr0
DummyResource.id=11
developer=True
zone=1
domr.scripts.dir=scripts/network/domr/kvm
workers=5

9
agent/scripts/_run.sh Normal file
View File

@ -0,0 +1,9 @@
#!/bin/bash
# Start multiple agents on one host, each with its own JDWP debug port.
# Usage: _run.sh <number-of-agents>
num=$1

# Reject a missing or non-numeric count up front; otherwise the loop test
# below fails with a "[" syntax error and nothing is started.
case "$num" in
    ''|*[!0-9]*)
        echo "Usage: $0 <number-of-agents>" >&2
        exit 1
        ;;
esac

port=8787
while [ "$num" -gt 0 ]
do
    # The port is advanced by the number of agents still to start, so every
    # agent gets a distinct, strictly increasing debug port.
    port=$((port + num))
    java -Xrunjdwp:transport=dt_socket,address=$port,server=y,suspend=n -cp ./'*' com.cloud.agent.AgentShell &
    num=$((num - 1))
done

View File

@ -118,6 +118,8 @@ public class Agent implements HandlerFactory, IAgentControl {
AtomicInteger _inProgress = new AtomicInteger();
StartupTask _startup = null;
long _startupWaitDefault = 180000;
long _startupWait = _startupWaitDefault;
boolean _reconnectAllowed = true;
//For time sensitive task, e.g. PingTask
private ThreadPoolExecutor _ugentTaskPool;
@ -283,7 +285,7 @@ public class Agent implements HandlerFactory, IAgentControl {
s_logger.debug("Adding a watch list");
}
final WatchTask task = new WatchTask(link, request, this);
_timer.schedule(task, delay, period);
_timer.schedule(task, 0, period);
_watchList.add(task);
}
}
@ -316,7 +318,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
synchronized (this) {
_startup = new StartupTask(link);
_timer.schedule(_startup, 180000);
_timer.schedule(_startup, _startupWait);
}
try {
link.send(request.toBytes());
@ -780,6 +782,7 @@ public class Agent implements HandlerFactory, IAgentControl {
// TimerTask.cancel may fail depending on the calling context
if (!cancelled) {
cancelled = true;
_startupWait = _startupWaitDefault;
s_logger.debug("Startup task cancelled");
return super.cancel();
}
@ -794,6 +797,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
cancelled = true;
_startup = null;
_startupWait = _startupWaitDefault *2;
reconnect(_link);
}
}

View File

@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import javax.naming.ConfigurationException;
@ -351,7 +352,7 @@ public class AgentShell implements IAgentShell {
if (!developer) {
throw new ConfigurationException("Unable to find the guid");
}
_guid = MacAddress.getMacAddress().toString(":");
_guid = UUID.randomUUID().toString();
}
return true;
@ -529,8 +530,12 @@ public class AgentShell implements IAgentShell {
init(args);
String instance = getProperty(null, "instance");
if (instance == null) {
instance = "";
if (instance == null) {
if (Boolean.parseBoolean(getProperty(null, "developer"))) {
instance = UUID.randomUUID().toString();
} else {
instance = "";
}
} else {
instance += ".";
}

View File

@ -17,18 +17,32 @@
*/
package com.cloud.agent.resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.ejb.Local;
import com.cloud.agent.IAgentControl;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CheckNetworkAnswer;
import com.cloud.agent.api.CheckNetworkCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.StartupCommand;
import com.cloud.agent.api.StartupRoutingCommand;
import com.cloud.agent.api.StartupStorageCommand;
import com.cloud.agent.api.StoragePoolInfo;
import com.cloud.agent.api.StartupRoutingCommand.VmState;
import com.cloud.host.Host;
import com.cloud.host.Host.Type;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.network.Networks.RouterPrivateIpStrategy;
import com.cloud.resource.ServerResource;
import com.cloud.storage.Storage;
import com.cloud.storage.Storage.StoragePoolType;
@Local(value={ServerResource.class})
public class DummyResource implements ServerResource {
@ -36,6 +50,7 @@ public class DummyResource implements ServerResource {
Host.Type _type;
boolean _negative;
IAgentControl _agentControl;
private Map<String, Object> _params;
@Override
public void disconnected() {
@ -43,6 +58,9 @@ public class DummyResource implements ServerResource {
@Override
public Answer executeRequest(Command cmd) {
if (cmd instanceof CheckNetworkCommand) {
return new CheckNetworkAnswer((CheckNetworkCommand)cmd, true, null);
}
System.out.println("Received Command: " + cmd.toString());
Answer answer = new Answer(cmd, !_negative, "response");
System.out.println("Replying with: " + answer.toString());
@ -58,10 +76,98 @@ public class DummyResource implements ServerResource {
public Type getType() {
return _type;
}
/**
 * Looks up a string setting in the configured parameters.
 *
 * @param key name of the entry to read from {@code _params}
 * @param defaultValue value handed back when the entry is absent
 * @return the configured value, or {@code defaultValue} when the lookup yields null
 */
protected String getConfiguredProperty(String key, String defaultValue) {
    final String configured = (String)_params.get(key);
    if (configured == null) {
        return defaultValue;
    }
    return configured;
}
/**
 * Looks up a numeric setting in the configured parameters.
 *
 * @param key name of the entry to read from {@code _params}
 * @param defaultValue value handed back when the entry is absent
 * @return the configured value parsed as a long, or {@code defaultValue} when the lookup yields null
 * @throws NumberFormatException if the configured text is not a parsable long
 */
protected Long getConfiguredProperty(String key, Long defaultValue) {
    final String raw = (String)_params.get(key);
    if (raw == null) {
        return defaultValue;
    }
    return Long.valueOf(raw);
}
/**
 * Collects the host capacity figures reported by this dummy resource, taking
 * each one from the configured parameters or falling back to a built-in default.
 *
 * @return a list laid out as: [0] cpu count (Integer), [1] cpu speed (Long),
 *         [2] memory (Long), [3] capabilities (String), [4] dom0 memory (Long)
 */
protected List<Object> getHostInfo() {
    final long cpuSpeed = getConfiguredProperty("cpuspeed", 4000L);
    final long cpuCount = getConfiguredProperty("cpus", 4L);
    final long memory = getConfiguredProperty("memory", 16000L*1024L*1024L);
    final String capabilities = getConfiguredProperty("capabilities", "hvm");

    // dom0 share: one tenth of the memory figure, capped at 768*1024*1024
    final long dom0Cap = 768*1024*1024L;
    final long dom0Memory = Math.min(memory / 10, dom0Cap);

    final List<Object> hostInfo = new ArrayList<Object>();
    hostInfo.add(Integer.valueOf((int)cpuCount));
    hostInfo.add(Long.valueOf(cpuSpeed));
    hostInfo.add(Long.valueOf(memory));
    hostInfo.add(capabilities);
    hostInfo.add(Long.valueOf(dom0Memory));
    return hostInfo;
}
/**
 * Populates the network fields of a startup command from the configured
 * parameters. The storage network fields reuse the private network settings.
 *
 * @param cmd startup command whose private, storage and gateway fields are set
 */
protected void fillNetworkInformation(final StartupCommand cmd) {
    final String privateIp = getConfiguredProperty("private.ip.address", "127.0.0.1");
    final String privateMac = getConfiguredProperty("private.mac.address", "8A:D2:54:3F:7C:C3");
    final String privateNetmask = getConfiguredProperty("private.ip.netmask", "255.255.255.0");
    final String gatewayIp = getConfiguredProperty("gateway.ip.address", "127.0.0.1");

    cmd.setPrivateIpAddress(privateIp);
    cmd.setPrivateMacAddress(privateMac);
    cmd.setPrivateNetmask(privateNetmask);

    // storage traffic is reported with the same values as the private network
    cmd.setStorageIpAddress(privateIp);
    cmd.setStorageMacAddress(privateMac);
    cmd.setStorageNetmask(privateNetmask);

    cmd.setGatewayIpAddress(gatewayIp);
}
/**
 * Builds the host OS detail entries, preferring values found in
 * {@code _params} and falling back to fixed defaults.
 *
 * @return a map holding "Host.OS", "Host.OS.Version" and "Host.OS.Kernel.Version"
 */
private Map<String, String> getVersionStrings() {
    // each row: parameter key, value used when the key is not configured
    final String[][] keysAndDefaults = {
        {"Host.OS", "Fedora"},
        {"Host.OS.Version", "14"},
        {"Host.OS.Kernel.Version", "2.6.35.6-45.fc14.x86_64"},
    };

    final Map<String, String> versions = new HashMap<String, String>();
    for (final String[] entry : keysAndDefaults) {
        final String configured = (String) _params.get(entry[0]);
        versions.put(entry[0], configured != null ? configured : entry[1]);
    }
    return versions;
}
/**
 * Describes the local storage pool reported by this dummy resource.
 *
 * The pool UUID is name-based: it is derived from the host IP concatenated
 * with the local storage path, so the same configuration always produces the
 * same UUID.
 *
 * @return pool info built from the configured (or default) IP, path, capacity
 *         and available space
 * @throws NumberFormatException if the configured capacity or available value
 *         is not a parsable long
 */
protected StoragePoolInfo initializeLocalStorage() {
    String hostIp = getConfiguredProperty("private.ip.address", "127.0.0.1");
    String localStoragePath = getConfiguredProperty("local.storage.path", "/mnt");
    String capacity = getConfiguredProperty("local.storage.capacity", "1000000000");
    String available = getConfiguredProperty("local.storage.avail", "10000000");

    // Encode with an explicit charset: the no-arg getBytes() uses the platform
    // default, which would make the derived UUID depend on the JVM's locale.
    byte[] nameBytes = (hostIp + localStoragePath).getBytes(java.nio.charset.Charset.forName("UTF-8"));
    String uuid = UUID.nameUUIDFromBytes(nameBytes).toString();

    return new StoragePoolInfo(uuid, hostIp, localStoragePath,
            localStoragePath, StoragePoolType.Filesystem,
            Long.parseLong(capacity), Long.parseLong(available));
}
/**
 * Produces the startup commands this dummy resource announces.
 *
 * The added body builds a StartupRoutingCommand from getHostInfo(), fills in
 * network details, version strings and the cluster, then builds a
 * StartupStorageCommand for the local storage pool and returns both.
 *
 * @return the startup commands to send
 */
@Override
public StartupCommand[] initialize() {
// NOTE(review): this unconditional return precedes the rest of the body and
// looks like the pre-change line of this diff hunk - confirm it is removed in
// the resulting file, otherwise everything below is unreachable.
return new StartupCommand[] {new StartupCommand(Host.Type.Storage)};
// no VM state changes are reported; null is passed through to the command
Map<String, VmState> changes = null;
final List<Object> info = getHostInfo();
// getHostInfo() order is cpus, speed, ram, capabilities, dom0ram; note that
// index 4 (dom0ram) is passed before index 3 (capabilities) here
final StartupRoutingCommand cmd = new StartupRoutingCommand((Integer)info.get(0), (Long)info.get(1), (Long)info.get(2), (Long)info.get(4), (String)info.get(3), HypervisorType.KVM, RouterPrivateIpStrategy.HostLocal, changes);
fillNetworkInformation(cmd);
cmd.getHostDetails().putAll(getVersionStrings());
// cluster falls back to "1" when not configured
cmd.setCluster(getConfiguredProperty("cluster", "1"));
StoragePoolInfo pi = initializeLocalStorage();
StartupStorageCommand sscmd = new StartupStorageCommand();
sscmd.setPoolInfo(pi);
// the storage command's guid is the pool's UUID
sscmd.setGuid(pi.getUuid());
// "zone" is read directly from _params with no default, so it may be null
sscmd.setDataCenter((String)_params.get("zone"));
sscmd.setResourceType(Storage.StorageResourceType.STORAGE_POOL);
return new StartupCommand[]{cmd, sscmd};
}
@Override
@ -73,9 +179,13 @@ public class DummyResource implements ServerResource {
value = (String)params.get("negative.reply");
_negative = Boolean.parseBoolean(value);
setParams(params);
return true;
}
/**
 * Stores the configuration map that the getConfiguredProperty lookups read from.
 *
 * @param params configuration entries keyed by parameter name
 */
public void setParams(Map<String, Object> params) {
    _params = params;
}
@Override
public String getName() {

View File

@ -18,7 +18,7 @@
package com.cloud.agent.api;
public class ReadyCommand extends Command {
private String _details;
public ReadyCommand() {
super();
}
@ -30,6 +30,14 @@ public class ReadyCommand extends Command {
this.dcId = dcId;
}
/**
 * Sets the free-form details text carried by this command.
 *
 * @param details text to store
 */
public void setDetails(String details) {
    this._details = details;
}
/**
 * @return the details text previously stored with setDetails, or null if none was set
 */
public String getDetails() {
    return this._details;
}
/**
 * @return the data center id held in {@code dcId}
 */
public Long getDataCenterId() {
    return this.dcId;
}

View File

@ -605,19 +605,19 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
ConnectionException ce = (ConnectionException)e;
if (ce.isSetupError()) {
s_logger.warn("Monitor " + monitor.second().getClass().getSimpleName() + " says there is an error in the connect process for " + hostId + " due to " + e.getMessage());
handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected);
handleDisconnectWithInvestigation(attache, Event.AgentDisconnected);
throw ce;
} else {
s_logger.info("Monitor " + monitor.second().getClass().getSimpleName() + " says not to continue the connect process for " + hostId + " due to " + e.getMessage());
handleDisconnectWithoutInvestigation(attache, Event.ShutdownRequested);
handleDisconnectWithInvestigation(attache, Event.ShutdownRequested);
return attache;
}
} else if (e instanceof HypervisorVersionChangedException) {
handleDisconnectWithoutInvestigation(attache, Event.ShutdownRequested);
handleDisconnectWithInvestigation(attache, Event.ShutdownRequested);
throw new CloudRuntimeException("Unable to connect " + attache.getId(), e);
} else {
s_logger.error("Monitor " + monitor.second().getClass().getSimpleName() + " says there is an error in the connect process for " + hostId + " due to " + e.getMessage(), e);
handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected);
handleDisconnectWithInvestigation(attache, Event.AgentDisconnected);
throw new CloudRuntimeException("Unable to connect " + attache.getId(), e);
}
}
@ -631,7 +631,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
// this is the tricky part for secondary storage:
// mark it as disconnected and wait for the secondary storage VM to be up;
// return the attache instead of null, even if it is disconnected
handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected);
handleDisconnectWithInvestigation(attache, Event.AgentDisconnected);
}
agentStatusTransitTo(host, Event.Ready, _nodeId);
@ -1067,48 +1067,91 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
//TODO: handle mycloud specific
private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup) throws ConnectionException {
AgentAttache attache = null;
HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup);
if (host != null) {
attache = createAttacheForConnect(host, link);
private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, Request request) {
AgentAttache attache = null;
StartupAnswer[] answers = new StartupAnswer[startup.length];
try {
HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup);
if (host != null) {
attache = createAttacheForConnect(host, link);
}
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], attache.getId(), getPingInterval());
break;
}
}
}catch (ConnectionException e) {
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], e.toString());
break;
}
}
} catch (IllegalArgumentException e) {
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], e.toString());
break;
}
}
} catch (CloudRuntimeException e) {
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], e.toString());
break;
}
}
}
Response response = null;
if (attache != null) {
response = new Response(request, answers[0], _nodeId, attache.getId());
} else {
response = new Response(request, answers[0], _nodeId, -1);
}
attache = notifyMonitorsOfConnection(attache, startup, false);
try {
link.send(response.toBytes());
} catch (ClosedChannelException e) {
s_logger.debug("Failed to send startupanswer: " + e.toString());
return null;
}
if (attache == null) {
return null;
}
return attache;
try {
attache = notifyMonitorsOfConnection(attache, startup, false);
return attache;
} catch (ConnectionException e) {
ReadyCommand ready = new ReadyCommand(null);
ready.setDetails(e.toString());
try {
easySend(attache.getId(), ready);
} catch (Exception e1) {
s_logger.debug("Failed to send readycommand, due to " + e.toString());
}
return null;
} catch (CloudRuntimeException e) {
ReadyCommand ready = new ReadyCommand(null);
ready.setDetails(e.toString());
try {
easySend(attache.getId(), ready);
} catch (Exception e1) {
s_logger.debug("Failed to send readycommand, due to " + e.toString());
}
return null;
}
}
// protected void upgradeAgent(final Link link, final byte[] request, final
// String reason) {
//
// if (reason == UnsupportedVersionException.IncompatibleVersion) {
// final UpgradeResponse response = new UpgradeResponse(request,
// _upgradeMgr.getAgentUrl());
// try {
// s_logger.info("Asking for the agent to update due to incompatible version: "
// + response.toString());
// link.send(response.toBytes());
// } catch (final ClosedChannelException e) {
// s_logger.warn("Unable to send response due to connection closed: " +
// response.toString());
// }
// return;
// }
//
// assert (reason == UnsupportedVersionException.UnknownVersion) :
// "Unknown reason: " + reason;
// final UpgradeResponse response = new UpgradeResponse(request,
// _upgradeMgr.getAgentUrl());
// try {
// s_logger.info("Asking for the agent to update due to unknown version: " +
// response.toString());
// link.send(response.toBytes());
// } catch (final ClosedChannelException e) {
// s_logger.warn("Unable to send response due to connection closed: " +
// response.toString());
// }
// }
protected class SimulateStartTask implements Runnable {
ServerResource resource;
@ -1155,60 +1198,21 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
Response response = null;
if (attache == null) {
request.logD("Processing the first command ");
if (!(cmd instanceof StartupCommand)) {
s_logger.warn("Throwing away a request because it came through as the first command on a connect: " + request);
return;
}
StartupCommand startup = (StartupCommand) cmd;
// if ((_upgradeMgr.registerForUpgrade(-1, startup.getVersion())
// == UpgradeManager.State.RequiresUpdate) &&
// (_upgradeMgr.getAgentUrl() != null)) {
// final UpgradeCommand upgrade = new
// UpgradeCommand(_upgradeMgr.getAgentUrl());
// final Request req = new Request(1, -1, -1, new Command[] {
// upgrade }, true, true);
// s_logger.info("Agent requires upgrade: " + req.toString());
// try {
// link.send(req.toBytes());
// } catch (ClosedChannelException e) {
// s_logger.warn("Unable to tell agent it should update.");
// }
// return;
// }
try {
StartupCommand[] startups = new StartupCommand[cmds.length];
for (int i = 0; i < cmds.length; i++) {
startups[i] = (StartupCommand) cmds[i];
}
attache = handleConnectedAgent(link, startups);
} catch (final IllegalArgumentException e) {
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, 0, new Long(0), "Agent from " + startup.getPrivateIpAddress() + " is unable to connect due to " + e.getMessage(), "Agent from "
+ startup.getPrivateIpAddress() + " is unable to connect with " + request + " because of " + e.getMessage());
s_logger.warn("Unable to create attache for agent: " + request, e);
response = new Response(request, new StartupAnswer((StartupCommand) cmd, e.getMessage()), _nodeId, -1);
} catch (ConnectionException e) {
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, 0, new Long(0), "Agent from " + startup.getPrivateIpAddress() + " is unable to connect due to " + e.getMessage(), "Agent from "
+ startup.getPrivateIpAddress() + " is unable to connect with " + request + " because of " + e.getMessage());
s_logger.warn("Unable to create attache for agent: " + request, e);
response = new Response(request, new StartupAnswer((StartupCommand) cmd, e.getMessage()), _nodeId, -1);
} catch (final CloudRuntimeException e) {
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, 0, new Long(0), "Agent from " + startup.getPrivateIpAddress() + " is unable to connect due to " + e.getMessage(), "Agent from "
+ startup.getPrivateIpAddress() + " is unable to connect with " + request + " because of " + e.getMessage());
s_logger.warn("Unable to create attache for agent: " + request, e);
}
if (attache == null) {
if (response == null) {
s_logger.warn("Unable to create attache for agent: " + request);
response = new Response(request, new StartupAnswer((StartupCommand) request.getCommand(), "Unable to register this agent"), _nodeId, -1);
}
try {
link.send(response.toBytes(), true);
} catch (final ClosedChannelException e) {
s_logger.warn("Response was not sent: " + response);
}
return;
}
request.logD("Processing the first command ");
if (!(cmd instanceof StartupCommand)) {
s_logger.warn("Throwing away a request because it came through as the first command on a connect: " + request);
return;
}
StartupCommand[] startups = new StartupCommand[cmds.length];
for (int i = 0; i < cmds.length; i++) {
startups[i] = (StartupCommand) cmds[i];
}
attache = handleConnectedAgent(link, startups, request);
if (attache == null) {
s_logger.warn("Unable to create attache for agent: " + request);
}
return;
}
final long hostId = attache.getId();

View File

@ -23,14 +23,18 @@ import java.util.Map;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.ModifySshKeysCommand;
import com.cloud.agent.api.StartupCommand;
import com.cloud.agent.api.StartupRoutingCommand;
import com.cloud.agent.manager.Commands;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.ConnectionException;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
@ -42,11 +46,11 @@ import com.cloud.network.router.VirtualNetworkApplianceManager;
public class SshKeysDistriMonitor implements Listener {
private static final Logger s_logger = Logger.getLogger(SshKeysDistriMonitor.class);
private final VirtualNetworkApplianceManager _routerMgr;
AgentManager _agentMgr;
private final HostDao _hostDao;
private ConfigurationDao _configDao;
public SshKeysDistriMonitor(VirtualNetworkApplianceManager mgr, HostDao host, ConfigurationDao config) {
this._routerMgr = mgr;
public SshKeysDistriMonitor(AgentManager mgr, HostDao host, ConfigurationDao config) {
this._agentMgr = mgr;
_hostDao = host;
_configDao = config;
}
@ -81,9 +85,13 @@ public class SshKeysDistriMonitor implements Listener {
Map<String, String> configs = _configDao.getConfiguration("management-server", new HashMap<String, Object>());
String pubKey = configs.get("ssh.publickey");
String prvKey = configs.get("ssh.privatekey");
if (!_routerMgr.sendSshKeysToHost(host.getId(), pubKey, prvKey)) {
try {
ModifySshKeysCommand cmds = new ModifySshKeysCommand(pubKey, prvKey);
Commands c = new Commands(cmds);
_agentMgr.send(host.getId(), c, this);
} catch (AgentUnavailableException e) {
s_logger.debug("Failed to send keys to agent: " + host.getId());
throw new ConnectionException(true, "Unable to send keys to the agent");
}
}
}

View File

@ -639,7 +639,7 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
throw new ConfigurationException("Unable to get " + UserStatisticsDao.class.getName());
}
_agentMgr.registerForHostEvents(new SshKeysDistriMonitor(this, _hostDao, _configDao), true, false, false);
_agentMgr.registerForHostEvents(new SshKeysDistriMonitor(_agentMgr, _hostDao, _configDao), true, false, false);
_itMgr.registerGuru(VirtualMachine.Type.DomainRouter, this);
boolean useLocalStorage = Boolean.parseBoolean(configs.get(Config.SystemVMUseLocalStorage.key()));