mirror of https://github.com/apache/cloudstack.git
1674 lines
68 KiB
Python
1674 lines
68 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
""" Test for IPv4 Routed mode"""
|
|
import datetime
|
|
import logging
|
|
import random
|
|
import time
|
|
|
|
from marvin.cloudstackTestCase import cloudstackTestCase
|
|
from marvin.lib.base import ZoneIpv4Subnet, Domain, Account, ServiceOffering, NetworkOffering, VpcOffering, Network, \
|
|
Ipv4SubnetForGuestNetwork, VirtualMachine, VPC, NetworkACLList, NetworkACL, RoutingFirewallRule, Template, ASNRange, \
|
|
BgpPeer, Router
|
|
from marvin.lib.common import get_domain, get_zone, list_routers, list_hosts
|
|
from marvin.lib.utils import get_host_credentials, get_process_status
|
|
|
|
from nose.plugins.attrib import attr
|
|
|
|
# Brace-delimited, comma-separated list of the ICMPv4 message type names.
ICMPv4_ALL_TYPES = "{ " + ", ".join((
    "echo-reply", "destination-unreachable", "source-quench", "redirect", "echo-request",
    "router-advertisement", "router-solicitation", "time-exceeded", "parameter-problem",
    "timestamp-request", "timestamp-reply", "info-request", "info-reply",
    "address-mask-request", "address-mask-reply",
)) + " }"

# Zone subnets live under 172.30.<random>; the two random ranges do not overlap,
# so SUBNET_1_PREFIX and SUBNET_2_PREFIX always differ.
SUBNET_PREFIX = "172.30."
SUBNET_1_PREFIX = "%s%d" % (SUBNET_PREFIX, random.randrange(100, 150))
SUBNET_2_PREFIX = "%s%d" % (SUBNET_PREFIX, random.randrange(151, 199))

VPC_CIDR_PREFIX = "172.31"  # .0 to .16
NETWORK_CIDR_PREFIX = "%s.100" % VPC_CIDR_PREFIX
NETWORK_CIDR_PREFIX_DYNAMIC = "%s.101" % VPC_CIDR_PREFIX

# Polling budget used when waiting on a router: number of attempts and
# seconds slept between two attempts.
MAX_RETRIES = 30
WAIT_INTERVAL = 5

# Resources shared between test cases; test_06/test_07 fill them in through
# ``global`` statements and later tests read them.
test_network = None
test_network_vm = None
test_vpc = None
test_vpc_tier = None
test_vpc_vm = None
test_network_acl = None

# AS number range registered for the zone, plus two randomised AS numbers
# (the two random ranges do not overlap).
START_ASN = 888800
END_ASN = 888888
ASN_1 = 900100 + random.randrange(1, 200)
ASN_2 = 900301 + random.randrange(0, 200)
IP4_ADDR_1 = "10.0.53.10"
IP4_ADDR_2 = "10.0.53.11"
PASSWORD_1 = "testpassword1"
PASSWORD_2 = "testpassword2"
|
|
|
|
# Offering definition for an isolated guest network in ROUTED mode whose
# services are all provided by the VirtualRouter.
NETWORK_OFFERING = dict(
    name="Test Network offering - Routed mode",
    displaytext="Test Network offering - Routed mode",
    networkmode="ROUTED",
    guestiptype="Isolated",
    supportedservices=",".join(("Dhcp", "Dns", "UserData", "Firewall")),
    traffictype="GUEST",
    availability="Optional",
    egress_policy="true",
    serviceProviderList=dict.fromkeys(
        ("Dhcp", "Dns", "UserData", "Firewall"), "VirtualRouter"
    ),
)
|
|
|
|
# Offering definition for a VPC in ROUTED mode.
VPC_OFFERING = dict(
    name="Test VPC offering - Routed mode",
    displaytext="Test VPC offering - Routed mode",
    networkmode="ROUTED",
    supportedservices=",".join(("Dhcp", "Dns", "UserData", "NetworkACL")),
)
|
|
|
|
# Offering definition for a VPC tier in ROUTED mode whose services are all
# provided by the VpcVirtualRouter.
VPC_NETWORK_OFFERING = dict(
    name="Test VPC Network offering - Routed mode",
    displaytext="Test VPC Network offering - Routed mode",
    networkmode="ROUTED",
    guestiptype="Isolated",
    supportedservices=",".join(("Dhcp", "Dns", "UserData", "NetworkACL")),
    traffictype="GUEST",
    availability="Optional",
    serviceProviderList=dict.fromkeys(
        ("Dhcp", "Dns", "UserData", "NetworkACL"), "VpcVirtualRouter"
    ),
)
|
|
|
|
# Offering definition for an isolated guest network in ROUTED mode with
# "Dynamic" routing mode; services are all provided by the VirtualRouter.
NETWORK_OFFERING_DYNAMIC = dict(
    name="Test Network offering - Dynamic Routed mode",
    displaytext="Test Network offering - Dynamic Routed mode",
    networkmode="ROUTED",
    routingmode="Dynamic",
    guestiptype="Isolated",
    supportedservices=",".join(("Dhcp", "Dns", "UserData", "Firewall")),
    traffictype="GUEST",
    availability="Optional",
    egress_policy="true",
    serviceProviderList=dict.fromkeys(
        ("Dhcp", "Dns", "UserData", "Firewall"), "VirtualRouter"
    ),
)
|
|
|
|
# Offering definition for a VPC in ROUTED mode with "Dynamic" routing mode.
# The name/displaytext carry the "Dynamic" marker, like the other *_DYNAMIC
# offerings in this file, so the offering can be told apart from VPC_OFFERING.
VPC_OFFERING_DYNAMIC = {
    "name": "Test VPC offering - Dynamic Routed mode",
    "displaytext": "Test VPC offering - Dynamic Routed mode",
    "networkmode": "ROUTED",
    "routingmode": "Dynamic",
    "supportedservices":
        "Dhcp,Dns,UserData,NetworkACL"
}
|
|
|
|
# Offering definition for a VPC tier in ROUTED mode with "Dynamic" routing
# mode; services are all provided by the VpcVirtualRouter.
VPC_NETWORK_OFFERING_DYNAMIC = dict(
    name="Test VPC Network offering - Dynamic Routed mode",
    displaytext="Test VPC Network offering - Dynamic Routed mode",
    networkmode="ROUTED",
    routingmode="Dynamic",
    guestiptype="Isolated",
    supportedservices=",".join(("Dhcp", "Dns", "UserData", "NetworkACL")),
    traffictype="GUEST",
    availability="Optional",
    serviceProviderList=dict.fromkeys(
        ("Dhcp", "Dns", "UserData", "NetworkACL"), "VpcVirtualRouter"
    ),
)
|
|
class TestIpv4Routing(cloudstackTestCase):
|
|
|
|
@classmethod
def setUpClass(cls):
    """Create the fixtures shared by all tests of this class.

    In order: API/DB clients, logger, template, zone IPv4 subnet, zone ASN
    range, service offering, the routed-mode network/VPC offerings (enabled
    right after creation), a sub-domain, a regular account in it and an API
    client for that account. Every created resource is appended to
    ``cls._cleanup``.
    """
    test_client = super(TestIpv4Routing, cls).getClsTestClient()
    cls.services = test_client.getParsedTestDataConfig()
    cls.apiclient = test_client.getApiClient()
    cls.dbclient = test_client.getDbConnection()
    cls.hypervisor = test_client.getHypervisorInfo()
    cls.domain = get_domain(cls.apiclient)
    cls.zone = get_zone(cls.apiclient)

    cls._cleanup = []

    cls.logger = logging.getLogger("TestIpv4Routing")
    cls.stream_handler = logging.StreamHandler()
    cls.logger.setLevel(logging.DEBUG)
    cls.logger.addHandler(cls.stream_handler)

    # 0. register template
    hypervisor_key = cls.hypervisor.lower()
    cls.template = Template.register(
        cls.apiclient,
        cls.services["test_templates"][hypervisor_key],
        zoneid=cls.zone.id,
        hypervisor=hypervisor_key,
    )
    cls.template.download(cls.apiclient)
    cls._cleanup.append(cls.template)

    # 1.1 create subnet for zone
    cls.subnet_1 = ZoneIpv4Subnet.create(
        cls.apiclient, zoneid=cls.zone.id, subnet=SUBNET_1_PREFIX + ".0/24"
    )
    cls._cleanup.append(cls.subnet_1)

    # 1.2 create ASN range for zone
    cls.asnrange = ASNRange.create(
        cls.apiclient, zoneid=cls.zone.id, startasn=START_ASN, endasn=END_ASN
    )
    cls._cleanup.append(cls.asnrange)

    # 2. Create small service offering
    cls.service_offering = ServiceOffering.create(
        cls.apiclient, cls.services["service_offerings"]["small"]
    )
    cls._cleanup.append(cls.service_offering)

    # 3. Create network and vpc offering with routed mode.
    # (class attribute, marvin factory, offering definition), in creation order:
    # 3.1 network offering for static routing, 3.2 VPC offering for static
    # routing, 3.3 VPC tier offering for static routing, 3.4 network offering
    # for dynamic routing, 3.5 VPC network offering for dynamic routing.
    routed_offerings = (
        ("network_offering_isolated", NetworkOffering, NETWORK_OFFERING),
        ("vpc_offering", VpcOffering, VPC_OFFERING),
        ("vpc_network_offering", NetworkOffering, VPC_NETWORK_OFFERING),
        ("network_offering_dynamic", NetworkOffering, NETWORK_OFFERING_DYNAMIC),
        ("vpc_network_offering_dynamic", NetworkOffering, VPC_NETWORK_OFFERING_DYNAMIC),
    )
    for attribute, factory, definition in routed_offerings:
        offering = factory.create(cls.apiclient, definition)
        setattr(cls, attribute, offering)
        # Register for cleanup before enabling, so a failed update does not
        # drop the offering from the cleanup list.
        cls._cleanup.append(offering)
        offering.update(cls.apiclient, state='Enabled')

    # 4. Create sub-domain
    cls.sub_domain = Domain.create(cls.apiclient, cls.services["acl"]["domain1"])
    cls._cleanup.append(cls.sub_domain)

    # 5. Create regular user
    cls.regular_user = Account.create(
        cls.apiclient,
        cls.services["acl"]["accountD11A"],
        domainid=cls.sub_domain.id,
    )
    cls._cleanup.append(cls.regular_user)

    # 6. Create api clients for regular user
    cls.regular_user_user = cls.regular_user.user[0]
    cls.regular_user_apiclient = cls.testClient.getUserApiClient(
        cls.regular_user_user.username, cls.sub_domain.name
    )
|
|
|
|
@classmethod
def tearDownClass(cls):
    """Delegate class-level teardown to the parent test case class."""
    parent = super(TestIpv4Routing, cls)
    parent.tearDownClass()
|
|
|
|
@classmethod
def message(cls, msg):
    """Log ``msg`` at DEBUG level, framed by ``======`` and prefixed with the current timestamp.

    ``msg`` must be a ``str``; it is concatenated, not formatted.
    """
    timestamp = str(datetime.datetime.now())
    cls.logger.debug("".join(("====== ", timestamp, " ", msg, " ======")))
|
|
|
|
def setUp(self):
    """Give each test its own API client and an empty per-test cleanup list."""
    client = self.testClient.getApiClient()
    self.apiclient = client
    # Resources appended here are test-scoped, unlike the class-wide ``_cleanup``.
    self.cleanup = list()
|
|
|
|
def tearDown(self):
    """Delegate per-test teardown to the parent test case class."""
    parent = super(TestIpv4Routing, self)
    parent.tearDown()
|
|
|
|
def get_router(self, networkid=None, vpcid=None):
    """Return the first virtual router of a VPC or of a guest network.

    :param networkid: guest network ID; only used when ``vpcid`` is falsy
    :param vpcid: VPC ID; takes precedence over ``networkid`` when truthy
    :return: first element of the ``list_routers`` response
    :raises AssertionError: (through the unittest assertions) when the
        response is not a list or is an empty list
    """
    # list router
    if vpcid:
        scope = {"vpcid": vpcid}
    else:
        scope = {"networkid": networkid}
    list_router_response = list_routers(self.apiclient, listall="true", **scope)

    self.assertIsInstance(
        list_router_response,
        list,
        "list routers response should return a valid list"
    )
    # An empty list passes the type check above; fail with a clear message
    # instead of letting the [0] below raise a bare IndexError.
    self.assertNotEqual(
        len(list_router_response),
        0,
        "list routers response should contain at least one router"
    )
    return list_router_response[0]
|
|
|
|
def run_command_in_router(self, router, command):
    """Run ``command`` for ``router`` via ``get_process_status`` and return its result as a string.

    For 'vmware'/'hyperv' routers the management-server connection details
    of the API client are passed to ``get_process_status``; for any other
    hypervisor the credentials of the router's host are looked up first.
    The test is skipped when that lookup raises ``KeyError``.

    :param router: router object (``zoneid``, ``hostid``, ``hypervisor`` and ``linklocalip`` are read)
    :param command: command line to execute
    :return: ``str()`` of the ``get_process_status`` result
    """
    # get host of router
    host_list = list_hosts(
        self.apiclient,
        zoneid=router.zoneid,
        type='Routing',
        state='Up',
        id=router.hostid
    )
    self.assertEqual(
        isinstance(host_list, list),
        True,
        "Check list host returns a valid list"
    )
    router_host = host_list[0]

    # run command
    output = ''
    if router.hypervisor.lower() not in ('vmware', 'hyperv'):
        try:
            router_host.user, router_host.passwd = get_host_credentials(self.config, router_host.ipaddress)
            output = get_process_status(
                router_host.ipaddress,
                22,
                router_host.user,
                router_host.passwd,
                router.linklocalip,
                command
            )
        except KeyError:
            self.skipTest("Marvin configuration has no host credentials to check router services")
    else:
        mgmt = self.apiclient.connection
        output = get_process_status(
            mgmt.mgtSvr,
            22,
            mgmt.user,
            mgmt.passwd,
            router.linklocalip,
            command,
            hypervisor=router.hypervisor
        )

    text = str(output)
    self.message("VR command (%s) result: (%s)" % (command, text))
    return text
|
|
|
|
def rebootRouter(self, router):
    """Reboot ``router`` through the API; any exception is reported as a test failure."""
    try:
        Router.reboot(self.apiclient, id=router.id)
    except Exception as e:
        reason = "Failed to reboot the virtual router: %s, %s" % (router.id, e)
        self.fail(reason)
|
|
|
|
def createNetworkAclRule(self, rule):
    """Create a network ACL rule described by ``rule`` in the module-level ``test_network_acl`` list.

    :param rule: rule definition passed as ``services`` to ``NetworkACL.create``
    :return: the object returned by ``NetworkACL.create``
    """
    acl_list_id = test_network_acl.id
    acl_rule = NetworkACL.create(self.apiclient, services=rule, aclid=acl_list_id)
    return acl_rule
|
|
|
|
def createIpv4RoutingFirewallRule(self, rule):
    """Create an IPv4 routing firewall rule described by ``rule`` on the module-level ``test_network``.

    :param rule: rule definition passed as ``services`` to ``RoutingFirewallRule.create``
    :return: the object returned by ``RoutingFirewallRule.create``
    """
    network_id = test_network.id
    firewall_rule = RoutingFirewallRule.create(self.apiclient, services=rule, networkid=network_id)
    return firewall_rule
|
|
|
|
def verifyNftablesRulesInRouter(self, router, rules):
    """Check the presence or absence of nftables rules in the router.

    The table is ``ip4_acl`` when ``router.vpcid`` is truthy, ``ip4_firewall``
    otherwise. For each entry the chain is listed in the router and the rule
    text is searched in the command output.

    :param router: router object (``vpcid`` is read)
    :param rules: iterable of dicts with keys ``chain`` and ``rule`` and an
        optional ``exists`` flag (treated as true when absent)
    """
    table = "ip4_acl" if router.vpcid else "ip4_firewall"
    for entry in rules:
        listing = self.run_command_in_router(
            router, "nft list chain ip %s %s" % (table, entry["chain"])
        )
        should_exist = bool(entry.get("exists", True))
        found = entry["rule"] in listing
        if should_exist:
            if not found:
                self.fail("The nftables rule (%s) should exist but is not found in the VR !!!" % entry["rule"])
        elif found:
            self.fail("The nftables rule (%s) should not exist but is found in the VR !!!" % entry["rule"])
    self.message("The nftables rules look good so far.")
|
|
|
|
def verifyPingFromRouter(self, router, vm, expected=True, retries=2):
    """Ping ``vm`` from ``router`` and fail the test if reachability differs from ``expected``.

    The ping is attempted up to ``retries`` times, sleeping ``WAIT_INTERVAL``
    seconds between attempts, and stops at the first attempt whose output
    does not contain "0 packets received".

    :param router: router object the ping is run from (``name`` is read for messages)
    :param vm: virtual machine object (``ipaddress`` is read)
    :param expected: True if the ping must succeed, False if it must not
    :param retries: maximum number of ping attempts
    """
    import unittest  # local import: only needed to let SkipTest pass through

    cmd_ping_vm = "ping -c1 -W1 %s" % vm.ipaddress
    reachable = False
    while retries > 0:
        try:
            result = self.run_command_in_router(router, cmd_ping_vm)
        except unittest.SkipTest:
            # run_command_in_router skips when host credentials are missing;
            # keep that a skip instead of turning it into a failure.
            raise
        except Exception as ex:
            self.fail("Failed to ping vm %s from router %s: %s" % (vm.ipaddress, router.name, ex))
        if "0 packets received" not in result:
            self.message("packets are received, looks good")
            # Leave the loop rather than return, so that the
            # "unexpected connectivity" check below is actually reached.
            reachable = True
            break
        retries = retries - 1
        self.message("No packets received, remaining retries %s" % retries)
        if retries > 0:
            time.sleep(WAIT_INTERVAL)

    if expected and not reachable:
        self.fail("Failed to ping vm %s from router %s, which is expected to work !!!" % (vm.ipaddress, router.name))
    if reachable and not expected:
        self.fail("ping vm %s from router %s works, however it is unexpected !!!" % (vm.ipaddress, router.name))
|
|
|
|
def verifyFrrConf(self, router, configs):
    """Check the presence or absence of text snippets in the router's ``/etc/frr/frr.conf``.

    The file is read once from the router; each entry is then searched in
    that output.

    :param router: router object the file is read from
    :param configs: iterable of dicts with key ``config`` and an optional
        ``exists`` flag (treated as true when absent)
    """
    frr_conf = self.run_command_in_router(router, "cat /etc/frr/frr.conf")
    for entry in configs:
        should_exist = bool(entry.get("exists", True))
        found = entry["config"] in frr_conf
        if should_exist:
            if not found:
                self.fail("The frr config (%s) should exist but is not found in the VR !!!" % entry["config"])
        elif found:
            self.fail("The frr config (%s) should not exist but is found in the VR !!!" % entry["config"])
    self.message("The frr config look good so far.")
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_01_zone_subnet(self):
    """Test for subnet for zone

    # 1. Create subnet
    # 2. List subnet
    # 3. Update subnet
    # 4. dedicate subnet to domain
    # 5. released dedicated subnet
    # 6. dedicate subnet to sub-domain/account
    # 7. released dedicated subnet
    # 8. delete subnet
    """
    self.message("Running test_01_zone_subnet")

    cidr_24 = SUBNET_2_PREFIX + ".0/24"
    cidr_25 = SUBNET_2_PREFIX + ".0/25"

    def list_subnet_2():
        # Re-read the subnet under test from the API.
        return ZoneIpv4Subnet.list(self.apiclient, id=self.subnet_2.id)

    def is_single(result):
        # True only for a list holding exactly one entry.
        return isinstance(result, list) and len(result) == 1

    # 1. Create subnet
    self.subnet_2 = ZoneIpv4Subnet.create(self.apiclient, zoneid=self.zone.id, subnet=cidr_24)
    self.cleanup.append(self.subnet_2)

    # 2. List subnet
    subnets = list_subnet_2()
    self.assertEqual(
        isinstance(subnets, list),
        True,
        "List subnets for zone should return a valid list"
    )
    self.assertEqual(
        len(subnets) == 1,
        True,
        "The number of subnets for zone (%s) should be equal to 1" % (len(subnets))
    )
    self.assertEqual(
        subnets[0].subnet == cidr_24,
        True,
        "The subnet of subnet for zone (%s) should be equal to %s" % (subnets[0].subnet, cidr_24)
    )

    # 3. Update subnet
    self.subnet_2.update(self.apiclient, subnet=cidr_25)
    subnets = list_subnet_2()
    self.assertEqual(
        is_single(subnets) and subnets[0].subnet == cidr_25,
        True,
        "The subnet of subnet for zone should be equal to %s" % (cidr_25)
    )

    # 4. dedicate subnet to domain
    ZoneIpv4Subnet.dedicate(self.apiclient, id=self.subnet_2.id, domainid=self.domain.id)
    subnets = list_subnet_2()
    self.assertEqual(
        is_single(subnets) and subnets[0].domainid == self.domain.id,
        True,
        "The subnet should be dedicated to domain %s" % self.domain.id
    )

    # 5. released dedicated subnet
    self.subnet_2.release(self.apiclient)
    subnets = list_subnet_2()
    self.assertEqual(
        is_single(subnets) and not subnets[0].domainid,
        True,
        "The subnet should not be dedicated to domain %s" % self.domain.id
    )

    # 6. dedicate subnet to sub-domain/account
    ZoneIpv4Subnet.dedicate(
        self.apiclient,
        id=self.subnet_2.id,
        domainid=self.sub_domain.id,
        account=self.regular_user.name
    )
    subnets = list_subnet_2()
    self.assertEqual(
        is_single(subnets)
        and subnets[0].domainid == self.sub_domain.id
        and subnets[0].account == self.regular_user.name,
        True,
        "The subnet should be dedicated to account %s" % self.regular_user.name
    )

    # 7. released dedicated subnet
    self.subnet_2.release(self.apiclient)
    subnets = list_subnet_2()
    self.assertEqual(
        is_single(subnets) and not subnets[0].domainid,
        True,
        "The subnet should not be dedicated to account %s" % self.regular_user.name
    )

    # 8. delete subnet (already gone, so it must not be cleaned up again)
    self.subnet_2.delete(self.apiclient)
    self.cleanup.remove(self.subnet_2)
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_02_create_network_routed_mode_with_specified_cidr(self):
    """Test for guest network with specified cidr

    # 1. Create Isolated network
    # 2. List subnet for network by subnet
    # 3. Delete the network
    # 4. List subnet for network by subnet. the subnet should be gone as well
    """
    self.message("Running test_02_create_network_routed_mode_with_specified_cidr")

    requested_cidr = NETWORK_CIDR_PREFIX + ".0/24"

    # 1. Create Isolated network
    isolated_network = Network.create(
        self.apiclient,
        self.services["network"],
        gateway=NETWORK_CIDR_PREFIX + ".1",
        netmask="255.255.255.0",
        networkofferingid=self.network_offering_isolated.id,
        zoneid=self.zone.id
    )
    self.cleanup.append(isolated_network)

    # 2. List subnet for network by subnet
    subnets = Ipv4SubnetForGuestNetwork.list(self.apiclient, subnet=requested_cidr)
    allocated = (
        isinstance(subnets, list) and len(subnets) == 1
        and subnets[0].subnet == requested_cidr and subnets[0].state == "Allocated"
    )
    self.assertEqual(
        allocated,
        True,
        "The subnet should be added for network %s" % isolated_network.name
    )
    # Remember the CIDR reported by the API for the lookup in step 4.
    network_cidr = subnets[0].subnet

    # 3. Delete the network
    isolated_network.delete(self.apiclient)
    self.cleanup.remove(isolated_network)

    # 4. List subnet for network by subnet. the subnet should be gone as well
    subnets = Ipv4SubnetForGuestNetwork.list(self.apiclient, subnet=network_cidr)
    still_listed = isinstance(subnets, list) and len(subnets) != 0
    self.assertEqual(
        not still_listed,
        True,
        "The subnet %s should be removed for network %s" % (network_cidr, isolated_network.name)
    )
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_03_create_subnets_for_guest_network(self):
    """Test for subnets for guest network with cidr/cidrsize

    # 1. Create subnet with cidr for guest network
    # 2. List subnets for network
    # 3. delete subnet for network

    # 4. Create subnet with cidrsize
    # 5. List subnet for network
    # 6. delete subnet for network
    """
    self.message("Running test_03_create_subnets_for_guest_network")

    # The same create / list / delete cycle runs twice: first with an
    # explicit CIDR (steps 1-3), then with only a CIDR size (steps 4-6).
    scenarios = (
        ("subnet_network_1", {"subnet": SUBNET_1_PREFIX + ".0/26"}),
        ("subnet_network_2", {"cidrsize": 26}),
    )
    for label, create_args in scenarios:
        # Create subnet for guest network inside the zone subnet
        guest_subnet = Ipv4SubnetForGuestNetwork.create(
            self.apiclient,
            parentid=self.subnet_1.id,
            **create_args
        )
        self.cleanup.append(guest_subnet)

        # List subnets for network
        listed = Ipv4SubnetForGuestNetwork.list(self.apiclient, subnet=guest_subnet.subnet)
        self.assertEqual(
            isinstance(listed, list) and len(listed) == 1,
            True,
            "The subnet should be created for %s %s" % (label, guest_subnet.subnet)
        )

        # delete subnet for network
        guest_subnet.delete(self.apiclient)
        self.cleanup.remove(guest_subnet)
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_04_create_isolated_network_routed_mode_with_cidrsize(self):
    """Test for subnet and guest network with cidrsize

    # 1. Create Isolated network with cidrsize
    # 2. List subnet for network by networkid
    # 3. Delete the network
    # 4. List subnet for network by networkid, it should be removed
    """
    self.message("Running test_04_create_isolated_network_routed_mode_with_cidrsize")

    # 1. Create Isolated network with cidrsize
    isolated_network = Network.create(
        self.apiclient,
        self.services["network"],
        networkofferingid=self.network_offering_isolated.id,
        zoneid=self.zone.id,
        cidrsize=26
    )
    self.cleanup.append(isolated_network)

    # 2. List subnet for network by networkid
    subnets = Ipv4SubnetForGuestNetwork.list(self.apiclient, networkid=isolated_network.id)
    allocated = (
        isinstance(subnets, list) and len(subnets) == 1
        and subnets[0].networkid == isolated_network.id and subnets[0].state == "Allocated"
    )
    self.assertEqual(
        allocated,
        True,
        "The subnet should be created for isolated_network %s" % isolated_network.name
    )
    # Remember the allocated CIDR for the lookup in step 4.
    network_cidr = subnets[0].subnet

    # 3. Delete the network
    isolated_network.delete(self.apiclient)
    self.cleanup.remove(isolated_network)

    # 4. List subnet for network by network cidr, it should be removed
    subnets = Ipv4SubnetForGuestNetwork.list(self.apiclient, subnet=network_cidr)
    still_listed = isinstance(subnets, list) and len(subnets) != 0
    self.assertEqual(
        not still_listed,
        True,
        "The subnet should be removed for isolated_network %s" % isolated_network.name
    )
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_05_create_vpc_routed_mode_with_cidrsize(self):
    """Test for Routed VPC with cidrsize

    # 1. Create VPC with cidrsize
    # 2. List subnet for network by vpcid
    # 3. Delete the VPC
    # 4. List subnet for network by vpc cidr, it should be removed
    """
    self.message("Running test_05_create_vpc_routed_mode_with_cidrsize")

    # 1. Create VPC with cidrsize
    # Drop any preset CIDR from the VPC test data. pop() with a default
    # (rather than ``del``) keeps the test runnable when the key is already absent.
    self.services["vpc"].pop("cidr", None)
    vpc = VPC.create(self.apiclient,
                     self.services["vpc"],
                     vpcofferingid=self.vpc_offering.id,
                     zoneid=self.zone.id,
                     cidrsize=26,
                     start=False
                     )
    self.cleanup.append(vpc)

    # 2. List subnet for network by vpcid
    subnets = Ipv4SubnetForGuestNetwork.list(
        self.apiclient,
        vpcid=vpc.id
    )
    self.assertEqual(
        isinstance(subnets, list) and len(subnets) == 1
        and subnets[0].vpcid == vpc.id and subnets[0].state == "Allocated",
        True,
        "The subnet should be created for vpc %s" % vpc.name
    )

    # 3. Delete the VPC
    vpc.delete(self.apiclient)
    self.cleanup.remove(vpc)

    # 4. List subnet for network by vpc cidr, it should be removed
    vpc_cidr = subnets[0].subnet
    subnets = Ipv4SubnetForGuestNetwork.list(
        self.apiclient,
        subnet=vpc_cidr
    )
    self.assertEqual(
        not isinstance(subnets, list) or len(subnets) == 0,
        True,
        "The subnet should be removed for vpc %s" % vpc.name
    )
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_06_isolated_network_with_routed_mode(self):
    """Test for Isolated Network with Routed mode

    # 1. Create Isolated network
    # 2. Create VM in the network
    """
    # Both resources are published as module globals and registered in the
    # class-wide ``_cleanup`` list instead of the per-test ``cleanup`` list.
    global test_network, test_network_vm

    self.message("Running test_06_isolated_network_with_routed_mode")

    # 1. Create Isolated network
    network = Network.create(
        self.apiclient,
        self.services["network"],
        networkofferingid=self.network_offering_isolated.id,
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        accountid=self.regular_user.name,
        gateway=NETWORK_CIDR_PREFIX + ".1",
        netmask="255.255.255.0"
    )
    test_network = network
    self._cleanup.append(network)

    # 2. Create VM in the network
    vm = VirtualMachine.create(
        self.regular_user_apiclient,
        self.services["virtual_machine"],
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        accountid=self.regular_user.name,
        networkids=network.id,
        serviceofferingid=self.service_offering.id,
        templateid=self.template.id
    )
    test_network_vm = vm
    self._cleanup.append(vm)
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_07_vpc_and_tier_with_routed_mode(self):
    """Test for VPC/tier with Routed mode

    # 1. Create VPC
    # 2. Create Network ACL (egress = Deny, ingress = Deny)
    # 3. Create VPC tier with Network ACL in the VPC
    # 4. Create VM in the VPC tier
    """
    # All four resources are published as module globals; the VPC, tier and
    # VM are registered in the class-wide ``_cleanup`` list.
    global test_vpc, test_network_acl, test_vpc_tier, test_vpc_vm

    self.message("Running test_07_vpc_and_tier_with_routed_mode")

    # 1. Create VPC
    self.services["vpc"]["cidr"] = VPC_CIDR_PREFIX + ".0.0/22"
    vpc = VPC.create(
        self.apiclient,
        self.services["vpc"],
        vpcofferingid=self.vpc_offering.id,
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        account=self.regular_user.name,
        start=False
    )
    test_vpc = vpc
    self._cleanup.append(vpc)

    # 2. Create Network ACL (egress = Deny, ingress = Deny)
    acl_list = NetworkACLList.create(
        self.apiclient,
        services={},
        name="test-network-acl",
        description="test-network-acl",
        vpcid=vpc.id
    )
    test_network_acl = acl_list

    # 3. Create VPC tier with Network ACL in the VPC
    tier = Network.create(
        self.regular_user_apiclient,
        self.services["network"],
        networkofferingid=self.vpc_network_offering.id,
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        accountid=self.regular_user.name,
        vpcid=vpc.id,
        gateway=VPC_CIDR_PREFIX + ".1.1",
        netmask="255.255.255.0",
        aclid=acl_list.id
    )
    test_vpc_tier = tier
    self._cleanup.append(tier)

    # 4. Create VM in the VPC tier
    vm = VirtualMachine.create(
        self.regular_user_apiclient,
        self.services["virtual_machine"],
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        accountid=self.regular_user.name,
        networkids=tier.id,
        serviceofferingid=self.service_offering.id,
        templateid=self.template.id
    )
    test_vpc_vm = vm
    self._cleanup.append(vm)
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_08_vpc_and_tier_failed_cases(self):
    """Test for VPC/tier with Routed mode (some failed cases)

    # 1. create VPC with Routed mode
    # 2. create network offering with NATTED mode, create vpc tier, it should fail
    # 3. create vpc tier not in the vpc cidr, it should fail
    """

    self.message("Running test_08_vpc_and_tier_failed_cases")

    # 1. Create VPC
    self.services["vpc"]["cidr"] = VPC_CIDR_PREFIX + ".8.0/22"
    test_vpc_2 = VPC.create(self.apiclient,
                            self.services["vpc"],
                            vpcofferingid=self.vpc_offering.id,
                            zoneid=self.zone.id,
                            domainid=self.sub_domain.id,
                            account=self.regular_user.name,
                            start=False
                            )
    self.cleanup.append(test_vpc_2)

    # 2. create network offering with NATTED mode, create vpc tier, it should fail
    nw_offering_isolated_vpc = NetworkOffering.create(
        self.apiclient,
        self.services["nw_offering_isolated_vpc"]
    )
    self.cleanup.append(nw_offering_isolated_vpc)
    nw_offering_isolated_vpc.update(self.apiclient, state='Enabled')
    # NOTE(review): the gateway below (<prefix>.1.1) lies outside the VPC CIDR
    # (<prefix>.8.0/22), so this creation may be rejected because of the CIDR
    # rather than because of the offering's network mode - confirm intent.
    try:
        test_vpc_tier_2 = Network.create(self.regular_user_apiclient,
                                         self.services["network"],
                                         networkofferingid=nw_offering_isolated_vpc.id,
                                         zoneid=self.zone.id,
                                         domainid=self.sub_domain.id,
                                         accountid=self.regular_user.name,
                                         vpcid=test_vpc_2.id,
                                         gateway=VPC_CIDR_PREFIX + ".1.1",
                                         netmask="255.255.255.0"
                                         )
    except Exception as ex:
        self.message("Failed to create vpc network due to %s, which is expected behaviour" % ex)
    else:
        # self.fail() raises AssertionError, which is an Exception: it must
        # live outside the try body, otherwise the handler above swallows it
        # and the test can never fail.
        self.cleanup.append(test_vpc_tier_2)
        self.fail("Created vpc network successfully, but expected to fail")

    # 3. create vpc tier not in the vpc cidr, it should fail
    try:
        test_vpc_tier_3 = Network.create(self.regular_user_apiclient,
                                         self.services["network"],
                                         networkofferingid=self.vpc_network_offering.id,
                                         zoneid=self.zone.id,
                                         domainid=self.sub_domain.id,
                                         accountid=self.regular_user.name,
                                         vpcid=test_vpc_2.id,
                                         gateway=VPC_CIDR_PREFIX + ".31.1",
                                         netmask="255.255.255.0"
                                         )
    except Exception as ex:
        self.message("Failed to create vpc network due to %s, which is expected behaviour" % ex)
    else:
        self.cleanup.append(test_vpc_tier_3)
        self.fail("Created vpc network successfully, but expected to fail")
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_09_connectivity_between_network_and_vpc_tier(self):
    """ Test for connectivity between VMs in the Isolated Network and VPC/tier

    Relies on the module-level objects test_network, test_network_vm,
    test_vpc, test_vpc_tier and test_vpc_vm; the test is skipped when
    test_network or test_vpc is not set, or when either virtual router
    cannot be found.

    # 0. Get static routes of Network/VPC
    # 1. Add static routes in VRs manually

    # 2. Test VM2 in VR1-Network (ping/ssh should fail)
    # 3. Test VM1 in VR2-VPC (ping/ssh should fail)

    # 4. Create Ingress rules in Network ACL for VPC
    # 5. Create Egress rules in Network ACL for VPC
    # 6. Test VM2 in VR1-Network (ping/ssh should succeed)
    # 7. Test VM1 in VR2-VPC (ping/ssh should fail)

    # 8. Create IPv4 firewalls for Isolated network
    # 9. Test VM2 in VR1-Network (ping/ssh should succeed)
    # 10. Test VM1 in VR2-VPC (ping/ssh should succeed)

    # 11. Delete Network ACL rules for VPC
    # 12. Delete IPv4 firewall rules for Network
    # 13. Test VM2 in VR1-Network (ping/ssh should fail)
    # 14. Test VM1 in VR2-VPC (ping/ssh should fail)
    """
    self.message("Running test_09_connectivity_between_network_and_vpc_tier")

    # 0. Get static routes of Network/VPC
    if not test_network:
        self.skipTest("test_network is not created")
    network_ip4routes = Network.list(
        self.apiclient,
        id=test_network.id,
        listall=True
    )[0].ip4routes

    if not test_vpc:
        self.skipTest("test_vpc is not created")
    vpc_ip4routes = VPC.list(
        self.apiclient,
        id=test_vpc.id,
        listall=True
    )[0].ip4routes

    network_router = self.get_router(networkid=test_network.id)
    vpc_router = self.get_router(vpcid=test_vpc.id)
    # Check the routers before they are used for the first time; the check
    # is pointless once a missing router has already been handed to a helper.
    if not network_router or not vpc_router:
        self.skipTest("network_router (%s) or vpc_router (%s) does not exist" % (network_router, vpc_router))

    # Test VM1 in VR1-Network (wait until ping works)
    self.verifyPingFromRouter(network_router, test_network_vm, retries=MAX_RETRIES)
    # Test VM2 in VR2-VPC (wait until ping works)
    self.verifyPingFromRouter(vpc_router, test_vpc_vm, retries=MAX_RETRIES)

    # 1. Add static routes in VRs manually
    # Each router gets the routes that lead to the *other* side.
    for ip4route in network_ip4routes:
        self.run_command_in_router(vpc_router, "ip route add %s via %s" % (ip4route.subnet, ip4route.gateway))
    for ip4route in vpc_ip4routes:
        self.run_command_in_router(network_router, "ip route add %s via %s" % (ip4route.subnet, ip4route.gateway))

    # 2. Test VM2 in VR1-Network (ping/ssh should fail)
    self.verifyPingFromRouter(network_router, test_vpc_vm, expected=False)
    # 3. Test VM1 in VR2-VPC (ping/ssh should fail)
    self.verifyPingFromRouter(vpc_router, test_network_vm, expected=False)

    # Expected nft rules of the VPC router; entries 0-1 are the jump rules,
    # entries 2-5 are appended below as the ACL rules get created.
    vpc_router_rules = [{"chain": "FORWARD",
                         "rule": "ip daddr %s jump eth2_ingress_policy" % test_vpc_tier.cidr},
                        {"chain": "FORWARD",
                         "rule": "ip saddr %s jump eth2_egress_policy" % test_vpc_tier.cidr}]
    vpc_acl_rules = []

    # 4. Create Ingress rules in Network ACL for VPC
    acl_icmp_from_network = "ip saddr %s icmp type %s accept" % (test_network.cidr, ICMPv4_ALL_TYPES)
    vpc_acl_rules.append(self.createNetworkAclRule({
        "traffictype": "Ingress",
        "cidrlist": test_network.cidr,
        "protocol": "icmp",
        "icmptype": -1,
        "icmpcode": -1,
    }))
    vpc_router_rules.append({"chain": "eth2_ingress_policy", "rule": acl_icmp_from_network})
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)

    acl_ssh_from_network = "ip saddr %s tcp dport 22 accept" % test_network.cidr
    vpc_acl_rules.append(self.createNetworkAclRule({
        "traffictype": "Ingress",
        "cidrlist": test_network.cidr,
        "protocol": "tcp",
        "startport": 22,
        "endport": 22,
    }))
    vpc_router_rules.append({"chain": "eth2_ingress_policy", "rule": acl_ssh_from_network})
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)

    # Pings in this test are issued from the routers themselves, so the
    # public IP of the network router is allowed as a source as well.
    acl_icmp_from_router = "ip saddr %s icmp type %s accept" % (network_router.publicip, ICMPv4_ALL_TYPES)
    vpc_acl_rules.append(self.createNetworkAclRule({
        "traffictype": "Ingress",
        "cidrlist": network_router.publicip + "/32",
        "protocol": "icmp",
        "icmptype": -1,
        "icmpcode": -1,
    }))
    vpc_router_rules.append({"chain": "eth2_ingress_policy", "rule": acl_icmp_from_router})
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)

    # 5. Create Egress rules in Network ACL for VPC
    acl_icmp_egress = "ip daddr 0.0.0.0/0 icmp type %s accept" % ICMPv4_ALL_TYPES
    vpc_acl_rules.append(self.createNetworkAclRule({
        "traffictype": "Egress",
        "protocol": "icmp",
        "icmptype": -1,
        "icmpcode": -1,
    }))
    vpc_router_rules.append({"chain": "eth2_egress_policy", "rule": acl_icmp_egress})
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)

    # 6. Test VM2 in VR1-Network (ping/ssh should succeed)
    self.verifyPingFromRouter(network_router, test_vpc_vm, expected=True)
    # 7. Test VM1 in VR2-VPC (ping/ssh should fail)
    self.verifyPingFromRouter(vpc_router, test_network_vm, expected=False)

    # Expected nft rules of the isolated network router; entries 0-1 are
    # the jump rules, entries 2-4 are appended below.
    network_router_rules = [{"chain": "FORWARD",
                             "rule": "ip daddr %s jump fw_chain_ingress" % test_network.cidr},
                            {"chain": "FORWARD",
                             "rule": "ip saddr %s jump fw_chain_egress" % test_network.cidr}]
    network_routing_firewall_rules = []

    # 8. Create IPv4 firewalls for Isolated network
    fw_icmp_from_vpc = "ip saddr %s ip daddr 0.0.0.0/0 icmp type %s accept" % (test_vpc.cidr, ICMPv4_ALL_TYPES)
    network_routing_firewall_rules.append(self.createIpv4RoutingFirewallRule({
        "traffictype": "Ingress",
        "cidrlist": test_vpc.cidr,
        "protocol": "icmp",
        "icmptype": -1,
        "icmpcode": -1,
    }))
    network_router_rules.append({"chain": "fw_chain_ingress", "rule": fw_icmp_from_vpc})
    self.verifyNftablesRulesInRouter(network_router, network_router_rules)

    fw_ssh_from_vpc = "ip saddr %s ip daddr 0.0.0.0/0 tcp dport 22 accept" % test_vpc.cidr
    network_routing_firewall_rules.append(self.createIpv4RoutingFirewallRule({
        "traffictype": "Ingress",
        "cidrlist": test_vpc.cidr,
        "protocol": "tcp",
        "startport": 22,
        "endport": 22,
    }))
    network_router_rules.append({"chain": "fw_chain_ingress", "rule": fw_ssh_from_vpc})
    self.verifyNftablesRulesInRouter(network_router, network_router_rules)

    fw_icmp_from_router = "ip saddr %s ip daddr 0.0.0.0/0 icmp type %s accept" % (vpc_router.publicip, ICMPv4_ALL_TYPES)
    network_routing_firewall_rules.append(self.createIpv4RoutingFirewallRule({
        "traffictype": "Ingress",
        "cidrlist": vpc_router.publicip + "/32",
        "protocol": "icmp",
        "icmptype": -1,
        "icmpcode": -1,
    }))
    network_router_rules.append({"chain": "fw_chain_ingress", "rule": fw_icmp_from_router})
    self.verifyNftablesRulesInRouter(network_router, network_router_rules)

    # 9. Test VM2 in VR1-Network (ping/ssh should succeed)
    self.verifyPingFromRouter(network_router, test_vpc_vm, expected=True)
    # 10. Test VM1 in VR2-VPC (ping/ssh should succeed)
    self.verifyPingFromRouter(vpc_router, test_network_vm, expected=True)

    # 11. Delete Network ACL rules for VPC
    for rule in vpc_acl_rules:
        rule.delete(self.apiclient)
    # The jump rules (entries 0-1) are still expected; every ACL-derived
    # rule must now be absent.
    vpc_router_rules[2] = {"chain": "eth2_ingress_policy",
                           "rule": acl_icmp_from_network,
                           "exists": False}
    vpc_router_rules[3] = {"chain": "eth2_ingress_policy",
                           "rule": acl_ssh_from_network,
                           "exists": False}
    vpc_router_rules[4] = {"chain": "eth2_egress_policy",
                           "rule": acl_icmp_egress,
                           "exists": False}
    vpc_router_rules[5] = {"chain": "eth2_ingress_policy",
                           "rule": acl_icmp_from_router,
                           "exists": False}
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)

    # 12. Delete IPv4 firewall rules for Network
    for rule in network_routing_firewall_rules:
        rule.delete(self.apiclient)
    network_router_rules[2] = {"chain": "fw_chain_ingress",
                               "rule": fw_icmp_from_vpc,
                               "exists": False}
    network_router_rules[3] = {"chain": "fw_chain_ingress",
                               "rule": fw_ssh_from_vpc,
                               "exists": False}
    network_router_rules[4] = {"chain": "fw_chain_ingress",
                               "rule": fw_icmp_from_router,
                               "exists": False}
    self.verifyNftablesRulesInRouter(network_router, network_router_rules)

    # 13. Test VM2 in VR1-Network (ping/ssh should fail)
    self.verifyPingFromRouter(network_router, test_vpc_vm, expected=False)
    # 14. Test VM1 in VR2-VPC (ping/ssh should fail)
    self.verifyPingFromRouter(vpc_router, test_network_vm, expected=False)
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_10_bgp_peers(self):
    """ Test for BGP peers

    # 1. Create bgppeer
    # 2. List bgppeer
    # 3. Update bgppeer
    # 4. dedicate bgppeer to domain
    # 5. released dedicated bgppeer
    # 6. dedicate bgppeer to sub-domain/account
    # 7. released dedicated bgppeer
    # 8. delete bgppeer
    """
    self.message("Running test_10_bgp_peers")

    def list_single_peer():
        """List bgppeer_1 by id and return the only entry.

        The list type and length are asserted before the entry is indexed,
        so an empty or missing response is reported as a test failure and
        not as an IndexError/TypeError raised while formatting a message.
        """
        peers = BgpPeer.list(
            self.apiclient,
            id=bgppeer_1.id
        )
        self.assertIsInstance(
            peers,
            list,
            "List bgppeers for zone should return a valid list"
        )
        self.assertEqual(
            len(peers),
            1,
            "The number of bgp peers (%s) should be equal to 1" % (len(peers))
        )
        return peers[0]

    def assert_peer_address(peer, asnumber, ipaddress):
        """Assert the AS number and the IP address reported for a listed peer."""
        message = ("The asnumber of bgp peer (%s) should be equal to %s, the ip address (%s) should be %s"
                   % (peer.asnumber, asnumber, peer.ipaddress, ipaddress))
        self.assertEqual(peer.asnumber, asnumber, message)
        self.assertEqual(peer.ipaddress, ipaddress, message)

    # 1. Create bgp peer
    bgppeer_1 = BgpPeer.create(
        self.apiclient,
        zoneid=self.zone.id,
        asnumber=ASN_1,
        ipaddress=IP4_ADDR_1
    )
    self.cleanup.append(bgppeer_1)

    # 2. List bgp peer
    assert_peer_address(list_single_peer(), ASN_1, IP4_ADDR_1)

    # 3. Update bgp peer
    bgppeer_1.update(
        self.apiclient,
        asnumber=ASN_2,
        ipaddress=IP4_ADDR_2
    )
    assert_peer_address(list_single_peer(), ASN_2, IP4_ADDR_2)

    # 4. dedicate bgp peer to domain
    BgpPeer.dedicate(
        self.apiclient,
        id=bgppeer_1.id,
        domainid=self.domain.id
    )
    self.assertEqual(
        list_single_peer().domainid,
        self.domain.id,
        "The bgppeer should be dedicated to domain %s" % self.domain.id
    )

    # 5. released dedicated bgp peer
    bgppeer_1.release(
        self.apiclient
    )
    self.assertFalse(
        list_single_peer().domainid,
        "The bgp peer should not be dedicated to domain %s" % self.domain.id
    )

    # 6. dedicate bgp peer to sub-domain/account
    BgpPeer.dedicate(
        self.apiclient,
        id=bgppeer_1.id,
        domainid=self.sub_domain.id,
        account=self.regular_user.name
    )
    dedicated_peer = list_single_peer()
    self.assertEqual(
        dedicated_peer.domainid,
        self.sub_domain.id,
        "The bgp peer should be dedicated to account %s" % self.regular_user.name
    )
    self.assertEqual(
        dedicated_peer.account,
        self.regular_user.name,
        "The bgp peer should be dedicated to account %s" % self.regular_user.name
    )

    # 7. released dedicated bgp peer
    bgppeer_1.release(
        self.apiclient
    )
    self.assertFalse(
        list_single_peer().domainid,
        "The bgppeer should not be dedicated to account %s" % self.regular_user.name
    )

    # 8. delete bgp peer
    bgppeer_1.delete(
        self.apiclient
    )
    # Already deleted above, so it must not be deleted again during cleanup.
    self.cleanup.remove(bgppeer_1)
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_11_isolated_network_with_dynamic_routed_mode(self):
    """ Test for Isolated Network with Dynamic Routed mode

    # 1. Create Isolated network with bgp_peer_1
    # 2. Create VM in the network
    # 3. Verify frr.conf in network VR
    # 4. Update network BGP peers (to bgp_peer_1 and bgp_peer_2)
    # 5. Verify frr.conf in network VR
    # 6. Reboot VR
    # 7. Verify frr.conf in network VR
    # 8. Update network BGP peers (to bgppeer_2)
    # 9. Verify frr.conf in network VR
    # 10. Update network BGP peers (to null)
    # 11. Verify frr.conf in network VR
    """
    self.message("Running test_11_isolated_network_with_dynamic_routed_mode")

    def neighbor_entries(peer, password, present):
        """Expected frr.conf lines (remote-as, password) for one BGP peer."""
        return [
            {"config": "neighbor %s remote-as %s" % (peer.ipaddress, peer.asnumber),
             "exists": present},
            {"config": "neighbor %s password %s" % (peer.ipaddress, password),
             "exists": present},
        ]

    def network_entry():
        """Expected frr.conf line announcing the CIDR of the dynamic network."""
        return {"config": "network %s" % dynamic_network.cidr,
                "exists": True}

    # 1.1 Create the first bgp peer
    peer_one = BgpPeer.create(
        self.apiclient,
        zoneid=self.zone.id,
        asnumber=ASN_1,
        ipaddress=IP4_ADDR_1,
        password=PASSWORD_1
    )
    self.cleanup.append(peer_one)

    # 1.2 Create Isolated network with Dynamic routing
    dynamic_network = Network.create(
        self.apiclient,
        self.services["network"],
        networkofferingid=self.network_offering_dynamic.id,
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        accountid=self.regular_user.name,
        gateway=NETWORK_CIDR_PREFIX_DYNAMIC + ".1",
        netmask="255.255.255.0",
        bgppeerids=peer_one.id
    )
    self.cleanup.append(dynamic_network)

    # 2. Create VM in the network
    dynamic_vm = VirtualMachine.create(
        self.regular_user_apiclient,
        self.services["virtual_machine"],
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        accountid=self.regular_user.name,
        networkids=dynamic_network.id,
        serviceofferingid=self.service_offering.id,
        templateid=self.template.id)
    self.cleanup.append(dynamic_vm)

    router = self.get_router(networkid=dynamic_network.id)

    # 3. Verify frr.conf in network VR
    expected = neighbor_entries(peer_one, PASSWORD_1, True) + [network_entry()]
    self.verifyFrrConf(router, expected)

    # 4. Update network BGP peers (to bgp_peer_1 and bgp_peer_2)
    peer_two = BgpPeer.create(
        self.apiclient,
        zoneid=self.zone.id,
        asnumber=ASN_2,
        ipaddress=IP4_ADDR_2,
        password=PASSWORD_2
    )
    self.cleanup.append(peer_two)

    dynamic_network.changeBgpPeers(
        self.apiclient,
        bgppeerids=[peer_one.id, peer_two.id]
    )

    # 5. Verify frr.conf in network VR
    expected = (neighbor_entries(peer_one, PASSWORD_1, True)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + [network_entry()])
    self.verifyFrrConf(router, expected)

    # 6. Reboot VR
    self.rebootRouter(router)

    # 7. Verify frr.conf in network VR
    # The router is looked up again after the reboot; the expectations are
    # the same as before it.
    router = self.get_router(networkid=dynamic_network.id)
    self.verifyFrrConf(router, expected)

    # 8. Update network BGP peers (to bgppeer_2)
    dynamic_network.changeBgpPeers(
        self.apiclient,
        bgppeerids=[peer_two.id]
    )

    # 9. Verify frr.conf in network VR
    expected = (neighbor_entries(peer_one, PASSWORD_1, False)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + [network_entry()])
    self.verifyFrrConf(router, expected)

    # 10. Update network BGP peers (to null)
    dynamic_network.changeBgpPeers(
        self.apiclient,
        bgppeerids=[]
    )

    # 11. Verify frr.conf in network VR
    # NOTE(review): with an empty peer list both peers are expected in
    # frr.conf again - presumably the network then falls back to every BGP
    # peer available in the zone; confirm against the CloudStack docs.
    expected = (neighbor_entries(peer_one, PASSWORD_1, True)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + [network_entry()])
    self.verifyFrrConf(router, expected)
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_12_vpc_and_tier_with_dynamic_routed_mode(self):
    """ Test for VPC/tier with Dynamic Routed mode

    # 1. Create bgp peers
    # 2. Create VPC
    # 3. Create Network ACL (egress = Deny, ingress = Deny)
    # 4. Create VPC tier with Network ACL in the VPC
    # 5. Create VM in the VPC tier
    # 6. Verify frr.conf in VPC VR
    # 7. Update VPC BGP peers (to bgp_peer_1 and bgp_peer_2)
    # 8. Verify frr.conf in VPC VR
    # 9. Create VPC tier-2 with Network ACL in the VPC
    # 10. Create VM-2 in the VPC tier-2
    # 11. Verify frr.conf in VPC VR
    # 12. Reboot VPC VR
    # 13. Verify frr.conf in VPC VR
    # 14. Update VPC BGP peers (to bgppeer_2)
    # 15. Verify frr.conf in VPC VR
    # 16. Update VPC BGP peers (to null)
    # 17. Verify frr.conf in VPC VR
    """
    self.message("Running test_12_vpc_and_tier_with_dynamic_routed_mode")

    def neighbor_entries(peer, password, present):
        """Expected frr.conf lines (remote-as, password) for one BGP peer."""
        return [
            {"config": "neighbor %s remote-as %s" % (peer.ipaddress, peer.asnumber),
             "exists": present},
            {"config": "neighbor %s password %s" % (peer.ipaddress, password),
             "exists": present},
        ]

    def tier_entries(*tiers):
        """Expected frr.conf lines announcing the CIDR of each given tier."""
        return [{"config": "network %s" % tier.cidr,
                 "exists": True} for tier in tiers]

    # 1. Create bgp peers
    peer_one = BgpPeer.create(
        self.apiclient,
        zoneid=self.zone.id,
        asnumber=ASN_1,
        ipaddress=IP4_ADDR_1,
        password=PASSWORD_1
    )
    self.cleanup.append(peer_one)

    # 2.1 VPC offering for dynamic routing
    dynamic_offering = VpcOffering.create(
        self.apiclient,
        VPC_OFFERING_DYNAMIC
    )
    self.cleanup.append(dynamic_offering)
    dynamic_offering.update(self.apiclient, state='Enabled')

    # 2.2 Create VPC
    self.services["vpc"]["cidr"] = VPC_CIDR_PREFIX + ".8.0/22"
    dynamic_vpc = VPC.create(self.apiclient,
                             self.services["vpc"],
                             vpcofferingid=dynamic_offering.id,
                             zoneid=self.zone.id,
                             domainid=self.sub_domain.id,
                             account=self.regular_user.name,
                             start=False,
                             bgppeerids=peer_one.id
                             )
    self.cleanup.append(dynamic_vpc)

    # 3. Create Network ACL (egress = Deny, ingress = Deny)
    dynamic_acl = NetworkACLList.create(self.apiclient,
                                        services={},
                                        name="test-network-acl-dynamic",
                                        description="test-network-acl-dynamic",
                                        vpcid=dynamic_vpc.id
                                        )

    # 4. Create VPC tier with Network ACL in the VPC
    self.services["network"]["name"] = "test_vpc_tier_dynamic_1"
    tier_one = Network.create(self.regular_user_apiclient,
                              self.services["network"],
                              networkofferingid=self.vpc_network_offering_dynamic.id,
                              zoneid=self.zone.id,
                              domainid=self.sub_domain.id,
                              accountid=self.regular_user.name,
                              vpcid=dynamic_vpc.id,
                              gateway=VPC_CIDR_PREFIX + ".8.1",
                              netmask="255.255.255.0",
                              aclid=dynamic_acl.id
                              )
    self.cleanup.append(tier_one)

    # 5. Create VM in the VPC tier
    vm_one = VirtualMachine.create(
        self.regular_user_apiclient,
        self.services["virtual_machine"],
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        accountid=self.regular_user.name,
        networkids=tier_one.id,
        serviceofferingid=self.service_offering.id,
        templateid=self.template.id)
    self.cleanup.append(vm_one)

    router = self.get_router(vpcid=dynamic_vpc.id)

    # 6. Verify frr.conf in VPC VR
    expected = neighbor_entries(peer_one, PASSWORD_1, True) + tier_entries(tier_one)
    self.verifyFrrConf(router, expected)

    # 7. Update VPC BGP peers (to bgp_peer_1 and bgp_peer_2)
    peer_two = BgpPeer.create(
        self.apiclient,
        zoneid=self.zone.id,
        asnumber=ASN_2,
        ipaddress=IP4_ADDR_2,
        password=PASSWORD_2
    )
    self.cleanup.append(peer_two)

    dynamic_vpc.changeBgpPeers(
        self.apiclient,
        bgppeerids=[peer_one.id, peer_two.id]
    )

    # 8. Verify frr.conf in VPC VR
    expected = (neighbor_entries(peer_one, PASSWORD_1, True)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + tier_entries(tier_one))
    self.verifyFrrConf(router, expected)

    # 9. Create VPC tier-2 with Network ACL in the VPC
    self.services["network"]["name"] = "test_vpc_tier_dynamic_2"
    tier_two = Network.create(self.regular_user_apiclient,
                              self.services["network"],
                              networkofferingid=self.vpc_network_offering_dynamic.id,
                              zoneid=self.zone.id,
                              domainid=self.sub_domain.id,
                              accountid=self.regular_user.name,
                              vpcid=dynamic_vpc.id,
                              gateway=VPC_CIDR_PREFIX + ".9.1",
                              netmask="255.255.255.0",
                              aclid=dynamic_acl.id
                              )
    self.cleanup.append(tier_two)

    # 10. Create VM-2 in the VPC tier-2
    vm_two = VirtualMachine.create(
        self.regular_user_apiclient,
        self.services["virtual_machine"],
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        accountid=self.regular_user.name,
        networkids=tier_two.id,
        serviceofferingid=self.service_offering.id,
        templateid=self.template.id)
    self.cleanup.append(vm_two)

    # 11. Verify frr.conf in VPC VR
    expected = (neighbor_entries(peer_one, PASSWORD_1, True)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + tier_entries(tier_one, tier_two))
    self.verifyFrrConf(router, expected)

    # 12. Reboot VPC VR
    self.rebootRouter(router)

    # 13. Verify frr.conf in VPC VR
    # The router is looked up again after the reboot; the expectations are
    # the same as before it.
    router = self.get_router(vpcid=dynamic_vpc.id)
    self.verifyFrrConf(router, expected)

    # 14. Update VPC BGP peers (to bgppeer_2)
    dynamic_vpc.changeBgpPeers(
        self.apiclient,
        bgppeerids=[peer_two.id]
    )

    # 15. Verify frr.conf in VPC VR
    expected = (neighbor_entries(peer_one, PASSWORD_1, False)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + tier_entries(tier_one, tier_two))
    self.verifyFrrConf(router, expected)

    # 16. Update VPC BGP peers (to null)
    dynamic_vpc.changeBgpPeers(
        self.apiclient,
        bgppeerids=[]
    )

    # 17. Verify frr.conf in VPC VR
    # NOTE(review): with an empty peer list both peers are expected in
    # frr.conf again - presumably the VPC then falls back to every BGP peer
    # available in the zone; confirm against the CloudStack docs.
    expected = (neighbor_entries(peer_one, PASSWORD_1, True)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + tier_entries(tier_one, tier_two))
    self.verifyFrrConf(router, expected)
|
|
|
|
|
|
@attr(tags=['advanced'], required_hardware=False)
def test_13_asn_ranges(self):
    """ Test for ASN ranges

    # 1. Create an ASN range without overlap
    # 2. List ASN ranges by zoneid
    # 3. List ASN numbers by ASN range id
    # 4. Create an ASN range with overlap, it should fail
    # 5. Delete ASN range
    """
    self.message("Running test_13_asn_ranges")

    # 1. Create an ASN range without overlap
    asnrange_2 = ASNRange.create(
        self.apiclient,
        zoneid=self.zone.id,
        startasn=END_ASN+100,
        endasn=END_ASN+200
    )
    self.cleanup.append(asnrange_2)

    # 2. List ASN ranges by zoneid
    ranges = ASNRange.list(
        self.apiclient,
        zoneid=self.zone.id
    )
    self.assertIsInstance(
        ranges,
        list,
        "List ASN ranges by zoneid should return a valid list"
    )
    self.assertGreaterEqual(
        len(ranges),
        1,
        "The number of ASN ranges (%s) should be at least 1" % (len(ranges))
    )
    # Find the listed range that starts at the same ASN as the new range.
    asnrange_2_new = next(
        (asnrange for asnrange in ranges if asnrange.startasn == asnrange_2.startasn),
        None
    )
    if not asnrange_2_new:
        self.fail("Unable to find ASN range (%s-%s)" % (asnrange_2.startasn, asnrange_2.endasn))
    self.assertEqual(
        asnrange_2_new.endasn,
        asnrange_2.endasn,
        "The end ASN of ASN range (%s-%s) should be equal to %s" % (asnrange_2_new.startasn, asnrange_2_new.endasn, asnrange_2.endasn)
    )

    # 3. List ASN numbers by ASN range id
    asnumbers = ASNRange.listAsNumbers(
        self.apiclient,
        zoneid=self.zone.id,
        asnrangeid=asnrange_2.id
    )
    self.assertIsInstance(
        asnumbers,
        list,
        "List AS numbers should return a valid list"
    )
    # Both ends of the range are inclusive, hence the + 1.
    expected_count = asnrange_2.endasn - asnrange_2.startasn + 1
    self.assertEqual(
        len(asnumbers),
        expected_count,
        "The number of asnumbers (%s) should be equal to %s" % (len(asnumbers), expected_count)
    )

    # 4. Create an ASN range with overlap, it should fail
    # self.fail() raises an AssertionError, which "except Exception" would
    # swallow as well; it therefore lives in the else branch so that an
    # unexpected success really fails the test.
    try:
        asnrange_3 = ASNRange.create(
            self.apiclient,
            zoneid=self.zone.id,
            startasn=END_ASN+150,
            endasn=END_ASN+250
        )
    except Exception:
        self.message("Failed to create ASN range but it is expected")
    else:
        self.cleanup.append(asnrange_3)
        self.fail("Succeeded to create ASN range (%s-%s) but it should fail" % (asnrange_3.startasn, asnrange_3.endasn))

    # 5. Delete ASN range
    asnrange_2.delete(
        self.apiclient
    )
    # Already deleted above, so it must not be deleted again during cleanup.
    self.cleanup.remove(asnrange_2)
|