commit b720675bfe
Merge branch 'master' into api_refactoring

Getting ready for merge request for master, sync changes

Conflicts:
    client/tomcatconf/commands.properties.in

Signed-off-by: Rohit Yadav <bhaisaab@apache.org>
@@ -167,6 +167,7 @@ public enum Config {
    RouterStatsInterval("Advanced", NetworkManager.class, Integer.class, "router.stats.interval", "300", "Interval (in seconds) to report router statistics.", null),
    ExternalNetworkStatsInterval("Advanced", NetworkManager.class, Integer.class, "external.network.stats.interval", "300", "Interval (in seconds) to report external network statistics.", null),
    RouterCheckInterval("Advanced", NetworkManager.class, Integer.class, "router.check.interval", "30", "Interval (in seconds) to report redundant router status.", null),
    RouterCheckPoolSize("Advanced", NetworkManager.class, Integer.class, "router.check.poolsize", "10", "Numbers of threads using to check redundant router status.", null),
    RouterTemplateId("Advanced", NetworkManager.class, Long.class, "router.template.id", "1", "Default ID for template.", null),
    RouterExtraPublicNics("Advanced", NetworkManager.class, Integer.class, "router.extra.public.nics", "2", "specify extra public nics used for virtual router(up to 5)", "0-5"),
    StartRetry("Advanced", AgentManager.class, Integer.class, "start.retry", "10", "Number of times to retry create and start commands", null),
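Each Config entry above carries a category, owning component, value type, key, default value, description, and an optional allowed range (for instance "0-5" on router.extra.public.nics). The manager code further down reads such a value from the configuration map and falls back to the declared default when it is unset. A minimal, self-contained sketch of that fallback pattern; the helper below is hypothetical and only imitates what NumbersUtil.parseInt is used for in this diff:

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigDefaultSketch {
        // Parse an integer setting, falling back to the declared default when the
        // key is absent or the value is not a number.
        static int parseInt(String value, int defaultValue) {
            if (value == null) {
                return defaultValue;
            }
            try {
                return Integer.parseInt(value.trim());
            } catch (NumberFormatException e) {
                return defaultValue;
            }
        }

        public static void main(String[] args) {
            Map<String, String> configs = new HashMap<>();   // stands in for rows of the configuration table
            configs.put("router.check.interval", "30");
            int poolSize = parseInt(configs.get("router.check.poolsize"), 10); // unset -> default 10
            int interval = parseInt(configs.get("router.check.interval"), 30);
            System.out.println("poolsize=" + poolSize + " interval=" + interval);
        }
    }
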
@@ -110,4 +110,5 @@ public interface NetworkDao extends GenericDao<NetworkVO, Long> {
    List<NetworkVO> listNetworksByAccount(long accountId, long zoneId, Network.GuestType type, boolean isSystem);

    List<NetworkVO> listRedundantNetworks();
}
@@ -103,6 +103,7 @@ public class NetworkDaoImpl extends GenericDaoBase<NetworkVO, Long> implements N
        AllFieldsSearch.and("vpcId", AllFieldsSearch.entity().getVpcId(), Op.EQ);
        SearchBuilder<NetworkOfferingVO> join1 = _ntwkOffDao.createSearchBuilder();
        join1.and("isSystem", join1.entity().isSystemOnly(), Op.EQ);
        join1.and("isRedundant", join1.entity().getRedundantRouter(), Op.EQ);
        AllFieldsSearch.join("offerings", join1, AllFieldsSearch.entity().getNetworkOfferingId(), join1.entity().getId(), JoinBuilder.JoinType.INNER);
        AllFieldsSearch.done();

@@ -574,4 +575,12 @@ public class NetworkDaoImpl extends GenericDaoBase<NetworkVO, Long> implements N
        List<NetworkVO> networks = search(sc, null);
        return networks;
    }

    @Override
    public List<NetworkVO> listRedundantNetworks() {
        SearchCriteria<NetworkVO> sc = AllFieldsSearch.create();
        sc.setJoinParameters("offerings", "isRedundant", true);

        return listBy(sc, null);
    }
}
@@ -27,9 +27,14 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -336,6 +341,7 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
    int _routerStatsInterval = 300;
    int _routerCheckInterval = 30;
    int _rvrStatusUpdatePoolSize = 10;
    protected ServiceOfferingVO _offering;
    private String _dnsBasicZoneUpdates = "all";
    private Set<String> _guestOSNeedGatewayOnNonDefaultNetwork = new HashSet<String>();
@@ -350,8 +356,11 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
    ScheduledExecutorService _executor;
    ScheduledExecutorService _checkExecutor;
    ScheduledExecutorService _networkStatsUpdateExecutor;
    ExecutorService _rvrStatusUpdateExecutor;

    Account _systemAcct;

    BlockingQueue<Long> _vrUpdateQueue = null;

    @Override
    public boolean sendSshKeysToHost(Long hostId, String pubKey, String prvKey) {
@@ -579,7 +588,7 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
        _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("RouterMonitor"));
        _checkExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("RouterStatusMonitor"));
        _networkStatsUpdateExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("NetworkStatsUpdater"));

        final ComponentLocator locator = ComponentLocator.getCurrentLocator();

        final Map<String, String> configs = _configDao.getConfiguration("AgentManager", params);
@@ -606,7 +615,18 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
        value = configs.get("router.check.interval");
        _routerCheckInterval = NumbersUtil.parseInt(value, 30);

        value = configs.get("router.check.poolsize");
        _rvrStatusUpdatePoolSize = NumbersUtil.parseInt(value, 10);
        /*
         * We assume that one thread can handle 20 requests in 1 minute in normal situation, so here we give the queue size up to 50 minutes.
         * It's mostly for buffer, since each time CheckRouterTask running, it would add all the redundant networks in the queue immediately
         */
        _vrUpdateQueue = new LinkedBlockingQueue<Long>(_rvrStatusUpdatePoolSize * 1000);

        _rvrStatusUpdateExecutor = Executors.newFixedThreadPool(_rvrStatusUpdatePoolSize, new NamedThreadFactory("RedundantRouterStatusMonitor"));

        _instance = configs.get("instance.name");
        if (_instance == null) {
            _instance = "DEFAULT";
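A quick sanity check of the sizing comment above, under its stated assumption of 20 checks per thread per minute (illustrative arithmetic only, using the defaults from this diff):

    public class QueueSizingSketch {
        public static void main(String[] args) {
            int poolSize = 10;                          // router.check.poolsize default
            int capacity = poolSize * 1000;             // LinkedBlockingQueue bound used in the diff
            int checksPerThreadPerMinute = 20;          // assumption stated in the code comment
            int drainRatePerMinute = poolSize * checksPerThreadPerMinute;
            int bufferMinutes = capacity / drainRatePerMinute;  // 10000 / 200 = 50 minutes of backlog
            System.out.println("Queue buffers roughly " + bufferMinutes + " minutes of work");
        }
    }
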
@@ -704,6 +724,9 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
        if (_routerCheckInterval > 0) {
            _checkExecutor.scheduleAtFixedRate(new CheckRouterTask(), _routerCheckInterval, _routerCheckInterval, TimeUnit.SECONDS);
            for (int i = 0; i < _rvrStatusUpdatePoolSize; i++) {
                _rvrStatusUpdateExecutor.execute(new RvRStatusUpdateTask());
            }
        } else {
            s_logger.debug("router.check.interval - " + _routerCheckInterval + " so not scheduling the redundant router checking thread");
        }
@@ -1039,14 +1062,11 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
            if (host == null || host.getStatus() != Status.Up) {
                router.setRedundantState(RedundantState.UNKNOWN);
                updated = true;
            } else if (host.getManagementServerId() != ManagementServerNode.getManagementServerId()) {
                /* Only cover hosts managed by this management server */
                continue;
            } else if (privateIP != null) {
                final CheckRouterCommand command = new CheckRouterCommand();
                command.setAccessDetail(NetworkElementCommand.ROUTER_IP, getRouterControlIp(router.getId()));
                command.setAccessDetail(NetworkElementCommand.ROUTER_NAME, router.getInstanceName());
-               command.setWait(60);
+               command.setWait(30);
                final Answer origAnswer = _agentMgr.easySend(router.getHostId(), command);
                CheckRouterAnswer answer = null;
                if (origAnswer instanceof CheckRouterAnswer) {
@@ -1128,11 +1148,11 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
        return priority;
    }

-   protected class CheckRouterTask implements Runnable {
+   protected class RvRStatusUpdateTask implements Runnable {

-       public CheckRouterTask() {
+       public RvRStatusUpdateTask() {
        }

        /*
         * In order to make fail-over works well at any time, we have to ensure:
         * 1. Backup router's priority = Master's priority - DELTA + 1
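The cut-off comment above states the invariant "Backup router's priority = Master's priority - DELTA + 1". A purely illustrative computation, with hypothetical figures that are not taken from CloudStack's defaults:

    public class RvrPrioritySketch {
        public static void main(String[] args) {
            // Hypothetical numbers only: master at VRRP priority 100, DELTA of 2.
            int masterPriority = 100;
            int delta = 2;
            int backupPriority = masterPriority - delta + 1;  // 100 - 2 + 1 = 99
            System.out.println("backup=" + backupPriority + ", always strictly below master=" + masterPriority);
        }
    }
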
@@ -1195,9 +1215,9 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
                    String title = "More than one redundant virtual router is in MASTER state! Router " + router.getHostName() + " and router " + dupRouter.getHostName();
                    String context = "Virtual router (name: " + router.getHostName() + ", id: " + router.getId() + " and router (name: "
                            + dupRouter.getHostName() + ", id: " + router.getId() + ") are both in MASTER state! If the problem persist, restart both of routers. ";

                    _alertMgr.sendAlert(AlertManager.ALERT_TYPE_DOMAIN_ROUTER, router.getDataCenterIdToDeployIn(), router.getPodIdToDeployIn(), title, context);
                    _alertMgr.sendAlert(AlertManager.ALERT_TYPE_DOMAIN_ROUTER, dupRouter.getDataCenterIdToDeployIn(), dupRouter.getPodIdToDeployIn(), title, context);
                    s_logger.warn(context);
                } else {
                    networkRouterMaps.put(routerGuestNtwkId, router);
                }
@@ -1206,19 +1226,61 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
            }
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Long networkId = _vrUpdateQueue.take();
                    List <DomainRouterVO> routers = _routerDao.listByNetworkAndRole(networkId, Role.VIRTUAL_ROUTER);

                    if (routers.size() != 2) {
                        continue;
                    }
                    /*
                     * We update the router pair which the lower id router owned by this mgmt server, in order
                     * to prevent duplicate update of router status from cluster mgmt servers
                     */
                    DomainRouterVO router = routers.get(0);
                    if (routers.get(1).getId() < router.getId()) {
                        router = routers.get(1);
                    }
                    HostVO host = _hostDao.findById(router.getHostId());
                    if (host == null || host.getManagementServerId() == null ||
                            host.getManagementServerId() != ManagementServerNode.getManagementServerId()) {
                        continue;
                    }
                    updateRoutersRedundantState(routers);
                    checkDuplicateMaster(routers);
                    checkSanity(routers);
                } catch (Exception ex) {
                    s_logger.error("Fail to complete the RvRStatusUpdateTask! ", ex);
                }
            }
        }
    }

    protected class CheckRouterTask implements Runnable {

        public CheckRouterTask() {
        }

        @Override
        public void run() {
            try {
                final List<DomainRouterVO> routers = _routerDao.listIsolatedByHostId(null);
-               s_logger.debug("Found " + routers.size() + " routers. ");
+               s_logger.debug("Found " + routers.size() + " routers to update status. ");

                updateRoutersRedundantState(routers);
                updateSite2SiteVpnConnectionState(routers);

                /* FIXME assumed the a pair of redundant routers managed by same mgmt server,
                 * then the update above can get the latest status */
                checkDuplicateMaster(routers);
                checkSanity(routers);
                final List<NetworkVO> networks = _networkDao.listRedundantNetworks();
                s_logger.debug("Found " + networks.size() + " networks to update RvR status. ");
                for (NetworkVO network : networks) {
                    if (!_vrUpdateQueue.offer(network.getId(), 500, TimeUnit.MILLISECONDS)) {
                        s_logger.warn("Cannot insert into virtual router update queue! Adjustment of router.check.interval and router.check.poolsize maybe needed.");
                        break;
                    }
                }
            } catch (Exception ex) {
                s_logger.error("Fail to complete the CheckRouterTask! ", ex);
            }
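Taken together, the manager changes above form a bounded producer/consumer pipeline: the scheduled CheckRouterTask offers the ids of redundant networks into _vrUpdateQueue (giving up after 500 ms when the queue is full), while a fixed pool of RvRStatusUpdateTask workers blocks on take() and checks one network's router pair at a time. A minimal self-contained sketch of that pattern using plain java.util.concurrent; the class, constants, and placeholder ids below are made up for illustration and are not the CloudStack code itself:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class RvrStatusPipelineSketch {
        static final int POOL_SIZE = 10;               // stand-in for router.check.poolsize
        static final int CHECK_INTERVAL_SECONDS = 30;  // stand-in for router.check.interval

        public static void main(String[] args) {
            BlockingQueue<Long> updateQueue = new LinkedBlockingQueue<Long>(POOL_SIZE * 1000);

            // Consumers: each worker blocks until a network id is available, then "checks" it.
            ExecutorService workers = Executors.newFixedThreadPool(POOL_SIZE);
            for (int i = 0; i < POOL_SIZE; i++) {
                workers.execute(() -> {
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            Long networkId = updateQueue.take();
                            System.out.println(Thread.currentThread().getName() + " checking network " + networkId);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }

            // Producer: on a fixed schedule, enqueue every "redundant network",
            // bailing out if the queue stays full for 500 ms.
            ScheduledExecutorService producer = Executors.newScheduledThreadPool(1);
            List<Long> redundantNetworkIds = Arrays.asList(101L, 102L, 103L);  // placeholder data
            producer.scheduleAtFixedRate(() -> {
                for (Long id : redundantNetworkIds) {
                    try {
                        if (!updateQueue.offer(id, 500, TimeUnit.MILLISECONDS)) {
                            System.err.println("Update queue full; consider a larger pool or a longer interval");
                            break;
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }, CHECK_INTERVAL_SECONDS, CHECK_INTERVAL_SECONDS, TimeUnit.SECONDS);
        }
    }
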
@@ -351,4 +351,10 @@ public class MockNetworkDaoImpl extends GenericDaoBase<NetworkVO, Long> implemen
        return null;
    }

    @Override
    public List<NetworkVO> listRedundantNetworks() {
        // TODO Auto-generated method stub
        return null;
    }

}
@@ -52,6 +52,8 @@ CREATE TABLE `cloud`.`template_s3_ref` (
INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'management-server', 's3.enable', 'false', 'enable s3');

INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'NetworkManager', 'router.check.poolsize', '10', 'Numbers of threads using to check redundant router status.');

ALTER TABLE `cloud`.`snapshots` ADD COLUMN `s3_id` bigint unsigned COMMENT 'S3 to which this snapshot will be stored';

ALTER TABLE `cloud`.`snapshots` ADD CONSTRAINT `fk_snapshots__s3_id` FOREIGN KEY `fk_snapshots__s3_id` (`s3_id`) REFERENCES `s3` (`id`);
@@ -84,6 +84,7 @@ class CloudMonkeyShell(cmd.Cmd, object):
            config = self.write_config()
            print "Welcome! Using `set` configure the necessary settings:"
            print " ".join(sorted(self.config_fields.keys()))
            print "Config file:", self.config_file
            print "For debugging, tail -f", self.log_file, "\n"

        for key in self.config_fields.keys():
@@ -204,7 +205,7 @@ class CloudMonkeyShell(cmd.Cmd, object):
        def print_result_as_dict(result, result_filter=None):
            for key in sorted(result.keys(),
-                             key=lambda x: x!='id' and x!='count' and x):
+                             key=lambda x: x != 'id' and x != 'count' and x):
                if not (isinstance(result[key], list) or
                        isinstance(result[key], dict)):
                    self.print_shell("%s = %s" % (key, result[key]))
@@ -267,8 +268,8 @@ class CloudMonkeyShell(cmd.Cmd, object):
        requests = {'jobid': jobId}
        timeout = int(self.timeout)
        pollperiod = 3
        progress = 1
        while timeout > 0:
            progress = int((int(self.timeout) - timeout) / pollperiod) + 1
            print '\r' + '.' * progress,
            sys.stdout.flush()
            response = process_json(conn.make_request_with_auth(command,
@@ -290,6 +291,7 @@ class CloudMonkeyShell(cmd.Cmd, object):
                    return response
            time.sleep(pollperiod)
            timeout = timeout - pollperiod
            progress += 1
            logger.debug("job: %s to timeout in %ds" % (jobId, timeout))
        self.print_shell("Error:", "Async query timeout for jobid=", jobId)
@@ -363,30 +365,6 @@ class CloudMonkeyShell(cmd.Cmd, object):
            except Exception as e:
                self.print_shell("🙈 Error on parsing and printing", e)

    def cache_verb_miss(self, verb):
        self.print_shell("Oops: Verb %s should have been precached" % verb)
        completions_found = filter(lambda x: x.startswith(verb), completions)
        self.cache_verbs[verb] = {}
        for api_name in completions_found:
            api_cmd_str = "%sCmd" % api_name
            api_mod = self.get_api_module(api_name, [api_cmd_str])
            if api_mod is None:
                continue
            try:
                api_cmd = getattr(api_mod, api_cmd_str)()
                required = api_cmd.required
                doc = api_mod.__doc__
            except AttributeError, e:
                self.print_shell("Error: API attribute %s not found!" % e)
            params = filter(lambda x: '__' not in x and 'required' not in x,
                            dir(api_cmd))
            if len(required) > 0:
                doc += "\nRequired args: %s" % " ".join(required)
            doc += "\nArgs: %s" % " ".join(params)
            api_name_lower = api_name.replace(verb, '').lower()
            self.cache_verbs[verb][api_name_lower] = [api_name, params, doc,
                                                      required]

    def completedefault(self, text, line, begidx, endidx):
        partitions = line.partition(" ")
        verb = partitions[0]
@@ -401,9 +379,6 @@ class CloudMonkeyShell(cmd.Cmd, object):
        autocompletions = []
        search_string = ""

        if verb not in self.cache_verbs:
            self.cache_verb_miss(verb)

        if separator != " ":  # Complete verb subjects
            autocompletions = self.cache_verbs[verb].keys()
            search_string = subject
@@ -445,8 +420,8 @@ class CloudMonkeyShell(cmd.Cmd, object):
        """
        args = args.strip().partition(" ")
        key, value = (args[0], args[2])
-       # Note: keys and class attributes should have same names
-       setattr(self, key, value)
+       setattr(self, key, value)  # keys and attributes should have same names
        self.prompt += " "  # prompt fix
        self.write_config()

    def complete_set(self, text, line, begidx, endidx):
@@ -484,8 +459,6 @@ class CloudMonkeyShell(cmd.Cmd, object):
        else:
            verb = fields[0]
            subject = fields[2].partition(" ")[0]
            if verb not in self.cache_verbs:
                self.cache_verb_miss(verb)

        if subject in self.cache_verbs[verb]:
            self.print_shell(self.cache_verbs[verb][subject][2])
@@ -537,8 +510,6 @@ def main():
            prog_name = "python " + prog_name
            self.do_shell("%s %s %s" % (prog_name, rule, args))
            return
        if not rule in self.cache_verbs:
            self.cache_verb_miss(rule)
        try:
            args_partition = args.partition(" ")
            res = self.cache_verbs[rule][args_partition[0]]