diff --git a/server/src/com/cloud/configuration/Config.java b/server/src/com/cloud/configuration/Config.java index ea32025ba32..92313ea3b79 100755 --- a/server/src/com/cloud/configuration/Config.java +++ b/server/src/com/cloud/configuration/Config.java @@ -167,6 +167,7 @@ public enum Config { RouterStatsInterval("Advanced", NetworkManager.class, Integer.class, "router.stats.interval", "300", "Interval (in seconds) to report router statistics.", null), ExternalNetworkStatsInterval("Advanced", NetworkManager.class, Integer.class, "external.network.stats.interval", "300", "Interval (in seconds) to report external network statistics.", null), RouterCheckInterval("Advanced", NetworkManager.class, Integer.class, "router.check.interval", "30", "Interval (in seconds) to report redundant router status.", null), + RouterCheckPoolSize("Advanced", NetworkManager.class, Integer.class, "router.check.poolsize", "10", "Numbers of threads using to check redundant router status.", null), RouterTemplateId("Advanced", NetworkManager.class, Long.class, "router.template.id", "1", "Default ID for template.", null), RouterExtraPublicNics("Advanced", NetworkManager.class, Integer.class, "router.extra.public.nics", "2", "specify extra public nics used for virtual router(up to 5)", "0-5"), StartRetry("Advanced", AgentManager.class, Integer.class, "start.retry", "10", "Number of times to retry create and start commands", null), diff --git a/server/src/com/cloud/network/dao/NetworkDao.java b/server/src/com/cloud/network/dao/NetworkDao.java index 4079955c2e9..1fefb75a360 100644 --- a/server/src/com/cloud/network/dao/NetworkDao.java +++ b/server/src/com/cloud/network/dao/NetworkDao.java @@ -110,4 +110,5 @@ public interface NetworkDao extends GenericDao { List listNetworksByAccount(long accountId, long zoneId, Network.GuestType type, boolean isSystem); + List listRedundantNetworks(); } diff --git a/server/src/com/cloud/network/dao/NetworkDaoImpl.java b/server/src/com/cloud/network/dao/NetworkDaoImpl.java index 8228393f93f..27ae3a3c7c6 100644 --- a/server/src/com/cloud/network/dao/NetworkDaoImpl.java +++ b/server/src/com/cloud/network/dao/NetworkDaoImpl.java @@ -103,6 +103,7 @@ public class NetworkDaoImpl extends GenericDaoBase implements N AllFieldsSearch.and("vpcId", AllFieldsSearch.entity().getVpcId(), Op.EQ); SearchBuilder join1 = _ntwkOffDao.createSearchBuilder(); join1.and("isSystem", join1.entity().isSystemOnly(), Op.EQ); + join1.and("isRedundant", join1.entity().getRedundantRouter(), Op.EQ); AllFieldsSearch.join("offerings", join1, AllFieldsSearch.entity().getNetworkOfferingId(), join1.entity().getId(), JoinBuilder.JoinType.INNER); AllFieldsSearch.done(); @@ -574,4 +575,12 @@ public class NetworkDaoImpl extends GenericDaoBase implements N List networks = search(sc, null); return networks; } + + @Override + public List listRedundantNetworks() { + SearchCriteria sc = AllFieldsSearch.create(); + sc.setJoinParameters("offerings", "isRedundant", true); + + return listBy(sc, null); + } } diff --git a/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java b/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java index 8cb03d9dff5..1a6eb0982a9 100755 --- a/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java +++ b/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java @@ -27,9 +27,14 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.TimeZone; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -336,6 +341,7 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian int _routerStatsInterval = 300; int _routerCheckInterval = 30; + int _rvrStatusUpdatePoolSize = 10; protected ServiceOfferingVO _offering; private String _dnsBasicZoneUpdates = "all"; private Set _guestOSNeedGatewayOnNonDefaultNetwork = new HashSet(); @@ -350,8 +356,11 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian ScheduledExecutorService _executor; ScheduledExecutorService _checkExecutor; ScheduledExecutorService _networkStatsUpdateExecutor; + ExecutorService _rvrStatusUpdateExecutor; Account _systemAcct; + + BlockingQueue _vrUpdateQueue = null; @Override public boolean sendSshKeysToHost(Long hostId, String pubKey, String prvKey) { @@ -579,7 +588,7 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("RouterMonitor")); _checkExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("RouterStatusMonitor")); _networkStatsUpdateExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("NetworkStatsUpdater")); - + final ComponentLocator locator = ComponentLocator.getCurrentLocator(); final Map configs = _configDao.getConfiguration("AgentManager", params); @@ -606,7 +615,18 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian value = configs.get("router.check.interval"); _routerCheckInterval = NumbersUtil.parseInt(value, 30); - + + value = configs.get("router.check.poolsize"); + _rvrStatusUpdatePoolSize = NumbersUtil.parseInt(value, 10); + + /* + * We assume that one thread can handle 20 requests in 1 minute in normal situation, so here we give the queue size up to 50 minutes. + * It's mostly for buffer, since each time CheckRouterTask running, it would add all the redundant networks in the queue immediately + */ + _vrUpdateQueue = new LinkedBlockingQueue(_rvrStatusUpdatePoolSize * 1000); + + _rvrStatusUpdateExecutor = Executors.newFixedThreadPool(_rvrStatusUpdatePoolSize, new NamedThreadFactory("RedundantRouterStatusMonitor")); + _instance = configs.get("instance.name"); if (_instance == null) { _instance = "DEFAULT"; @@ -704,6 +724,9 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian if (_routerCheckInterval > 0) { _checkExecutor.scheduleAtFixedRate(new CheckRouterTask(), _routerCheckInterval, _routerCheckInterval, TimeUnit.SECONDS); + for (int i = 0; i < _rvrStatusUpdatePoolSize; i++) { + _rvrStatusUpdateExecutor.execute(new RvRStatusUpdateTask()); + } } else { s_logger.debug("router.check.interval - " + _routerCheckInterval+ " so not scheduling the redundant router checking thread"); } @@ -1039,14 +1062,11 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian if (host == null || host.getStatus() != Status.Up) { router.setRedundantState(RedundantState.UNKNOWN); updated = true; - } else if (host.getManagementServerId() != ManagementServerNode.getManagementServerId()) { - /* Only cover hosts managed by this management server */ - continue; } else if (privateIP != null) { final CheckRouterCommand command = new CheckRouterCommand(); command.setAccessDetail(NetworkElementCommand.ROUTER_IP, getRouterControlIp(router.getId())); command.setAccessDetail(NetworkElementCommand.ROUTER_NAME, router.getInstanceName()); - command.setWait(60); + command.setWait(30); final Answer origAnswer = _agentMgr.easySend(router.getHostId(), command); CheckRouterAnswer answer = null; if (origAnswer instanceof CheckRouterAnswer) { @@ -1128,11 +1148,11 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian return priority; } - protected class CheckRouterTask implements Runnable { + protected class RvRStatusUpdateTask implements Runnable { - public CheckRouterTask() { + public RvRStatusUpdateTask() { } - + /* * In order to make fail-over works well at any time, we have to ensure: * 1. Backup router's priority = Master's priority - DELTA + 1 @@ -1195,9 +1215,9 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian String title = "More than one redundant virtual router is in MASTER state! Router " + router.getHostName() + " and router " + dupRouter.getHostName(); String context = "Virtual router (name: " + router.getHostName() + ", id: " + router.getId() + " and router (name: " + dupRouter.getHostName() + ", id: " + router.getId() + ") are both in MASTER state! If the problem persist, restart both of routers. "; - _alertMgr.sendAlert(AlertManager.ALERT_TYPE_DOMAIN_ROUTER, router.getDataCenterIdToDeployIn(), router.getPodIdToDeployIn(), title, context); _alertMgr.sendAlert(AlertManager.ALERT_TYPE_DOMAIN_ROUTER, dupRouter.getDataCenterIdToDeployIn(), dupRouter.getPodIdToDeployIn(), title, context); + s_logger.warn(context); } else { networkRouterMaps.put(routerGuestNtwkId, router); } @@ -1206,19 +1226,61 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian } } + @Override + public void run() { + while (true) { + try { + Long networkId = _vrUpdateQueue.take(); + List routers = _routerDao.listByNetworkAndRole(networkId, Role.VIRTUAL_ROUTER); + + if (routers.size() != 2) { + continue; + } + /* + * We update the router pair which the lower id router owned by this mgmt server, in order + * to prevent duplicate update of router status from cluster mgmt servers + */ + DomainRouterVO router = routers.get(0); + if (routers.get(1).getId() < router.getId()) { + router = routers.get(1); + } + HostVO host = _hostDao.findById(router.getHostId()); + if (host == null || host.getManagementServerId() == null || + host.getManagementServerId() != ManagementServerNode.getManagementServerId()) { + continue; + } + updateRoutersRedundantState(routers); + checkDuplicateMaster(routers); + checkSanity(routers); + } catch (Exception ex) { + s_logger.error("Fail to complete the RvRStatusUpdateTask! ", ex); + } + } + } + + } + + protected class CheckRouterTask implements Runnable { + + public CheckRouterTask() { + } + @Override public void run() { try { final List routers = _routerDao.listIsolatedByHostId(null); - s_logger.debug("Found " + routers.size() + " routers. "); + s_logger.debug("Found " + routers.size() + " routers to update status. "); - updateRoutersRedundantState(routers); updateSite2SiteVpnConnectionState(routers); - /* FIXME assumed the a pair of redundant routers managed by same mgmt server, - * then the update above can get the latest status */ - checkDuplicateMaster(routers); - checkSanity(routers); + final List networks = _networkDao.listRedundantNetworks(); + s_logger.debug("Found " + networks.size() + " networks to update RvR status. "); + for (NetworkVO network : networks) { + if (!_vrUpdateQueue.offer(network.getId(), 500, TimeUnit.MILLISECONDS)) { + s_logger.warn("Cannot insert into virtual router update queue! Adjustment of router.check.interval and router.check.poolsize maybe needed."); + break; + } + } } catch (Exception ex) { s_logger.error("Fail to complete the CheckRouterTask! ", ex); } diff --git a/server/test/com/cloud/vpc/dao/MockNetworkDaoImpl.java b/server/test/com/cloud/vpc/dao/MockNetworkDaoImpl.java index 509d9c72f73..7c9a5823516 100644 --- a/server/test/com/cloud/vpc/dao/MockNetworkDaoImpl.java +++ b/server/test/com/cloud/vpc/dao/MockNetworkDaoImpl.java @@ -351,4 +351,10 @@ public class MockNetworkDaoImpl extends GenericDaoBase implemen return null; } + @Override + public List listRedundantNetworks() { + // TODO Auto-generated method stub + return null; + } + } diff --git a/setup/db/db/schema-40to410.sql b/setup/db/db/schema-40to410.sql index a66fd7a691b..64370528be3 100644 --- a/setup/db/db/schema-40to410.sql +++ b/setup/db/db/schema-40to410.sql @@ -52,6 +52,8 @@ CREATE TABLE `cloud`.`template_s3_ref` ( INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'management-server', 's3.enable', 'false', 'enable s3'); +INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'NetworkManager', 'router.check.poolsize' , '10', 'Numbers of threads using to check redundant router status.'); + ALTER TABLE `cloud`.`snapshots` ADD COLUMN `s3_id` bigint unsigned COMMENT 'S3 to which this snapshot will be stored'; ALTER TABLE `cloud`.`snapshots` ADD CONSTRAINT `fk_snapshots__s3_id` FOREIGN KEY `fk_snapshots__s3_id` (`s3_id`) REFERENCES `s3` (`id`); diff --git a/tools/cli/cloudmonkey/cloudmonkey.py b/tools/cli/cloudmonkey/cloudmonkey.py index 3bc87169f63..a573972481d 100644 --- a/tools/cli/cloudmonkey/cloudmonkey.py +++ b/tools/cli/cloudmonkey/cloudmonkey.py @@ -84,6 +84,7 @@ class CloudMonkeyShell(cmd.Cmd, object): config = self.write_config() print "Welcome! Using `set` configure the necessary settings:" print " ".join(sorted(self.config_fields.keys())) + print "Config file:", self.config_file print "For debugging, tail -f", self.log_file, "\n" for key in self.config_fields.keys(): @@ -204,7 +205,7 @@ class CloudMonkeyShell(cmd.Cmd, object): def print_result_as_dict(result, result_filter=None): for key in sorted(result.keys(), - key=lambda x: x!='id' and x!='count' and x): + key=lambda x: x != 'id' and x != 'count' and x): if not (isinstance(result[key], list) or isinstance(result[key], dict)): self.print_shell("%s = %s" % (key, result[key])) @@ -267,8 +268,8 @@ class CloudMonkeyShell(cmd.Cmd, object): requests = {'jobid': jobId} timeout = int(self.timeout) pollperiod = 3 + progress = 1 while timeout > 0: - progress = int((int(self.timeout) - timeout) / pollperiod ) + 1 print '\r' + '.' * progress, sys.stdout.flush() response = process_json(conn.make_request_with_auth(command, @@ -290,6 +291,7 @@ class CloudMonkeyShell(cmd.Cmd, object): return response time.sleep(pollperiod) timeout = timeout - pollperiod + progress += 1 logger.debug("job: %s to timeout in %ds" % (jobId, timeout)) self.print_shell("Error:", "Async query timeout for jobid=", jobId) @@ -363,30 +365,6 @@ class CloudMonkeyShell(cmd.Cmd, object): except Exception as e: self.print_shell("🙈 Error on parsing and printing", e) - def cache_verb_miss(self, verb): - self.print_shell("Oops: Verb %s should have been precached" % verb) - completions_found = filter(lambda x: x.startswith(verb), completions) - self.cache_verbs[verb] = {} - for api_name in completions_found: - api_cmd_str = "%sCmd" % api_name - api_mod = self.get_api_module(api_name, [api_cmd_str]) - if api_mod is None: - continue - try: - api_cmd = getattr(api_mod, api_cmd_str)() - required = api_cmd.required - doc = api_mod.__doc__ - except AttributeError, e: - self.print_shell("Error: API attribute %s not found!" % e) - params = filter(lambda x: '__' not in x and 'required' not in x, - dir(api_cmd)) - if len(required) > 0: - doc += "\nRequired args: %s" % " ".join(required) - doc += "\nArgs: %s" % " ".join(params) - api_name_lower = api_name.replace(verb, '').lower() - self.cache_verbs[verb][api_name_lower] = [api_name, params, doc, - required] - def completedefault(self, text, line, begidx, endidx): partitions = line.partition(" ") verb = partitions[0] @@ -401,9 +379,6 @@ class CloudMonkeyShell(cmd.Cmd, object): autocompletions = [] search_string = "" - if verb not in self.cache_verbs: - self.cache_verb_miss(verb) - if separator != " ": # Complete verb subjects autocompletions = self.cache_verbs[verb].keys() search_string = subject @@ -445,8 +420,8 @@ class CloudMonkeyShell(cmd.Cmd, object): """ args = args.strip().partition(" ") key, value = (args[0], args[2]) - # Note: keys and class attributes should have same names - setattr(self, key, value) + setattr(self, key, value) # keys and attributes should have same names + self.prompt += " " # prompt fix self.write_config() def complete_set(self, text, line, begidx, endidx): @@ -484,8 +459,6 @@ class CloudMonkeyShell(cmd.Cmd, object): else: verb = fields[0] subject = fields[2].partition(" ")[0] - if verb not in self.cache_verbs: - self.cache_verb_miss(verb) if subject in self.cache_verbs[verb]: self.print_shell(self.cache_verbs[verb][subject][2]) @@ -537,8 +510,6 @@ def main(): prog_name = "python " + prog_name self.do_shell("%s %s %s" % (prog_name, rule, args)) return - if not rule in self.cache_verbs: - self.cache_verb_miss(rule) try: args_partition = args.partition(" ") res = self.cache_verbs[rule][args_partition[0]]