# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Network Life Cycle
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
import remoteSSHClient
from utils import *
from base import *
#Import System modules
import time

# Test data shared by every test in this module. The name is provided by one
# of the star imports above (presumably `settings` -- TODO confirm).
services = TEST_NETWORK_SERVICES


class TestPublicIP(cloudstackTestCase):
    """Associate and disassociate a public IP for admin and user accounts."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()

    def test_public_ip_admin_account(self):
        """Test for Associate/Disassociate public IP address for admin account"""

        # Validate the following:
        # 1. listPublicIpAddresses API returns the list of acquired addresses
        # 2. the returned list should contain our acquired IP address
        ip_address = PublicIPAddress.create(
            self.apiclient,
            services["admin_account"],
            services["zoneid"],
            services["domainid"]
        )

        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.id = ip_address.ipaddress.id
        list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)

        # listPublicIpAddresses should return newly created public IP
        self.assertNotEqual(
            len(list_pub_ip_addr_resp),
            0,
            "Check if new IP Address is associated"
        )
        self.assertEqual(
            list_pub_ip_addr_resp[0].id,
            ip_address.ipaddress.id,
            "Check Correct IP Address is returned in the List Cacls"
        )

        ip_address.delete(self.apiclient)

        # Validate the following:
        # 1. listPublicIpAddresses API should no more return the released
        #    address (the test expects None rather than an empty list)
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.id = ip_address.ipaddress.id
        list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)

        self.assertEqual(
            list_pub_ip_addr_resp,
            None,
            "Check if disassociated IP Address is no longer available"
        )

    def test_public_ip_user_account(self):
        """Test for Associate/Disassociate public IP address for user account"""

        # Validate the following:
        # 1. listPublicIpAddresses API returns the list of acquired addresses
        # 2. the returned list should contain our acquired IP address
        ip_address = PublicIPAddress.create(
            self.apiclient,
            services["user_account"],
            services["zoneid"],
            services["domainid"]
        )

        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.id = ip_address.ipaddress.id

        # listPublicIpAddresses should return newly created public IP
        list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)

        self.assertNotEqual(
            len(list_pub_ip_addr_resp),
            0,
            "Check if new IP Address is associated"
        )
        self.assertEqual(
            list_pub_ip_addr_resp[0].id,
            ip_address.ipaddress.id,
            "Check Correct IP Address is returned in the List Call"
        )

        ip_address.delete(self.apiclient)

        # The released address must no longer be listed
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.id = ip_address.ipaddress.id
        list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)

        self.assertEqual(
            list_pub_ip_addr_resp,
            None,
            "Check if disassociated IP Address is no longer available"
        )


class TestPortForwarding(cloudstackTestCase):
    """Port forwarding rules on the source NAT IP and on an extra public IP."""

    @classmethod
    def setUpClass(cls):
        cls.api_client = fetch_api_client()

        # Create an account, network, VM and IP addresses
        cls.account = Account.create(
            cls.api_client,
            services["account"],
            admin=True
        )
        cls.virtual_machine = VirtualMachine.create(
            cls.api_client,
            services["server"],
            accountid=cls.account.account.name,
        )
        # Class-level resources, released once in tearDownClass
        cls._cleanup = [cls.virtual_machine, cls.account]

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()
        # Per-test resources, released in tearDown
        self.cleanup = []

    @classmethod
    def tearDownClass(cls):
        cleanup_resources(cls.api_client, cls._cleanup)

    def tearDown(self):
        cleanup_resources(self.apiclient, self.cleanup)

    def test_01_port_fwd_on_src_nat(self):
        """Test for port forwarding on source NAT"""

        # Validate the following:
        # 1. listPortForwarding rules API should return the added PF rule
        # 2. attempt to do an ssh into the user VM through the sourceNAT
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]

        # Create NAT rule
        nat_rule = NATRule.create(
            self.apiclient,
            self.virtual_machine,
            services["natrule"],
            src_nat_ip_addr.id
        )
        # Give the rule time to be applied before querying/using it
        time.sleep(60)

        cmd = listPortForwardingRules.listPortForwardingRulesCmd()
        cmd.id = nat_rule.id
        list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)

        self.assertNotEqual(
            len(list_nat_rule_response),
            0,
            "Check Port Forwarding Rule is created"
        )
        self.assertEqual(
            list_nat_rule_response[0].id,
            nat_rule.id,
            "Check Correct Port forwarding Rule is returned"
        )

        # SSH virtual machine to test port forwarding
        try:
            self.virtual_machine.get_ssh_client(src_nat_ip_addr.ipaddress)
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress, e))

        # Validate the following:
        # 1. listPortForwardingRules should not return the deleted rule anymore
        # 2. attempt to do ssh should now fail
        nat_rule.delete(self.apiclient)
        time.sleep(60)

        cmd = listPortForwardingRules.listPortForwardingRulesCmd()
        cmd.id = nat_rule.id
        list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)

        self.assertEqual(
            list_nat_rule_response,
            None,
            "Check Port Forwarding Rule is deleted"
        )

        # Check if the Public SSH port is inaccessible.
        # The original referenced an undefined name `ip` here, so the
        # assertRaises passed on a NameError without attempting SSH.
        with self.assertRaises(Exception):
            remoteSSHClient.remoteSSHClient(
                src_nat_ip_addr.ipaddress,
                self.virtual_machine.ssh_port,
                self.virtual_machine.username,
                self.virtual_machine.password
            )

    def test_02_port_fwd_on_non_src_nat(self):
        """Test for port forwarding on non source NAT"""

        # Validate the following:
        # 1. listPortForwardingRules API should return the added PF rule
        # 2. attempt to do an ssh into the user VM through the new public IP
        # NOTE(review): other callers also pass zone and domain ids to
        # PublicIPAddress.create -- confirm the defaults are right here.
        ip_address = PublicIPAddress.create(
            self.apiclient,
            self.account.account.name
        )
        self.cleanup.append(ip_address)

        # Create NAT rule
        nat_rule = NATRule.create(
            self.apiclient,
            self.virtual_machine,
            services["natrule"],
            ip_address.ipaddress.id
        )
        time.sleep(60)

        cmd = listPortForwardingRules.listPortForwardingRulesCmd()
        cmd.id = nat_rule.id
        list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)

        self.assertNotEqual(
            len(list_nat_rule_response),
            0,
            "Check Port Forwarding Rule is created"
        )
        self.assertEqual(
            list_nat_rule_response[0].id,
            nat_rule.id,
            "Check Correct Port forwarding Rule is returned"
        )

        try:
            self.virtual_machine.get_ssh_client(public_ip = ip_address.ipaddress.ipaddress)
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress, e))

        # Validate the following:
        # 1. listPortForwardingRules should not return the deleted rule anymore
        # 2. attempt to do ssh should now fail
        nat_rule.delete(self.apiclient)
        time.sleep(60)

        cmd = listPortForwardingRules.listPortForwardingRulesCmd()
        cmd.id = nat_rule.id
        list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)

        # Compare the response itself: the sibling tests expect None for an
        # empty result, and len(None) would raise TypeError.
        self.assertEqual(
            list_nat_rule_response,
            None,
            "Check Port Forwarding Rule is deleted"
        )

        # Check if the Public SSH port is inaccessible
        with self.assertRaises(Exception):
            remoteSSHClient.remoteSSHClient(
                ip_address.ipaddress.ipaddress,
                self.virtual_machine.ssh_port,
                self.virtual_machine.username,
                self.virtual_machine.password
            )


class TestLoadBalancingRule(cloudstackTestCase):
    """Load balancer rules on the source NAT IP and on an extra public IP."""

    @classmethod
    def setUpClass(cls):
        cls.api_client = fetch_api_client()

        # Create an account, network, VM and IP addresses
        cls.account = Account.create(
            cls.api_client,
            services["account"],
            admin=True
        )
        cls.vm_1 = VirtualMachine.create(
            cls.api_client,
            services["server"],
            accountid=cls.account.account.name,
        )
        cls.vm_2 = VirtualMachine.create(
            cls.api_client,
            services["server"],
            accountid=cls.account.account.name,
        )
        cls.non_src_nat_ip = PublicIPAddress.create(
            cls.api_client,
            cls.account.account.name,
            services["zoneid"],
            services["domainid"]
        )
        # Class-level resources, released once in tearDownClass
        cls._cleanup = [cls.vm_1, cls.vm_2, cls.non_src_nat_ip, cls.account]

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()
        # Per-test resources, released in tearDown
        self.cleanup = []

    def tearDown(self):
        cleanup_resources(self.apiclient, self.cleanup)

    @classmethod
    def tearDownClass(cls):
        cleanup_resources(cls.api_client, cls._cleanup)

    def test_01_create_lb_rule_src_nat(self):
        """Test to create Load balancing rule with source NAT"""

        # Validate the Following:
        # 1. listLoadBalancerRules should return the added rule
        # 2. attempt to ssh twice on the load balanced IP
        # 3. verify using the hostname of the VM that round robin is
        #    indeed happening as expected
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]

        # Create Load Balancer rule and assign VMs to rule
        lb_rule = LoadBalancerRule.create(
            self.apiclient,
            services["lbrule"],
            src_nat_ip_addr.id,
            accountid = self.account.account.name
        )
        self.cleanup.append(lb_rule)

        lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])

        cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
        cmd.id = lb_rule.id
        lb_rules = self.apiclient.listLoadBalancerRules(cmd)

        # verify listLoadBalancerRules lists the added load balancing rule
        self.assertNotEqual(
            len(lb_rules),
            0,
            "Check Load Balancer Rule in its List"
        )
        self.assertEqual(
            lb_rules[0].id,
            lb_rule.id,
            "Check List Load Balancer Rules returns valid Rule"
        )

        # listLoadBalancerRuleInstances should list all instances
        # associated with that LB rule
        cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd()
        cmd.id = lb_rule.id
        lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd)

        self.assertNotEqual(
            len(lb_instance_rules),
            0,
            "Check Load Balancer instances Rule in its List"
        )
        self.assertEqual(
            lb_instance_rules[0].id,
            self.vm_2.id,
            "Check List Load Balancer instances Rules returns valid VM ID associated with it"
        )
        self.assertEqual(
            lb_instance_rules[1].id,
            self.vm_1.id,
            "Check List Load Balancer instances Rules returns valid VM ID associated with it"
        )

        ssh_1 = remoteSSHClient.remoteSSHClient(
            src_nat_ip_addr.ipaddress,
            services['lbrule']["publicport"],
            self.vm_1.username,
            self.vm_1.password
        )

        # If Round Robin Algorithm is chosen, each ssh command should
        # alternate between VMs
        hostnames = [ssh_1.execute("hostname")[0]]
        time.sleep(20)
        ssh_2 = remoteSSHClient.remoteSSHClient(
            src_nat_ip_addr.ipaddress,
            services['lbrule']["publicport"],
            self.vm_1.username,
            self.vm_1.password
        )
        hostnames.append(ssh_2.execute("hostname")[0])

        self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1")
        self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2")

        # SSH should pass till there is a last VM associated with LB rule
        lb_rule.remove(self.apiclient, [self.vm_2])
        hostnames.append(ssh_1.execute("hostname")[0])
        self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1")

        # With no VM left behind the rule the existing session must fail
        lb_rule.remove(self.apiclient, [self.vm_1])
        with self.assertRaises(Exception):
            ssh_1.execute("hostname")[0]

    def test_02_create_lb_rule_non_nat(self):
        """Test to create Load balancing rule with non source NAT"""

        # Validate the Following:
        # 1. listLoadBalancerRules should return the added rule
        # 2. attempt to ssh twice on the load balanced IP
        # 3. verify using the hostname of the VM that round robin is
        #    indeed happening as expected

        # Create Load Balancer rule and assign VMs to rule
        lb_rule = LoadBalancerRule.create(
            self.apiclient,
            services["lbrule"],
            self.non_src_nat_ip.ipaddress.id,
            accountid = self.account.account.name
        )
        self.cleanup.append(lb_rule)

        lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])

        cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
        cmd.id = lb_rule.id
        lb_rules = self.apiclient.listLoadBalancerRules(cmd)

        # verify listLoadBalancerRules lists the added load balancing rule
        self.assertNotEqual(
            len(lb_rules),
            0,
            "Check Load Balancer Rule in its List"
        )
        self.assertEqual(
            lb_rules[0].id,
            lb_rule.id,
            "Check List Load Balancer Rules returns valid Rule"
        )

        # listLoadBalancerRuleInstances should list all instances
        # associated with that LB rule
        cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd()
        cmd.id = lb_rule.id
        lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd)

        self.assertNotEqual(
            len(lb_instance_rules),
            0,
            "Check Load Balancer instances Rule in its List"
        )
        self.assertEqual(
            lb_instance_rules[0].id,
            self.vm_2.id,
            "Check List Load Balancer instances Rules returns valid VM ID associated with it"
        )
        self.assertEqual(
            lb_instance_rules[1].id,
            self.vm_1.id,
            "Check List Load Balancer instances Rules returns valid VM ID associated with it"
        )

        ssh_1 = remoteSSHClient.remoteSSHClient(
            self.non_src_nat_ip.ipaddress.ipaddress,
            services['lbrule']["publicport"],
            self.vm_1.username,
            self.vm_1.password
        )

        # If Round Robin Algorithm is chosen, each ssh command should
        # alternate between VMs
        hostnames = [ssh_1.execute("hostname")[0]]
        time.sleep(20)
        ssh_2 = remoteSSHClient.remoteSSHClient(
            self.non_src_nat_ip.ipaddress.ipaddress,
            services['lbrule']["publicport"],
            self.vm_1.username,
            self.vm_1.password
        )
        hostnames.append(ssh_2.execute("hostname")[0])

        self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1")
        self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2")

        # SSH should pass till there is a last VM associated with LB rule
        lb_rule.remove(self.apiclient, [self.vm_2])
        hostnames.append(ssh_1.execute("hostname")[0])
        self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1")

        # With no VM left behind the rule the existing session must fail
        lb_rule.remove(self.apiclient, [self.vm_1])
        with self.assertRaises(Exception):
            ssh_1.execute("hostname")[0]


class TestRebootRouter(cloudstackTestCase):
    """Rules must keep working after the account's router is rebooted."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()

        # Create an account, network, VM and IP addresses
        self.account = Account.create(
            self.apiclient,
            services["account"],
            admin=True
        )
        self.vm_1 = VirtualMachine.create(
            self.apiclient,
            services["server"],
            accountid=self.account.account.name,
        )

        src_nat_ip_addr = self.account.public_ip.ipaddress

        lb_rule = LoadBalancerRule.create(
            self.apiclient,
            services["lbrule"],
            src_nat_ip_addr.id,
            self.account.account.name
        )
        lb_rule.assign(self.apiclient, [self.vm_1])
        # NOTE(review): the NAT rule below is disabled, yet the test connects
        # on services["natrule"]["publicport"] -- confirm that is intended.
        #nat_rule = NATRule.create(self.apiclient, self.vm_1, services["natrule"], src_nat_ip_addr.id)
        # The original listed the undefined name `account` here, which made
        # setUp fail with NameError before any test could run.
        self.cleanup = [self.vm_1, lb_rule, self.account]

    def test_reboot_router(self):
        """Test for reboot router"""

        # Validate the Following
        # 1. Post restart PF and LB rules should still function
        # 2. verify if the ssh into the virtual machine still works
        #    through the sourceNAT Ip

        # Retrieve router for the user account
        cmd = listRouters.listRoutersCmd()
        cmd.account = self.account.account.name
        cmd.domainid = self.account.account.domainid
        routers = self.apiclient.listRouters(cmd)
        router = routers[0]

        cmd = rebootRouter.rebootRouterCmd()
        cmd.id = router.id
        self.apiclient.rebootRouter(cmd)

        # Sleep to ensure router is rebooted properly
        time.sleep(60)

        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]

        # we should be able to SSH after successful reboot
        try:
            remoteSSHClient.remoteSSHClient(
                src_nat_ip_addr.ipaddress,
                services["natrule"]["publicport"],
                self.vm_1.username,
                self.vm_1.password
            )
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %(self.vm_1.ipaddress, e))

    def tearDown(self):
        cleanup_resources(self.apiclient, self.cleanup)


class TestAssignRemoveLB(cloudstackTestCase):
    """Assign VMs to and remove VMs from a load balancer rule."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()

        # Create VMs, accounts
        self.account = Account.create(
            self.apiclient,
            services["account"],
            admin=True
        )
        self.vm_1 = VirtualMachine.create(
            self.apiclient,
            services["server"],
            accountid=self.account.account.name,
        )
        self.vm_2 = VirtualMachine.create(
            self.apiclient,
            services["server"],
            accountid=self.account.account.name,
        )
        self.vm_3 = VirtualMachine.create(
            self.apiclient,
            services["server"],
            accountid=self.account.account.name,
        )
        self.cleanup = [self.vm_1, self.vm_2, self.vm_3]

    def test_assign_and_removal_elb(self):
        """Test for assign & removing load balancing rule"""

        # Validate:
        # 1. Verify list API - listLoadBalancerRules lists all the rules
        #    with the relevant ports
        # 2. listLoadBalancerInstances will list the instances associated
        #    with the corresponding rule.
        # 3. verify ssh attempts should pass as long as there is at least
        #    one instance associated with the rule
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        self.non_src_nat_ip = self.apiclient.listPublicIpAddresses(cmd)[0]

        lb_rule = LoadBalancerRule.create(
            self.apiclient,
            services["lbrule"],
            self.non_src_nat_ip.id,
            self.account.account.name
        )
        self.cleanup.append(lb_rule)

        lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])

        # Create SSH client for each VM
        ssh_1 = remoteSSHClient.remoteSSHClient(
            self.non_src_nat_ip.ipaddress,
            services["natrule"]["publicport"],
            self.vm_1.username,
            self.vm_1.password
        )
        ssh_2 = remoteSSHClient.remoteSSHClient(
            self.non_src_nat_ip.ipaddress,
            services["natrule"]["publicport"],
            self.vm_2.username,
            self.vm_2.password
        )
        ssh_3 = remoteSSHClient.remoteSSHClient(
            self.non_src_nat_ip.ipaddress,
            services["natrule"]["publicport"],
            self.vm_3.username,
            self.vm_3.password
        )

        # If Round Robin Algorithm is chosen, each ssh command should
        # alternate between VMs
        res_1 = ssh_1.execute("hostname")[0]
        time.sleep(20)
        res_2 = ssh_2.execute("hostname")[0]

        self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1")
        self.assertIn(self.vm_2.name, res_2, "Check if ssh succeeded for server2")

        # Removing VM and assigning another VM to LB rule
        lb_rule.remove(self.apiclient, [self.vm_2])

        res_1 = ssh_1.execute("hostname")[0]
        self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1")

        lb_rule.assign(self.apiclient, [self.vm_3])

        res_1 = ssh_1.execute("hostname")[0]
        time.sleep(20)
        res_3 = ssh_3.execute("hostname")[0]

        self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1")
        self.assertIn(self.vm_3.name, res_3, "Check if ssh succeeded for server3")

    def tearDown(self):
        # Was spelled `teardown`, which unittest never calls, so the
        # resources created by this test case were never released.
        cleanup_resources(self.apiclient, self.cleanup)


class TestReleaseIP(cloudstackTestCase):
    """Releasing a public IP must remove the rules that depend on it."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()

        # Create an account, network, VM, Port forwarding rule, LB rules
        # and IP addresses
        self.account = Account.create(
            self.apiclient,
            services["account"],
            admin=True
        )
        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            services["server"],
            accountid=self.account.account.name,
        )
        self.ip_address = PublicIPAddress.create(
            self.apiclient,
            self.account.account.name,
            services["zoneid"],
            self.account.account.domainid
        )

        # NOTE(review): the rules below are attached to the first address the
        # list call returns (`ip_addr`), while the test releases `ip_address`
        # created above -- confirm both refer to the same address.
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        self.ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]

        self.nat_rule = NATRule.create(
            self.apiclient,
            self.virtual_machine,
            services["natrule"],
            self.ip_addr.id
        )
        self.lb_rule = LoadBalancerRule.create(
            self.apiclient,
            services["lbrule"],
            self.ip_addr.id,
            accountid = self.account.account.name
        )
        self.cleanup = [self.virtual_machine, self.account]

    def tearDown(self):
        # Was spelled `teardown`, which unittest never calls, so the
        # resources created by this test case were never released.
        cleanup_resources(self.apiclient, self.cleanup)

    def test_releaseIP(self):
        """Test for Associate/Disassociate public IP address"""

        self.ip_address.delete(self.apiclient)

        # ListPublicIpAddresses should not list deleted Public IP address
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.id = self.ip_addr.id
        list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)

        self.assertEqual(
            list_pub_ip_addr_resp,
            None,
            "Check if disassociated IP Address is no longer available"
        )

        # ListPortForwardingRules should not list associated rules with
        # Public IP address
        cmd = listPortForwardingRules.listPortForwardingRulesCmd()
        cmd.id = self.nat_rule.id
        list_nat_rules = self.apiclient.listPortForwardingRules(cmd)

        self.assertEqual(
            list_nat_rules,
            None,
            "Check if Port forwarding rules for disassociated IP Address are no longer available"
        )

        # listLoadBalancerRules should not list associated rules with
        # Public IP address
        cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
        cmd.id = self.lb_rule.id
        list_lb_rules = self.apiclient.listLoadBalancerRules(cmd)

        self.assertEqual(
            list_lb_rules,
            None,
            "Check if LB rules for disassociated IP Address are no longer available"
        )

        # SSH Attempt though public IP should fail
        with self.assertRaises(Exception):
            remoteSSHClient.remoteSSHClient(
                self.ip_addr.ipaddress,
                services["natrule"]["publicport"],
                self.virtual_machine.username,
                self.virtual_machine.password
            )


class TestDeleteAccount(cloudstackTestCase):
    """Deleting an account must remove its rules and its router."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()

        # Create an account, network, VM and IP addresses
        self.account = Account.create(
            self.apiclient,
            services["account"],
            admin=True
        )
        self.vm_1 = VirtualMachine.create(
            self.apiclient,
            services["server"],
            accountid=self.account.account.name,
        )

        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]

        self.lb_rule = LoadBalancerRule.create(
            self.apiclient,
            services["lbrule"],
            src_nat_ip_addr.id,
            self.account.account.name
        )
        self.lb_rule.assign(self.apiclient, [self.vm_1])
        self.nat_rule = NATRule.create(
            self.apiclient,
            self.vm_1,
            services["natrule"],
            src_nat_ip_addr.id
        )
        # Nothing is registered for cleanup: the test itself deletes the
        # account.
        self.cleanup = []

    def test_delete_account(self):
        """Test for delete account"""

        # Validate the Following
        # 1. after account.cleanup.interval (global setting) time all the
        #    PF/LB rules should be deleted
        # 2. verify that list(LoadBalancer/PortForwarding)Rules API does
        #    not return any rules for the account
        # 3. The domR should have been expunged for this account
        self.account.delete(self.apiclient)
        time.sleep(120)

        # ListLoadBalancerRules should not list associated rules with
        # deleted account
        cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        # Unable to find account testuser1 in domain 1 : Exception
        with self.assertRaises(Exception):
            self.apiclient.listLoadBalancerRules(cmd)

        # ListPortForwardingRules should not list associated rules with
        # deleted account
        cmd = listPortForwardingRules.listPortForwardingRulesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        with self.assertRaises(Exception):
            self.apiclient.listPortForwardingRules(cmd)

        # Retrieve router for the user account
        cmd = listRouters.listRoutersCmd()
        cmd.account = self.account.account.name
        cmd.domainid = self.account.account.domainid
        with self.assertRaises(Exception):
            self.apiclient.listRouters(cmd)

    def tearDown(self):
        cleanup_resources(self.apiclient, self.cleanup)