diff --git a/tools/testClient/testcase/P1-tests/test_egress_rules.py b/tools/testClient/testcase/P1-tests/test_egress_rules.py new file mode 100644 index 00000000000..1edb37a8626 --- /dev/null +++ b/tools/testClient/testcase/P1-tests/test_egress_rules.py @@ -0,0 +1,2335 @@ +# -*- encoding: utf-8 -*- +# Copyright 2012 Citrix Systems, Inc. Licensed under the +# Apache License, Version 2.0 (the "License"); you may not use this +# file except in compliance with the License. Citrix Systems, Inc. +# reserves all rights not expressly granted by the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Automatically generated by addcopyright.py at 04/03/2012 + +""" P1 for Egresss & Ingress rules +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +import remoteSSHClient +from testcase.libs.utils import * +from testcase.libs.base import * +from testcase.libs.common import * + +#Import System modules +import time +import subprocess + + +class Services: + """Test Security groups Services + """ + + def __init__(self): + self.services = { + "disk_offering": { + "displaytext": "Small", + "name": "Small", + "disksize": 1 + }, + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + # Random characters are appended in create account to + # ensure unique username generated each time + "password": "fr3sca", + }, + "virtual_machine": { + # Create a small virtual machine instance with disk offering + "displayname": "Test VM", + "username": "root", # VM creds for SSH + "password": "password", + "ssh_port": 22, + "hypervisor": 'XenServer', + "privateport": 22, + "publicport": 22, + "protocol": 'TCP', + "userdata": 'This is sample data', + }, + "service_offering": { + "name": "Tiny Instance", + "displaytext": "Tiny Instance", + "cpunumber": 1, + "cpuspeed": 100, # in MHz + "memory": 64, # In MBs + }, + "security_group": { + "name": 'SSH', + "protocol": 'TCP', + "startport": 22, + "endport": 22, + "cidrlist": '0.0.0.0/0', + }, + "sg_invalid_port": { + "name": 'SSH', + "protocol": 'TCP', + "startport": -22, + "endport": -22, + "cidrlist": '0.0.0.0/0', + }, + "sg_invalid_cidr": { + "name": 'SSH', + "protocol": 'TCP', + "startport": 22, + "endport": 22, + "cidrlist": '0.0.0.10' + }, + "sg_cidr_anywhere": { + "name": 'SSH', + "protocol": 'TCP', + "startport": 22, + "endport": 22, + "cidrlist": '0.0.0.0/0' + }, + "sg_cidr_restricted": { + "name": 'SSH', + "protocol": 'TCP', + "startport": 22, + "endport": 22, + "cidrlist": '10.0.0.1/24', + }, + "sg_account": { + "name": 'SSH', + "protocol": 'TCP', + "startport": 22, + "endport": 22, + "cidrlist": '0.0.0.0/0', + }, + "mgmt_server": { + "username": "root", + "password": "fr3sca", + "ipaddress": "192.168.100.21" + }, + "ostypeid": '85cb528f-72ed-4df9-ac6a-f6ccf0892ff2', + # CentOS 5.3 (64-bit) + "sleep": 60, + "timeout": 10, + "mode": 'basic', + # Networking mode: Basic or Advanced + } + + +class TestDefaultSecurityGroupEgress(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestDefaultSecurityGroupEgress, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.domain.id + ) + cls.services["account"] = cls.account.account.name + + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_deployVM_InDefaultSecurityGroup(self): + """Test deploy VM in default security group with no egress rules + """ + + # Validate the following: + # 1. Deploy a VM. + # 2. Deployed VM should be running, verify with listVirtualMachiens + # 3. listSecurityGroups for this account. should list the default + # security group with no egress rules + # 4. listVirtualMachines should show that the VM belongs to default + # security group + + self.debug("Deploying VM in account: %s" % self.account.account.name) + self.virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + self.debug("Deployed VM with ID: %s" % self.virtual_machine.id) + self.cleanup.append(self.virtual_machine) + + list_vm_response = list_virtual_machines( + self.apiclient, + id=self.virtual_machine.id + ) + self.debug( + "Verify listVirtualMachines response for virtual machine: %s" \ + % self.virtual_machine.id + ) + self.assertEqual( + isinstance(list_vm_response, list), + True, + "Check for list VM response" + ) + vm_response = list_vm_response[0] + self.assertNotEqual( + len(list_vm_response), + 0, + "Check VM available in List Virtual Machines" + ) + + self.assertEqual( + + vm_response.id, + self.virtual_machine.id, + "Check virtual machine id in listVirtualMachines" + ) + + self.assertEqual( + vm_response.state, + 'Running', + "VM state should be running" + ) + self.assertEqual( + hasattr(vm_response, "securitygroup"), + True, + "List VM response should have atleast one security group" + ) + + # Verify listSecurity groups response + security_groups = SecurityGroup.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(security_groups, list), + True, + "Check for list security groups response" + ) + self.assertEqual( + len(security_groups), + 1, + "Check List Security groups response" + ) + self.debug("List Security groups response: %s" % + str(security_groups)) + sec_grp = security_groups[0] + self.assertEqual( + sec_grp.name, + 'default', + "List Sec Group should only list default sec. group" + ) + return + + +class TestAuthorizeIngressRule(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestAuthorizeIngressRule, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.account.account.name + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_authorizeIngressRule(self): + """Test authorize ingress rule + """ + + # Validate the following: + # 1. createaccount of type user + # 2. createsecuritygroup (ssh) for this account + # 3. authorizeSecurityGroupIngress to allow ssh access to the VM + # 4. deployVirtualMachine into this security group (ssh). deployed VM + # should be Running + # 5. listSecurityGroups should show two groups, default and ssh + # 6. verify that ssh-access into the VM is now allowed + # 7. verify from within the VM is able to ping outside world + # (ping www.google.com) + + security_group = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug("Created security group with ID: %s" % security_group.id) + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + # Authorize Security group to SSH to VM + ingress_rule = security_group.authorize( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) + + self.debug("Authorizing ingress rule for sec group ID: %s for ssh access" + % security_group.id) + self.virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id] + ) + self.debug("Deploying VM in account: %s" % self.account.account.name) + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip) + ssh = self.virtual_machine.get_ssh_client() + + # Ping to outsite world + res = ssh.execute("ping -c 1 www.google.com") + # res = 64 bytes from maa03s17-in-f20.1e100.net (74.125.236.212): + # icmp_req=1 ttl=57 time=25.9 ms + # --- www.l.google.com ping statistics --- + # 1 packets transmitted, 1 received, 0% packet loss, time 0ms + # rtt min/avg/max/mdev = 25.970/25.970/25.970/0.000 ms + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + + result = str(res) + self.assertEqual( + result.count("1 received"), + 1, + "Ping to outside world from VM should be successful" + ) + return + + +class TestDefaultGroupEgress(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestDefaultGroupEgress, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.account.account.name + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_01_default_group_with_egress(self): + """Test default group with egress rule before VM deploy and ping, ssh + """ + + # Validate the following: + # 1. createaccount of type user + # 2. createsecuritygroup (ssh) for this account + # 3. authorizeSecurityGroupIngress to allow ssh access to the VM + # 4. authorizeSecurityGroupEgress to allow ssh access only out to + # CIDR: 0.0.0.0/0 + # 5. deployVirtualMachine into this security group (ssh) + # 6. deployed VM should be Running, ssh should be allowed into the VM, + # ping out to google.com from the VM should fail, + # ssh from within VM to mgt server should pass + + security_group = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug("Created security group with ID: %s" % security_group.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + # Authorize Security group to SSH to VM + self.debug("Authorizing ingress rule for sec group ID: %s for ssh access" + % security_group.id) + ingress_rule = security_group.authorize( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) + + ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ + + # Authorize Security group to SSH to VM + self.debug("Authorizing egress rule for sec group ID: %s for ssh access" + % security_group.id) + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(egress_rule, dict), + True, + "Check egress rule created properly" + ) + ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ + + self.virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id] + ) + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip) + + ssh = self.virtual_machine.get_ssh_client() + + self.debug("Ping to google.com from VM") + # Ping to outsite world + res = ssh.execute("ping -c 1 www.google.com") + # res = 64 bytes from maa03s17-in-f20.1e100.net (74.125.236.212): + # icmp_req=1 ttl=57 time=25.9 ms + # --- www.l.google.com ping statistics --- + # 1 packets transmitted, 1 received, 0% packet loss, time 0ms + # rtt min/avg/max/mdev = 25.970/25.970/25.970/0.000 ms + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + + result = str(res) + self.assertEqual( + result.count("0 received"), + 1, + "Ping to outside world from VM should be successful" + ) + + try: + self.debug("SSHing into management server from VM") + res = ssh.execute("ssh %s@%s" % ( + self.services["mgmt_server"]["username"], + self.services["mgmt_server"]["ipaddress"] + )) + self.debug("SSH result: %s" % str(res)) + + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + result = str(res) + self.assertNotEqual( + result.count("No route to host"), + 1, + "SSH into management server from VM should be successful" + ) + return + + +class TestDefaultGroupEgressAfterDeploy(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestDefaultGroupEgressAfterDeploy, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.account.account.name + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_01_default_group_with_egress(self): + """ Test default group with egress rule added after vm deploy and ping, + ssh test + """ + + # Validate the following: + # 1. createaccount of type user + # 2. createsecuritygroup (ssh) for this account + # 3. authorizeSecurityGroupIngress to allow ssh access to the VM + # 4. deployVirtualMachine into this security group (ssh) + # 5. authorizeSecurityGroupEgress to allow ssh access only out to + # CIDR: 0.0.0.0/0 + # 6. deployed VM should be Running, ssh should be allowed into the VM, + # ping out to google.com from the VM should fail + + security_group = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug("Created security group with ID: %s" % security_group.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + # Authorize Security group to SSH to VM + self.debug("Authorizing ingress rule for sec group ID: %s for ssh access" + % security_group.id) + ingress_rule = security_group.authorize( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) + + ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ + + self.virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id] + ) + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing egress rule for sec group ID: %s for ssh access" + % security_group.id) + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(egress_rule, dict), + True, + "Check egress rule created properly" + ) + ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip) + ssh = self.virtual_machine.get_ssh_client() + + self.debug("Ping to google.com from VM") + # Ping to outsite world + res = ssh.execute("ping -c 1 www.google.com") + # res = 64 bytes from maa03s17-in-f20.1e100.net (74.125.236.212): + # icmp_req=1 ttl=57 time=25.9 ms + # --- www.l.google.com ping statistics --- + # 1 packets transmitted, 1 received, 0% packet loss, time 0ms + # rtt min/avg/max/mdev = 25.970/25.970/25.970/0.000 ms + self.debug("SSH result: %s" % str(res)) + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + + result = str(res) + self.assertEqual( + result.count("0 received"), + 1, + "Ping to outside world from VM should be successful" + ) + return + + +class TestRevokeEgressRule(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestRevokeEgressRule, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.account.account.name + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_revoke_egress_rule(self): + """Test revoke security group egress rule + """ + + # Validate the following: + # 1. createaccount of type user + # 2. createsecuritygroup (ssh) for this account + # 3. authorizeSecurityGroupIngress to allow ssh access to the VM + # 4. authorizeSecurityGroupEgress to allow ssh access only out to + # CIDR: 0.0.0.0/0 + # 5. deployVirtualMachine into this security group (ssh) + # 6. deployed VM should be Running, ssh should be allowed into the VM, + # ping out to google.com from the VM should fail, + # ssh from within VM to mgt server should pass + # 7. Revoke egress rule. Verify ping and SSH access to management server + # is restored + + security_group = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug("Created security group with ID: %s" % security_group.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + # Authorize Security group to SSH to VM + self.debug( + "Authorizing ingress rule for sec group ID: %s for ssh access" + % security_group.id) + ingress_rule = security_group.authorize( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) + + ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing egress rule for sec group ID: %s for ssh access" + % security_group.id) + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(egress_rule, dict), + True, + "Check egress rule created properly" + ) + ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ + + self.virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id] + ) + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip) + ssh = self.virtual_machine.get_ssh_client() + + self.debug("Ping to google.com from VM") + # Ping to outsite world + res = ssh.execute("ping -c 1 www.google.com") + # res = 64 bytes from maa03s17-in-f20.1e100.net (74.125.236.212): + # icmp_req=1 ttl=57 time=25.9 ms + # --- www.l.google.com ping statistics --- + # 1 packets transmitted, 1 received, 0% packet loss, time 0ms + # rtt min/avg/max/mdev = 25.970/25.970/25.970/0.000 ms + self.debug("SSH result: %s" % str(res)) + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + + result = str(res) + self.assertEqual( + result.count("0 received"), + 1, + "Ping to outside world from VM should be successful" + ) + + try: + self.debug("SSHing into management server from VM") + res = ssh.execute("ssh %s@%s" % ( + self.services["mgmt_server"]["username"], + self.services["mgmt_server"]["ipaddress"] + )) + self.debug("SSH result: %s" % str(res)) + + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + result = str(res) + self.assertNotEqual( + result.count("No route to host"), + 1, + "SSH into management server from VM should be successful" + ) + + self.debug( + "Revoke Egress Rule for Security Group %s for account: %s" \ + % ( + security_group.id, + self.account.account.name + )) + + result = security_group.revokeEgress( + self.apiclient, + id=ssh_egress_rule["ruleid"] + ) + self.debug("Revoke egress rule result: %s" % result) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip) + ssh = self.virtual_machine.get_ssh_client(reconnect=True) + + self.debug("Ping to google.com from VM") + # Ping to outsite world + res = ssh.execute("ping -c 1 www.google.com") + # res = 64 bytes from maa03s17-in-f20.1e100.net (74.125.236.212): + # icmp_req=1 ttl=57 time=25.9 ms + # --- www.l.google.com ping statistics --- + # 1 packets transmitted, 1 received, 0% packet loss, time 0ms + # rtt min/avg/max/mdev = 25.970/25.970/25.970/0.000 ms + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + + result = str(res) + self.assertEqual( + result.count("1 received"), + 1, + "Ping to outside world from VM should be successful" + ) + + try: + self.debug("SSHing into management server from VM") + res = ssh.execute("ssh %s@%s" % ( + self.services["mgmt_server"]["username"], + self.services["mgmt_server"]["ipaddress"] + )) + self.debug("SSH result: %s" % str(res)) + + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + result = str(res) + self.assertNotEqual( + result.count("No route to host"), + 1, + "SSH into management server from VM should be successful" + ) + return + + +class TestInvalidAccountAuthroize(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestInvalidAccountAuthroize, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.account.account.name + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_invalid_account_authroize(self): + """Test invalid account authroize + """ + + # Validate the following: + # 1. createaccount of type user + # 2. createsecuritygroup (ssh) for this account + # 3. authorizeSecurityGroupEgress to allow ssh access only out to + # non-existent random account and default security group + # 4. listSecurityGroups should show ssh and default security groups + # 5. authorizeSecurityGroupEgress API should fail since there is no + # account + + security_group = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug("Created security group with ID: %s" % security_group.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing egress rule for sec group ID: %s for ssh access" + % security_group.id) + with self.assertRaises(Exception): + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["security_group"], + account=random_gen(), + domainid=self.account.account.domainid + ) + return + + +class TestMultipleAccountsEgressRuleNeg(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestMultipleAccountsEgressRuleNeg, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.accountA = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.accountB = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.accountA.account.name + cls._cleanup = [ + cls.accountA, + cls.accountB, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_multiple_account_egress_rule_negative(self): + """Test multiple account egress rules negative case + """ + + # Validate the following: + # 1. createaccount of type user A + # 2. createaccount of type user B + # 3. createsecuritygroup (SSH-A) for account A + # 4. authorizeSecurityGroupEgress in account A to allow ssh access + # only out to VMs in account B's default security group + # 5. authorizeSecurityGroupIngress in account A to allow ssh incoming + # access from anywhere into Vm's of account A. listSecurityGroups + # for account A should show two groups (default and ssh-a) and ssh + # ingress rule and account based egress rule + # 6. deployVM in account A into security group SSH-A. deployed VM + # should be Running + # 7. deployVM in account B. deployed VM should be Running + # 8. ssh into VM in account A and from there ssh to VM in account B. + # ssh should fail + + security_group = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.accountA.account.name, + domainid=self.accountA.account.domainid + ) + self.debug("Created security group with ID: %s" % security_group.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.accountA.account.name, + domainid=self.accountA.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + # Authorize Security group to SSH to VM + self.debug( + "Authorizing egress rule for sec group ID: %s for ssh access" + % security_group.id) + # Authorize to only account not CIDR + user_secgrp_list = {self.accountB.account.name: 'default'} + + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["sg_account"], + account=self.accountA.account.name, + domainid=self.accountA.account.domainid, + user_secgrp_list=user_secgrp_list + ) + + self.assertEqual( + isinstance(egress_rule, dict), + True, + "Check egress rule created properly" + ) + ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing ingress rule for sec group ID: %s for ssh access" + % security_group.id) + ingress_rule = security_group.authorize( + self.apiclient, + self.services["security_group"], + account=self.accountA.account.name, + domainid=self.accountA.account.domainid + ) + + self.assertEqual( + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) + + ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ + + self.virtual_machineA = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.accountA.account.name, + domainid=self.accountA.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id] + ) + self.cleanup.append(self.virtual_machineA) + self.debug("Deploying VM in account: %s" % self.accountA.account.name) + vms = VirtualMachine.list( + self.apiclient, + id=self.virtual_machineA.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VM should return a valid list" + ) + vm = vms[0] + self.assertEqual( + vm.state, + "Running", + "VM state after deployment should be running" + ) + self.debug("VM: %s state: %s" % (vm.id, vm.state)) + + self.virtual_machineB = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.accountB.account.name, + domainid=self.accountB.account.domainid, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(self.virtual_machineB) + self.debug("Deploying VM in account: %s" % self.accountB.account.name) + + vms = VirtualMachine.list( + self.apiclient, + id=self.virtual_machineB.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VM should return a valid list" + ) + vm = vms[0] + self.assertEqual( + vm.state, + "Running", + "VM state after deployment should be running" + ) + self.debug("VM: %s state: %s" % (vm.id, vm.state)) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machineA.ssh_ip) + ssh = self.virtual_machineA.get_ssh_client() + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machineA.ipaddress, e) + ) + + try: + self.debug("SSHing into VM type B from VM A") + self.debug("VM IP: %s" % self.virtual_machineB.ssh_ip) + res = ssh.execute("ssh %s@%s" % ( + self.services["virtual_machine"]["username"], + self.virtual_machineB.ssh_ip + )) + self.debug("SSH result: %s" % str(res)) + + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machineA.ipaddress, e) + ) + result = str(res) + self.assertEqual( + result.count("Connection timed out"), + 1, + "SSH into management server from VM should not be successful" + ) + return + + +class TestMultipleAccountsEgressRule(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestMultipleAccountsEgressRule, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.accountA = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.accountB = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.accountA.account.name + cls._cleanup = [ + cls.accountA, + cls.accountB, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_multiple_account_egress_rule_positive(self): + """Test multiple account egress rules positive case + """ + + # Validate the following: + # 1. createaccount of type user A + # 2. createaccount of type user B + # 3. createsecuritygroup (SSH-A) for account A + # 4. authorizeSecurityGroupEgress in account A to allow ssh access + # only out to VMs in account B's default security group + # 5. authorizeSecurityGroupIngress in account A to allow ssh incoming + # access from anywhere into Vm's of account A. listSecurityGroups + # for account A should show two groups (default and ssh-a) and ssh + # ingress rule and account based egress rule + # 6. deployVM in account A into security group SSH-A. deployed VM + # should be Running + # 7. deployVM in account B. deployed VM should be Running + # 8. ssh into VM in account A and from there ssh to VM in account B. + # ssh should fail + + security_groupA = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.accountA.account.name, + domainid=self.accountA.account.domainid + ) + self.debug("Created security group with ID: %s" % security_groupA.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.accountA.account.name, + domainid=self.accountA.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + + security_groupB = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.accountB.account.name, + domainid=self.accountB.account.domainid + ) + self.debug("Created security group with ID: %s" % security_groupB.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.accountB.account.name, + domainid=self.accountB.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing egress rule for sec group ID: %s for ssh access" + % security_groupA.id) + # Authorize to only account not CIDR + user_secgrp_list = {self.accountB.account.name: security_groupB.name} + + egress_rule = security_groupA.authorizeEgress( + self.apiclient, + self.services["sg_account"], + account=self.accountA.account.name, + domainid=self.accountA.account.domainid, + user_secgrp_list=user_secgrp_list + ) + + self.assertEqual( + isinstance(egress_rule, dict), + True, + "Check egress rule created properly" + ) + ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing ingress rule for sec group ID: %s for ssh access" + % security_groupA.id) + ingress_ruleA = security_groupA.authorize( + self.apiclient, + self.services["security_group"], + account=self.accountA.account.name, + domainid=self.accountA.account.domainid + ) + + self.assertEqual( + isinstance(ingress_ruleA, dict), + True, + "Check ingress rule created properly" + ) + + ssh_ruleA = (ingress_ruleA["ingressrule"][0]).__dict__ + + self.virtual_machineA = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.accountA.account.name, + domainid=self.accountA.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_groupA.id] + ) + self.cleanup.append(self.virtual_machineA) + self.debug("Deploying VM in account: %s" % self.accountA.account.name) + + vms = VirtualMachine.list( + self.apiclient, + id=self.virtual_machineA.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VM should return a valid list" + ) + vm = vms[0] + self.assertEqual( + vm.state, + "Running", + "VM state after deployment should be running" + ) + self.debug("VM: %s state: %s" % (vm.id, vm.state)) + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing ingress rule for sec group ID: %s for ssh access" + % security_groupB.id) + ingress_ruleB = security_groupB.authorize( + self.apiclient, + self.services["security_group"], + account=self.accountB.account.name, + domainid=self.accountB.account.domainid + ) + + self.assertEqual( + isinstance(ingress_ruleB, dict), + True, + "Check ingress rule created properly" + ) + + ssh_ruleB = (ingress_ruleB["ingressrule"][0]).__dict__ + + self.virtual_machineB = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.accountB.account.name, + domainid=self.accountB.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_groupB.id] + ) + self.cleanup.append(self.virtual_machineB) + self.debug("Deploying VM in account: %s" % self.accountB.account.name) + + vms = VirtualMachine.list( + self.apiclient, + id=self.virtual_machineB.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VM should return a valid list" + ) + vm = vms[0] + self.assertEqual( + vm.state, + "Running", + "VM state after deployment should be running" + ) + self.debug("VM: %s state: %s" % (vm.id, vm.state)) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machineA.ssh_ip) + ssh = self.virtual_machineA.get_ssh_client() + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machineA.ipaddress, e) + ) + + try: + self.debug("SSHing into VB type B from VM A") + self.debug("VM IP: %s" % self.virtual_machineB.ssh_ip) + + res = ssh.execute("ssh %s@%s" % ( + self.services["virtual_machine"]["username"], + self.virtual_machineB.ssh_ip + )) + self.debug("SSH result: %s" % str(res)) + + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machineA.ipaddress, e) + ) + result = str(res) + self.assertNotEqual( + result.count("Connection timed out"), + 1, + "SSH into management server from VM should be successful" + ) + return + + +class TestStartStopVMWithEgressRule(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestStartStopVMWithEgressRule, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.account.account.name + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_start_stop_vm_egress(self): + """ Test stop start Vm with egress rules + """ + + # Validate the following: + # 1. createaccount of type user + # 2. createsecuritygroup (ssh) for this account + # 3. authorizeSecurityGroupIngress to allow ssh access to the VM + # 4. authorizeSecurityGroupEgress to allow ssh access only out to + # CIDR: 0.0.0.0/0 + # 5. deployVirtualMachine into this security group (ssh) + # 6. stopVirtualMachine + # 7. startVirtualMachine + # 8. ssh in to VM + + security_group = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug("Created security group with ID: %s" % security_group.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + # Authorize Security group to SSH to VM + self.debug( + "Authorizing ingress rule for sec group ID: %s for ssh access" + % security_group.id) + ingress_rule = security_group.authorize( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) + + ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ + + self.virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id] + ) + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing egress rule for sec group ID: %s for ssh access" + % security_group.id) + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(egress_rule, dict), + True, + "Check egress rule created properly" + ) + ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ + + # Stop virtual machine + self.debug("Stopping virtual machine: %s" % self.virtual_machine.id) + self.virtual_machine.stop(self.apiclient) + + vms = VirtualMachine.list( + self.apiclient, + id=self.virtual_machine.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VM should return a valid list" + ) + vm = vms[0] + self.assertEqual( + vm.state, + "Stopped", + "VM state should be stopped" + ) + self.debug("VM: %s state: %s" % (vm.id, vm.state)) + + # Start virtual machine + self.debug("Starting virtual machine: %s" % self.virtual_machine.id) + self.virtual_machine.start(self.apiclient) + + vms = VirtualMachine.list( + self.apiclient, + id=self.virtual_machine.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VM should return a valid list" + ) + vm = vms[0] + self.assertEqual( + vm.state, + "Running", + "VM state should be stopped" + ) + self.debug("VM: %s state: %s" % (vm.id, vm.state)) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip) + ssh = self.virtual_machine.get_ssh_client() + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + return + + +@unittest.skip("Valid bug- ID: CS-12647") +class TestInvalidParametersForEgress(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestInvalidParametersForEgress, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.account.account.name + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_invalid_parameters(self): + """ Test invalid parameters for egress rules + """ + + # Validate the following: + # 1. createUserAccount + # 2. createSecurityGroup (test) + # 3. authorizeEgressRule (negative port) - Should fail + # 4. authorizeEgressRule (invalid CIDR) - Should fail + # 5. authorizeEgressRule (invalid account) - Should fail + # 6. authorizeEgressRule (22, cidr: anywhere) and + # authorizeEgressRule (22, cidr: restricted) - Should pass + # 7. authorizeEgressRule (21, cidr : 10.1.1.0/24) and + # authorizeEgressRule (21, cidr: 10.1.1.0/24) - Should fail + + security_group = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug("Created security group with ID: %s" % security_group.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing egress rule for sec group ID: %s with invalid port" + % security_group.id) + with self.assertRaises(Exception): + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["sg_invalid_port"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug( + "Authorizing egress rule for sec group ID: %s with invalid cidr" + % security_group.id) + with self.assertRaises(Exception): + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["sg_invalid_cidr"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug( + "Authorizing egress rule for sec group ID: %s with invalid account" + % security_group.id) + with self.assertRaises(Exception): + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["security_group"], + account=random_gen(), + domainid=self.account.account.domainid + ) + self.debug( + "Authorizing egress rule for sec group ID: %s with cidr: anywhere and port: 22" + % security_group.id) + egress_rule_A = security_group.authorizeEgress( + self.apiclient, + self.services["sg_cidr_anywhere"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(egress_rule_A, dict), + True, + "Check egress rule created properly" + ) + + egress_rule_R = security_group.authorizeEgress( + self.apiclient, + self.services["sg_cidr_restricted"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(egress_rule_R, dict), + True, + "Check egress rule created properly" + ) + + self.debug( + "Authorizing egress rule for sec group ID: %s with duplicate port" + % security_group.id) + with self.assertRaises(Exception): + security_group.authorizeEgress( + self.apiclient, + self.services["sg_cidr_restricted"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + return + + +class TestEgressAfterHostMaintainance(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = super( + TestEgressAfterHostMaintainance, + cls + ).getClsTestClient().getApiClient() + + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + cls.pod = get_pod( + cls.api_client, + zoneid=cls.zone.id + ) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["domainid"] = cls.domain.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + cls.services["account"] = cls.account.account.name + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_egress_after_host_maintainance(self): + """Test maintenance case for egress + """ + + # Validate the following: + # 1. createaccount of type user + # 2. createsecuritygroup (ssh) for this account + # 3. authorizeSecurityGroupIngress to allow ssh access to the VM + # 4. authorizeSecurityGroupEgress to allow ssh access only out to + # CIDR: 0.0.0.0/0 + # 5. deployVirtualMachine into this security group (ssh) + # 6. deployed VM should be Running, ssh should be allowed into the VM + # 7. Enable maintainance mode for host, cance maintainance mode + # 8. User should be able to SSH into VM after maintainace + + security_group = SecurityGroup.create( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.debug("Created security group with ID: %s" % security_group.id) + + # Default Security group should not have any ingress rule + sercurity_groups = SecurityGroup.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.assertEqual( + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) + + self.assertEqual( + len(sercurity_groups), + 2, + "Check List Security groups response" + ) + # Authorize Security group to SSH to VM + self.debug( + "Authorizing ingress rule for sec group ID: %s for ssh access" + % security_group.id) + ingress_rule = security_group.authorize( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) + + ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ + + # Authorize Security group to SSH to VM + self.debug( + "Authorizing egress rule for sec group ID: %s for ssh access" + % security_group.id) + egress_rule = security_group.authorizeEgress( + self.apiclient, + self.services["security_group"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + isinstance(egress_rule, dict), + True, + "Check egress rule created properly" + ) + ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ + + self.virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id] + ) + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machine.id) + ssh = self.virtual_machine.get_ssh_client() + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + vms = VirtualMachine.list( + self.apiclient, + id=self.virtual_machine.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "Check list VMs response for valid host" + ) + vm = vms[0] + + self.debug("Enabling host maintainance for ID: %s" % vm.hostid) + cmd = prepareHostForMaintenance.prepareHostForMaintenanceCmd() + cmd.id = vm.hostid + self.apiclient.prepareHostForMaintenance(cmd) + + self.debug("Canceling host maintainance for ID: %s" % vm.hostid) + cmd = cancelHostMaintenance.cancelHostMaintenanceCmd() + cmd.id = vm.hostid + self.apiclient.cancelHostMaintenance(cmd) + + self.debug("Waiting for SSVMs to come up") + wait_for_ssvms( + self.apiclient, + zoneid=self.zone.id, + podid=self.pod.id, + ) + self.debug("Starting VM: %s" % self.virtual_machine.id) + + self.virtual_machine.start(self.apiclient) + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % self.virtual_machine.id) + ssh = self.virtual_machine.get_ssh_client(reconnect=True) + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) + return diff --git a/tools/testClient/testcase/P1-tests/test_high_availability.py b/tools/testClient/testcase/P1-tests/test_high_availability.py new file mode 100644 index 00000000000..e4ee99e2216 --- /dev/null +++ b/tools/testClient/testcase/P1-tests/test_high_availability.py @@ -0,0 +1,968 @@ +# -*- encoding: utf-8 -*- +# Copyright 2012 Citrix Systems, Inc. Licensed under the +# Apache License, Version 2.0 (the "License"); you may not use this +# file except in compliance with the License. Citrix Systems, Inc. +# reserves all rights not expressly granted by the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Automatically generated by addcopyright.py at 04/03/2012 + +""" P1 tests for high availability +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from testcase.libs.utils import * +from testcase.libs.base import * +from testcase.libs.common import * +import remoteSSHClient +import datetime + + +class Services: + """Test network offering Services + """ + + def __init__(self): + self.services = { + "account": { + "email": "test@test.com", + "firstname": "HA", + "lastname": "HA", + "username": "HA", + # Random characters are appended for unique + # username + "password": "password", + }, + "service_offering": { + "name": "Tiny Instance", + "displaytext": "Tiny Instance", + "cpunumber": 1, + "cpuspeed": 100, # in MHz + "memory": 64, # In MBs + }, + "lbrule": { + "name": "SSH", + "alg": "roundrobin", + # Algorithm used for load balancing + "privateport": 22, + "publicport": 2222, + }, + "natrule": { + "privateport": 22, + "publicport": 22, + "protocol": "TCP" + }, + "fw_rule":{ + "startport": 1, + "endport": 6000, + "cidr": '55.55.0.0/11', + # Any network (For creating FW rule) + }, + "virtual_machine": { + "displayname": "VM", + "username": "root", + "password": "password", + "ssh_port": 22, + "hypervisor": 'XenServer', + # Hypervisor type should be same as + # hypervisor type of cluster + "privateport": 22, + "publicport": 22, + "protocol": 'TCP', + }, + "ostypeid": '9958b10f-9e5d-4ef1-908d-a047372d823b', + # Cent OS 5.3 (64 bit) + "sleep": 60, + "timeout": 100, + "mode":'advanced' + } + + +class TestHighAvailability(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + + cls.api_client = super( + TestHighAvailability, + cls + ).getClsTestClient().getApiClient() + cls.services = Services().services + # Get Zone, Domain and templates + cls.domain = get_domain( + cls.api_client, + cls.services + ) + cls.zone = get_zone( + cls.api_client, + cls.services + ) + cls.pod = get_pod( + cls.api_client, + zoneid=cls.zone.id, + services=cls.services + ) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = cls.template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"], + offerha=True + ) + cls._cleanup = [ + cls.service_offering, + ] + return + +# @classmethod +# def tearDownClass(cls): +# try: +# #Cleanup resources used +# cleanup_resources(cls.api_client, cls._cleanup) +# except Exception as e: +# raise Exception("Warning: Exception during cleanup : %s" % e) +# return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.account = Account.create( + self.apiclient, + self.services["account"], + admin=True, + domainid=self.domain.id + ) + self.cleanup = [self.account] + return + +# def tearDown(self): +# try: +# #Clean up, terminate the created accounts, domains etc +# cleanup_resources(self.apiclient, self.cleanup) +# self.testClient.close() +# except Exception as e: +# raise Exception("Warning: Exception during cleanup : %s" % e) +# return + + def test_01_host_maintenance_mode(self): + """Test host maintenance mode + """ + + # Validate the following + # 1. Create Vms. Acquire IP. Create port forwarding & load balancing + # rules for Vms. + # 2. Host 1: put to maintenance mode. All Vms should failover to Host + # 2 in cluster. Vms should be in running state. All port forwarding + # rules and load balancing Rules should work. + # 3. After failover to Host 2 succeeds, deploy Vms. Deploy Vms on host + # 2 should succeed. + # 4. Host 1: cancel maintenance mode. + # 5. Host 2 : put to maintenance mode. All Vms should failover to + # Host 1 in cluster. + # 6. After failover to Host 1 succeeds, deploy VMs. Deploy Vms on + # host 1 should succeed. + + hosts = Host.list( + self.apiclient, + zoneid=self.zone.id, + resourcestate='Enabled', + type='Routing' + ) + self.assertEqual( + isinstance(hosts, list), + True, + "List hosts should return valid host response" + ) + self.assertEqual( + len(hosts), + 2, + "There must be two hosts present in a cluster" + ) + self.debug("Checking HA with hosts: %s, %s" % ( + hosts[0].name, + hosts[1].name + )) + self.debug("Deploying VM in account: %s" % self.account.account.name) + # Spawn an instance in that network + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + vms = VirtualMachine.list( + self.apiclient, + id=virtual_machine.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + self.debug("Deployed VM on host: %s" % vm.hostid) + self.assertEqual( + vm.state, + "Running", + "Deployed VM should be in RUnning state" + ) + networks = Network.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True + ) + self.assertEqual( + isinstance(networks, list), + True, + "List networks should return valid list for the account" + ) + network = networks[0] + + self.debug("Associating public IP for account: %s" % + self.account.account.name) + public_ip = PublicIPAddress.create( + self.apiclient, + accountid=self.account.account.name, + zoneid=self.zone.id, + domainid=self.account.account.domainid, + networkid=network.id + ) + + self.debug("Associated %s with network %s" % ( + public_ip.ipaddress.ipaddress, + network.id + )) + self.debug("Creating PF rule for IP address: %s" % + public_ip.ipaddress.ipaddress) + nat_rule= NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=public_ip.ipaddress.id + ) + + self.debug("Creating LB rule on IP with NAT: %s" % + public_ip.ipaddress.ipaddress) + + # Create Load Balancer rule on IP already having NAT rule + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=public_ip.ipaddress.id, + accountid=self.account.account.name + ) + self.debug("Created LB rule with ID: %s" % lb_rule.id) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % virtual_machine.id) + ssh = virtual_machine.get_ssh_client( + ipaddress=public_ip.ipaddress.ipaddress) + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (virtual_machine.ipaddress, e) + ) + + first_host = vm.hostid + self.debug("Enabling maintenance mode for host %s" % vm.hostid) + cmd = prepareHostForMaintenance.prepareHostForMaintenanceCmd() + cmd.id = first_host + self.apiclient.prepareHostForMaintenance(cmd) + + self.debug("Waiting for SSVMs to come up") + wait_for_ssvms( + self.apiclient, + zoneid=self.zone.id, + podid=self.pod.id, + ) + + timeout = self.services["timeout"] + # Poll and check state of VM while it migrates from one host to another + while True: + vms = VirtualMachine.list( + self.apiclient, + id=virtual_machine.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + + self.debug("VM 1 state: %s" % vm.state) + if vm.state in ["Stopping", "Stopped", "Running", "Starting"]: + if vm.state == "Running": + break + else: + time.sleep(self.services["sleep"]) + timeout = timeout - 1 + else: + self.fail( + "VM migration from one-host-to-other failed while enabling maintenance" + ) + second_host = vm.hostid + self.assertEqual( + vm.state, + "Running", + "VM should be in Running state after enabling host maintenance" + ) + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % virtual_machine.id) + ssh = virtual_machine.get_ssh_client( + ipaddress=public_ip.ipaddress.ipaddress) + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (virtual_machine.ipaddress, e) + ) + self.debug("Deploying VM in account: %s" % self.account.account.name) + # Spawn an instance on other host + virtual_machine_2 = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + vms = VirtualMachine.list( + self.apiclient, + id=virtual_machine_2.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + self.debug("Deployed VM on host: %s" % vm.hostid) + self.debug("VM 2 state: %s" % vm.state) + self.assertEqual( + vm.state, + "Running", + "Deployed VM should be in Running state" + ) + + self.debug("Canceling host maintenance for ID: %s" % first_host) + cmd = cancelHostMaintenance.cancelHostMaintenanceCmd() + cmd.id = first_host + self.apiclient.cancelHostMaintenance(cmd) + self.debug("Maintenance mode canceled for host: %s" % first_host) + + self.debug("Enabling maintenance mode for host %s" % second_host) + cmd = prepareHostForMaintenance.prepareHostForMaintenanceCmd() + cmd.id = second_host + self.apiclient.prepareHostForMaintenance(cmd) + self.debug("Maintenance mode enabled for host: %s" % second_host) + + self.debug("Waiting for SSVMs to come up") + wait_for_ssvms( + self.apiclient, + zoneid=self.zone.id, + podid=self.pod.id, + ) + + # Poll and check the status of VMs + timeout = self.services["timeout"] + while True: + vms = VirtualMachine.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + self.debug( + "VM state after enabling maintenance on first host: %s" % + vm.state) + if vm.state in ["Stopping", "Stopped", "Running", "Starting"]: + if vm.state == "Running": + break + else: + time.sleep(self.services["sleep"]) + timeout = timeout - 1 + else: + self.fail( + "VM migration from one-host-to-other failed while enabling maintenance" + ) + + for vm in vms: + self.debug( + "VM states after enabling maintenance mode on host: %s - %s" % + (first_host, vm.state)) + self.assertEqual( + vm.state, + "Running", + "Deployed VM should be in Running state" + ) + # Spawn an instance on other host + virtual_machine_3 = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + vms = VirtualMachine.list( + self.apiclient, + id=virtual_machine_3.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + + self.debug("Deployed VM on host: %s" % vm.hostid) + self.debug("VM 3 state: %s" % vm.state) + self.assertEqual( + vm.state, + "Running", + "Deployed VM should be in Running state" + ) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % virtual_machine.id) + ssh = virtual_machine.get_ssh_client( + ipaddress=public_ip.ipaddress.ipaddress) + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (virtual_machine.ipaddress, e) + ) + + self.debug("Canceling host maintenance for ID: %s" % second_host) + cmd = cancelHostMaintenance.cancelHostMaintenanceCmd() + cmd.id = second_host + self.apiclient.cancelHostMaintenance(cmd) + self.debug("Maintenance mode canceled for host: %s" % second_host) + self.debug("Waiting for SSVMs to come up") + wait_for_ssvms( + self.apiclient, + zoneid=self.zone.id, + podid=self.pod.id, + ) + return + + def test_02_host_maintenance_mode_with_activities(self): + """Test host maintenance mode with activities + """ + + # Validate the following + # 1. Create Vms. Acquire IP. Create port forwarding & load balancing + # rules for Vms. + # 2. While activities are ongoing: Create snapshots, recurring + # snapshots, create templates, download volumes, Host 1: put to + # maintenance mode. All Vms should failover to Host 2 in cluster + # Vms should be in running state. All port forwarding rules and + # load balancing Rules should work. + # 3. After failover to Host 2 succeeds, deploy Vms. Deploy Vms on host + # 2 should succeed. All ongoing activities in step 3 should succeed + # 4. Host 1: cancel maintenance mode. + # 5. While activities are ongoing: Create snapshots, recurring + # snapshots, create templates, download volumes, Host 2: put to + # maintenance mode. All Vms should failover to Host 1 in cluster. + # 6. After failover to Host 1 succeeds, deploy VMs. Deploy Vms on + # host 1 should succeed. All ongoing activities in step 6 should + # succeed. + + hosts = Host.list( + self.apiclient, + zoneid=self.zone.id, + resourcestate='Enabled', + type='Routing' + ) + self.assertEqual( + isinstance(hosts, list), + True, + "List hosts should return valid host response" + ) + self.assertEqual( + len(hosts), + 2, + "There must be two hosts present in a cluster" + ) + self.debug("Checking HA with hosts: %s, %s" % ( + hosts[0].name, + hosts[1].name + )) + self.debug("Deploying VM in account: %s" % self.account.account.name) + # Spawn an instance in that network + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + vms = VirtualMachine.list( + self.apiclient, + id=virtual_machine.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + self.debug("Deployed VM on host: %s" % vm.hostid) + self.assertEqual( + vm.state, + "Running", + "Deployed VM should be in RUnning state" + ) + networks = Network.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True + ) + self.assertEqual( + isinstance(networks, list), + True, + "List networks should return valid list for the account" + ) + network = networks[0] + + self.debug("Associating public IP for account: %s" % + self.account.account.name) + public_ip = PublicIPAddress.create( + self.apiclient, + accountid=self.account.account.name, + zoneid=self.zone.id, + domainid=self.account.account.domainid, + networkid=network.id + ) + + self.debug("Associated %s with network %s" % ( + public_ip.ipaddress.ipaddress, + network.id + )) + self.debug("Creating PF rule for IP address: %s" % + public_ip.ipaddress.ipaddress) + nat_rule= NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=public_ip.ipaddress.id + ) + + self.debug("Creating LB rule on IP with NAT: %s" % + public_ip.ipaddress.ipaddress) + + # Create Load Balancer rule on IP already having NAT rule + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=public_ip.ipaddress.id, + accountid=self.account.account.name + ) + self.debug("Created LB rule with ID: %s" % lb_rule.id) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % virtual_machine.id) + ssh = virtual_machine.get_ssh_client( + ipaddress=public_ip.ipaddress.ipaddress) + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (virtual_machine.ipaddress, e) + ) + # Get the Root disk of VM + volumes = list_volumes( + self.apiclient, + virtualmachineid=virtual_machine.id, + type='ROOT', + listall=True + ) + volume = volumes[0] + self.debug( + "Root volume of VM(%s): %s" % ( + virtual_machine.name, + volume.name + )) + # Create a snapshot from the ROOTDISK + self.debug("Creating snapshot on ROOT volume: %s" % volume.name) + snapshot = Snapshot.create(self.apiclient, volumes[0].id) + self.debug("Snapshot created: ID - %s" % snapshot.id) + + snapshots = list_snapshots( + self.apiclient, + id=snapshot.id, + listall=True + ) + self.assertEqual( + isinstance(snapshots, list), + True, + "Check list response returns a valid list" + ) + self.assertNotEqual( + snapshots, + None, + "Check if result exists in list snapshots call" + ) + self.assertEqual( + snapshots[0].id, + snapshot.id, + "Check snapshot id in list resources call" + ) + # Generate template from the snapshot + self.debug("Generating template from snapshot: %s" % snapshot.name) + template = Template.create_from_snapshot( + self.apiclient, + snapshot, + self.services["templates"] + ) + self.cleanup.append(template) + self.debug("Created template from snapshot: %s" % template.id) + + templates = list_templates( + self.apiclient, + templatefilter=\ + self.services["templates"]["templatefilter"], + id=template.id + ) + + self.assertEqual( + isinstance(templates, list), + True, + "List template call should return the newly created template" + ) + + self.assertEqual( + templates[0].isready, + True, + "The newly created template should be in ready state" + ) + + first_host = vm.hostid + self.debug("Enabling maintenance mode for host %s" % vm.hostid) + cmd = prepareHostForMaintenance.prepareHostForMaintenanceCmd() + cmd.id = first_host + self.apiclient.prepareHostForMaintenance(cmd) + + self.debug("Waiting for SSVMs to come up") + wait_for_ssvms( + self.apiclient, + zoneid=self.zone.id, + podid=self.pod.id, + ) + + timeout = self.services["timeout"] + # Poll and check state of VM while it migrates from one host to another + while True: + vms = VirtualMachine.list( + self.apiclient, + id=virtual_machine.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + + self.debug("VM 1 state: %s" % vm.state) + if vm.state in ["Stopping", "Stopped", "Running", "Starting"]: + if vm.state == "Running": + break + else: + time.sleep(self.services["sleep"]) + timeout = timeout - 1 + else: + self.fail( + "VM migration from one-host-to-other failed while enabling maintenance" + ) + second_host = vm.hostid + self.assertEqual( + vm.state, + "Running", + "VM should be in Running state after enabling host maintenance" + ) + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % virtual_machine.id) + ssh = virtual_machine.get_ssh_client( + ipaddress=public_ip.ipaddress.ipaddress) + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (virtual_machine.ipaddress, e) + ) + self.debug("Deploying VM in account: %s" % self.account.account.name) + # Spawn an instance on other host + virtual_machine_2 = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + vms = VirtualMachine.list( + self.apiclient, + id=virtual_machine_2.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + self.debug("Deployed VM on host: %s" % vm.hostid) + self.debug("VM 2 state: %s" % vm.state) + self.assertEqual( + vm.state, + "Running", + "Deployed VM should be in Running state" + ) + + self.debug("Canceling host maintenance for ID: %s" % first_host) + cmd = cancelHostMaintenance.cancelHostMaintenanceCmd() + cmd.id = first_host + self.apiclient.cancelHostMaintenance(cmd) + self.debug("Maintenance mode canceled for host: %s" % first_host) + + # Get the Root disk of VM + volumes = list_volumes( + self.apiclient, + virtualmachineid=virtual_machine_2.id, + type='ROOT', + listall=True + ) + volume = volumes[0] + self.debug( + "Root volume of VM(%s): %s" % ( + virtual_machine_2.name, + volume.name + )) + # Create a snapshot from the ROOTDISK + self.debug("Creating snapshot on ROOT volume: %s" % volume.name) + snapshot = Snapshot.create(self.apiclient, volumes[0].id) + self.debug("Snapshot created: ID - %s" % snapshot.id) + + snapshots = list_snapshots( + self.apiclient, + id=snapshot.id, + listall=True + ) + self.assertEqual( + isinstance(snapshots, list), + True, + "Check list response returns a valid list" + ) + self.assertNotEqual( + snapshots, + None, + "Check if result exists in list snapshots call" + ) + self.assertEqual( + snapshots[0].id, + snapshot.id, + "Check snapshot id in list resources call" + ) + # Generate template from the snapshot + self.debug("Generating template from snapshot: %s" % snapshot.name) + template = Template.create_from_snapshot( + self.apiclient, + snapshot, + self.services["templates"] + ) + self.cleanup.append(template) + self.debug("Created template from snapshot: %s" % template.id) + + templates = list_templates( + self.apiclient, + templatefilter=\ + self.services["templates"]["templatefilter"], + id=template.id + ) + + self.assertEqual( + isinstance(templates, list), + True, + "List template call should return the newly created template" + ) + + self.assertEqual( + templates[0].isready, + True, + "The newly created template should be in ready state" + ) + + self.debug("Enabling maintenance mode for host %s" % second_host) + cmd = prepareHostForMaintenance.prepareHostForMaintenanceCmd() + cmd.id = second_host + self.apiclient.prepareHostForMaintenance(cmd) + self.debug("Maintenance mode enabled for host: %s" % second_host) + + self.debug("Waiting for SSVMs to come up") + wait_for_ssvms( + self.apiclient, + zoneid=self.zone.id, + podid=self.pod.id, + ) + + # Poll and check the status of VMs + timeout = self.services["timeout"] + while True: + vms = VirtualMachine.list( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + self.debug( + "VM state after enabling maintenance on first host: %s" % + vm.state) + if vm.state in ["Stopping", "Stopped", "Running", "Starting"]: + if vm.state == "Running": + break + else: + time.sleep(self.services["sleep"]) + timeout = timeout - 1 + else: + self.fail( + "VM migration from one-host-to-other failed while enabling maintenance" + ) + + for vm in vms: + self.debug( + "VM states after enabling maintenance mode on host: %s - %s" % + (first_host, vm.state)) + self.assertEqual( + vm.state, + "Running", + "Deployed VM should be in Running state" + ) + # Spawn an instance on other host + virtual_machine_3 = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + vms = VirtualMachine.list( + self.apiclient, + id=virtual_machine_3.id, + listall=True + ) + self.assertEqual( + isinstance(vms, list), + True, + "List VMs should return valid response for deployed VM" + ) + self.assertNotEqual( + len(vms), + 0, + "List VMs should return valid response for deployed VM" + ) + vm = vms[0] + + self.debug("Deployed VM on host: %s" % vm.hostid) + self.debug("VM 3 state: %s" % vm.state) + self.assertEqual( + vm.state, + "Running", + "Deployed VM should be in Running state" + ) + + # Should be able to SSH VM + try: + self.debug("SSH into VM: %s" % virtual_machine.id) + ssh = virtual_machine.get_ssh_client( + ipaddress=public_ip.ipaddress.ipaddress) + except Exception as e: + self.fail("SSH Access failed for %s: %s" % \ + (virtual_machine.ipaddress, e) + ) + + self.debug("Canceling host maintenance for ID: %s" % second_host) + cmd = cancelHostMaintenance.cancelHostMaintenanceCmd() + cmd.id = second_host + self.apiclient.cancelHostMaintenance(cmd) + self.debug("Maintenance mode canceled for host: %s" % second_host) + self.debug("Waiting for SSVMs to come up") + wait_for_ssvms( + self.apiclient, + zoneid=self.zone.id, + podid=self.pod.id, + ) + return diff --git a/tools/testClient/testcase/P1-tests/test_network_offering.py b/tools/testClient/testcase/P1-tests/test_network_offering.py new file mode 100644 index 00000000000..bea2f53f5b9 --- /dev/null +++ b/tools/testClient/testcase/P1-tests/test_network_offering.py @@ -0,0 +1,1782 @@ +# -*- encoding: utf-8 -*- +# Copyright 2012 Citrix Systems, Inc. Licensed under the +# Apache License, Version 2.0 (the "License"); you may not use this +# file except in compliance with the License. Citrix Systems, Inc. +# reserves all rights not expressly granted by the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Automatically generated by addcopyright.py at 04/03/2012 + +""" P1 tests for network offering +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from testcase.libs.utils import * +from testcase.libs.base import * +from testcase.libs.common import * +import remoteSSHClient +import datetime + + +class Services: + """Test network offering Services + """ + + def __init__(self): + self.services = { + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + # Random characters are appended for unique + # username + "password": "fr3sca", + }, + "service_offering": { + "name": "Tiny Instance", + "displaytext": "Tiny Instance", + "cpunumber": 1, + "cpuspeed": 100, # in MHz + "memory": 64, # In MBs + }, + "network_offering": { + "name": 'Network offering-VR services', + "displaytext": 'Network offering-VR services', + "guestiptype": 'Isolated', + "supportedservices": 'Dhcp,Dns,SourceNat,PortForwarding,Vpn,Firewall,Lb,UserData,StaticNat', + "traffictype": 'GUEST', + "availability": 'Optional', + "serviceProviderList": { + "Dhcp": 'VirtualRouter', + "Dns": 'VirtualRouter', + "SourceNat": 'VirtualRouter', + "PortForwarding": 'VirtualRouter', + "Vpn": 'VirtualRouter', + "Firewall": 'VirtualRouter', + "Lb": 'VirtualRouter', + "UserData": 'VirtualRouter', + "StaticNat": 'VirtualRouter', + }, + }, + "network_offering_netscaler": { + "name": 'Network offering-netscaler', + "displaytext": 'Network offering-netscaler', + "guestiptype": 'Isolated', + "supportedservices": 'Dhcp,Dns,SourceNat,PortForwarding,Vpn,Firewall,Lb,UserData,StaticNat', + "traffictype": 'GUEST', + "availability": 'Optional', + "serviceProviderList": { + "Dhcp": 'VirtualRouter', + "Dns": 'VirtualRouter', + "SourceNat": 'VirtualRouter', + "PortForwarding": 'VirtualRouter', + "Vpn": 'VirtualRouter', + "Firewall": 'VirtualRouter', + "Lb": 'Netscaler', + "UserData": 'VirtualRouter', + "StaticNat": 'VirtualRouter', + }, + }, + "network": { + "name": "Test Network", + "displaytext": "Test Network", + }, + "lbrule": { + "name": "SSH", + "alg": "leastconn", + # Algorithm used for load balancing + "privateport": 22, + "publicport": 2222, + "openfirewall": False, + }, + "lbrule_port_2221": { + "name": "SSH", + "alg": "leastconn", + # Algorithm used for load balancing + "privateport": 22, + "publicport": 2221, + "openfirewall": False, + }, + "natrule": { + "privateport": 22, + "publicport": 22, + "protocol": "TCP" + }, + "natrule_port_66": { + "privateport": 22, + "publicport": 66, + "protocol": "TCP" + }, + "fw_rule": { + "startport": 1, + "endport": 6000, + "cidr": '55.55.0.0/11', + # Any network (For creating FW rule) + }, + "virtual_machine": { + "displayname": "Test VM", + "username": "root", + "password": "password", + "ssh_port": 22, + "hypervisor": 'XenServer', + # Hypervisor type should be same as + # hypervisor type of cluster + "privateport": 22, + "publicport": 22, + "protocol": 'TCP', + }, + "ostypeid": '9958b10f-9e5d-4ef1-908d-a047372d823b', + # Cent OS 5.3 (64 bit) + "sleep": 60, + "timeout": 10, + "mode": 'advanced' + } + + +class TestNOVirtualRouter(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = super( + TestNOVirtualRouter, + cls + ).getClsTestClient().getApiClient() + cls.services = Services().services + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = cls.template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls._cleanup = [ + cls.service_offering, + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.account = Account.create( + self.apiclient, + self.services["account"], + admin=True, + domainid=self.domain.id + ) + self.cleanup = [] + return + + def tearDown(self): + try: + self.account.delete(self.apiclient) + interval = list_configurations( + self.apiclient, + name='account.cleanup.interval' + ) + # Sleep to ensure that all resources are deleted + time.sleep(int(interval[0].value) * 2) + #Clean up, terminate the created network offerings + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_network_off_without_conserve_mode(self): + """Test Network offering with Conserve mode off and VR - All services + """ + + # Validate the following + # 1. Create a Network from the above network offering and deploy a VM. + # 2. On source NAT ipaddress, we should NOT be allowed to add a + # LB rules + # 3. On source NAT ipaddress, we should be NOT be allowed to add + # PF rule + # 4. On an ipaddress that has PF rules, we should NOT be allowed to + # add a LB rules. + # 5. On an ipaddress that has Lb rules, we should NOT allow PF rules + # to be programmed. + # 6. We should be allowed to program multiple PF rules on the same Ip + # address on different public ports. + # 7. We should be allowed to program multiple LB rules on the same Ip + # address for different public port ranges. + # 8. On source NAT ipaddress, we should be allowed to Enable VPN. + # 9. On SOurce NAT ipaddress, we will be allowed to add firewall rule + + # Create a network offering with all virtual router services enabled + self.debug( + "Creating n/w offering with all services in VR & conserve mode:off" + ) + self.network_offering = NetworkOffering.create( + self.api_client, + self.services["network_offering"], + conservemode=False + ) + self.cleanup.append(self.network_offering) + + self.debug("Created n/w offering with ID: %s" % + self.network_offering.id) + # Enable Network offering + self.network_offering.update(self.apiclient, state='Enabled') + + # Creating network using the network offering created + self.debug("Creating network with network offering: %s" % + self.network_offering.id) + self.network = Network.create( + self.apiclient, + self.services["network"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + networkofferingid=self.network_offering.id, + zoneid=self.zone.id + ) + self.debug("Created network with ID: %s" % self.network.id) + + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Spawn an instance in that network + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + networkids=[str(self.network.id)] + ) + self.debug("Deployed VM in network: %s" % self.network.id) + src_nat_list = PublicIPAddress.list( + self.apiclient, + associatednetworkid=self.network.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True, + issourcenat=True, + ) + self.assertEqual( + isinstance(src_nat_list, list), + True, + "List Public IP should return a valid source NAT" + ) + self.assertNotEqual( + len(src_nat_list), + 0, + "Length of response from listPublicIp should not be 0" + ) + src_nat = src_nat_list[0] + self.debug("Trying to create LB rule on source NAT IP: %s" % + src_nat.ipaddress) + # Create Load Balancer rule with source NAT + with self.assertRaises(Exception): + LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=src_nat.id, + accountid=self.account.account.name + ) + self.debug( + "Trying to create a port forwarding rule in source NAT: %s" % + src_nat.ipaddress) + #Create NAT rule + with self.assertRaises(Exception): + NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=src_nat.id + ) + + self.debug("Associating public IP for network: %s" % self.network.id) + ip_with_nat_rule = PublicIPAddress.create( + self.apiclient, + accountid=self.account.account.name, + zoneid=self.zone.id, + domainid=self.account.account.domainid, + networkid=self.network.id + ) + + self.debug("Associated %s with network %s" % ( + ip_with_nat_rule.ipaddress.ipaddress, + self.network.id + )) + self.debug("Creating PF rule for IP address: %s" % + ip_with_nat_rule.ipaddress.ipaddress) + NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=ip_with_nat_rule.ipaddress.id + ) + + self.debug("Trying to create LB rule on IP with NAT: %s" % + ip_with_nat_rule.ipaddress.ipaddress) + + # Create Load Balancer rule on IP already having NAT rule + with self.assertRaises(Exception): + LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=ip_with_nat_rule.ipaddress.id, + accountid=self.account.account.name + ) + self.debug("Creating PF rule with public port: 66") + + nat_rule = NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule_port_66"], + ipaddressid=ip_with_nat_rule.ipaddress.id + ) + + # Check if NAT rule created successfully + nat_rules = NATRule.list( + self.apiclient, + id=nat_rule.id + ) + + self.assertEqual( + isinstance(nat_rules, list), + True, + "List NAT rules should return valid list" + ) + + self.debug("Associating public IP for network: %s" % self.network.id) + ip_with_lb_rule = PublicIPAddress.create( + self.apiclient, + accountid=self.account.account.name, + zoneid=self.zone.id, + domainid=self.account.account.domainid, + networkid=self.network.id + ) + self.debug("Associated %s with network %s" % ( + ip_with_lb_rule.ipaddress.ipaddress, + self.network.id + )) + self.debug("Creating LB rule for IP address: %s" % + ip_with_lb_rule.ipaddress.ipaddress) + + LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=ip_with_lb_rule.ipaddress.id, + accountid=self.account.account.name + ) + + self.debug("Trying to create PF rule on IP with LB rule: %s" % + ip_with_nat_rule.ipaddress.ipaddress) + + with self.assertRaises(Exception): + NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=ip_with_lb_rule.ipaddress.id + ) + + self.debug("Creating LB rule with public port: 2221") + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule_port_2221"], + ipaddressid=ip_with_lb_rule.ipaddress.id, + accountid=self.account.account.name + ) + + # Check if NAT rule created successfully + lb_rules = LoadBalancerRule.list( + self.apiclient, + id=lb_rule.id + ) + + self.assertEqual( + isinstance(lb_rules, list), + True, + "List LB rules should return valid list" + ) + + self.debug("Creating firewall rule on source NAT: %s" % + src_nat.ipaddress) + #Create Firewall rule on source NAT + fw_rule = FireWallRule.create( + self.apiclient, + ipaddressid=src_nat.id, + protocol='TCP', + cidrlist=[self.services["fw_rule"]["cidr"]], + startport=self.services["fw_rule"]["startport"], + endport=self.services["fw_rule"]["endport"] + ) + self.debug("Created firewall rule: %s" % fw_rule.id) + + fw_rules = FireWallRule.list( + self.apiclient, + id=fw_rule.id + ) + self.assertEqual( + isinstance(fw_rules, list), + True, + "List fw rules should return a valid firewall rules" + ) + + self.assertNotEqual( + len(fw_rules), + 0, + "Length of fw rules response should not be zero" + ) + return + + def test_02_network_off_with_conserve_mode(self): + """Test Network offering with Conserve mode ON and VR - All services + """ + + # Validate the following + # 1. Create a Network from the above network offering and deploy a VM. + # 2. On source NAT ipaddress, we should be allowed to add a LB rules + # 3. On source NAT ipaddress, we should be allowed to add a PF rules + # 4. On source NAT ipaddress, we should be allowed to add a Firewall + # rules + # 5. On an ipaddress that has Lb rules, we should be allowed to + # program PF rules. + # 6. We should be allowed to program multiple PF rules on the same Ip + # address on different public ports. + # 7. We should be allowed to program multiple LB rules on the same Ip + # address for different public port ranges. + # 8. On source NAT ipaddress, we should be allowed to Enable VPN + # access. + + # Create a network offering with all virtual router services enabled + self.debug( + "Creating n/w offering with all services in VR & conserve mode:off" + ) + self.network_offering = NetworkOffering.create( + self.api_client, + self.services["network_offering"], + conservemode=True + ) + self.cleanup.append(self.network_offering) + + self.debug("Created n/w offering with ID: %s" % + self.network_offering.id) + # Enable Network offering + self.network_offering.update(self.apiclient, state='Enabled') + + # Creating network using the network offering created + self.debug("Creating network with network offering: %s" % + self.network_offering.id) + self.network = Network.create( + self.apiclient, + self.services["network"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + networkofferingid=self.network_offering.id, + zoneid=self.zone.id + ) + self.debug("Created network with ID: %s" % self.network.id) + + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Spawn an instance in that network + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + networkids=[str(self.network.id)] + ) + self.debug("Deployed VM in network: %s" % self.network.id) + src_nat_list = PublicIPAddress.list( + self.apiclient, + associatednetworkid=self.network.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True, + issourcenat=True, + ) + self.assertEqual( + isinstance(src_nat_list, list), + True, + "List Public IP should return a valid source NAT" + ) + self.assertNotEqual( + len(src_nat_list), + 0, + "Length of response from listPublicIp should not be 0" + ) + + src_nat = src_nat_list[0] + + self.debug("Trying to create LB rule on source NAT IP: %s" % + src_nat.ipaddress) + # Create Load Balancer rule with source NAT + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=src_nat.id, + accountid=self.account.account.name + ) + self.debug("Created LB rule on source NAT: %s" % src_nat.ipaddress) + + lb_rules = LoadBalancerRule.list( + self.apiclient, + id=lb_rule.id + ) + self.assertEqual( + isinstance(lb_rules, list), + True, + "List lb rules should return a valid lb rules" + ) + self.assertNotEqual( + len(lb_rules), + 0, + "Length of response from listLbRules should not be 0" + ) + + self.debug( + "Trying to create a port forwarding rule in source NAT: %s" % + src_nat.ipaddress) + #Create NAT rule + nat_rule = NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=src_nat.id + ) + self.debug("Created PF rule on source NAT: %s" % src_nat.ipaddress) + + nat_rules = NATRule.list( + self.apiclient, + id=nat_rule.id + ) + self.assertEqual( + isinstance(nat_rules, list), + True, + "List NAT should return a valid port forwarding rules" + ) + self.assertNotEqual( + len(nat_rules), + 0, + "Length of response from listLbRules should not be 0" + ) + self.debug("Creating firewall rule on source NAT: %s" % + src_nat.ipaddress) + #Create Firewall rule on source NAT + fw_rule = FireWallRule.create( + self.apiclient, + ipaddressid=src_nat.id, + protocol='TCP', + cidrlist=[self.services["fw_rule"]["cidr"]], + startport=self.services["fw_rule"]["startport"], + endport=self.services["fw_rule"]["endport"] + ) + self.debug("Created firewall rule: %s" % fw_rule.id) + + fw_rules = FireWallRule.list( + self.apiclient, + id=fw_rule.id + ) + self.assertEqual( + isinstance(fw_rules, list), + True, + "List fw rules should return a valid firewall rules" + ) + + self.assertNotEqual( + len(fw_rules), + 0, + "Length of fw rules response should not be zero" + ) + + self.debug("Associating public IP for network: %s" % self.network.id) + public_ip = PublicIPAddress.create( + self.apiclient, + accountid=self.account.account.name, + zoneid=self.zone.id, + domainid=self.account.account.domainid, + networkid=self.network.id + ) + + self.debug("Associated %s with network %s" % ( + public_ip.ipaddress.ipaddress, + self.network.id + )) + self.debug("Creating PF rule for IP address: %s" % + public_ip.ipaddress.ipaddress) + NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=public_ip.ipaddress.id + ) + + self.debug("Trying to create LB rule on IP with NAT: %s" % + public_ip.ipaddress.ipaddress) + + # Create Load Balancer rule on IP already having NAT rule + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=public_ip.ipaddress.id, + accountid=self.account.account.name + ) + self.debug("Creating PF rule with public port: 66") + + nat_rule = NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule_port_66"], + ipaddressid=public_ip.ipaddress.id + ) + + # Check if NAT rule created successfully + nat_rules = NATRule.list( + self.apiclient, + id=nat_rule.id + ) + + self.assertEqual( + isinstance(nat_rules, list), + True, + "List NAT rules should return valid list" + ) + + self.debug("Creating LB rule with public port: 2221") + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule_port_2221"], + ipaddressid=public_ip.ipaddress.id, + accountid=self.account.account.name + ) + + # Check if NAT rule created successfully + lb_rules = LoadBalancerRule.list( + self.apiclient, + id=lb_rule.id + ) + + self.assertEqual( + isinstance(lb_rules, list), + True, + "List LB rules should return valid list" + ) + + # User should be able to enable VPN on source NAT + self.debug("Created VPN with source NAT IP: %s" % src_nat.ipaddress) + # Assign VPN to source NAT + vpn = Vpn.create( + self.apiclient, + src_nat.id, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + vpns = Vpn.list( + self.apiclient, + publicipid=src_nat.id, + listall=True, + ) + + self.assertEqual( + isinstance(vpns, list), + True, + "List VPNs should return a valid VPN list" + ) + + self.assertNotEqual( + len(vpns), + 0, + "Length of list VPN response should not be zero" + ) + return + + +class TestNOWithNetscaler(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = super( + TestNOWithNetscaler, + cls + ).getClsTestClient().getApiClient() + cls.services = Services().services + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = cls.template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls._cleanup = [ + cls.service_offering, + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.account = Account.create( + self.apiclient, + self.services["account"], + admin=True, + domainid=self.domain.id + ) + self.cleanup = [] + return + + def tearDown(self): + try: + self.account.delete(self.apiclient) + interval = list_configurations( + self.apiclient, + name='account.cleanup.interval' + ) + # Sleep to ensure that all resources are deleted + time.sleep(int(interval[0].value) * 2) + #Clean up, terminate the created network offerings + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_network_off_without_conserve_mode(self): + """Test Nw off with Conserve mode off, VR-All services, LB-netscaler + """ + + # Validate the following + # 1. Create a Network from the above network offering and deploy a VM. + # 2. On source NAT ipaddress, we should NOT be allowed to add LB rule + # 3. On source NAT ipaddress, we should NOT be allowed to add PF rule + # 4. On an ipaddress that has PF rules, we should NOT be allowed to + # add a LB rules. + # 5. On an ipaddress that has Lb rules , we should NOT allow firewall + # rules to be programmed. + # 6. On an ipaddress that has Lb rules , we should NOT allow PF rules + # to be programmed. + # 7. We should be allowed to program multiple PF rules on the same Ip + # address on different public ports. + # 8. We should be allowed to program multiple LB rules on the same Ip + # address for different public port ranges. + # 9. On source NAT ipaddress, we should NOT be allowed to Enable VPN. + + # Create a network offering with all virtual router services enabled + self.debug( + "Creating n/w offering with all services in VR & conserve mode:ON" + ) + self.network_offering = NetworkOffering.create( + self.api_client, + self.services["network_offering_netscaler"], + conservemode=False + ) + self.cleanup.append(self.network_offering) + + self.debug("Created n/w offering with ID: %s" % + self.network_offering.id) + # Enable Network offering + self.network_offering.update(self.apiclient, state='Enabled') + + # Creating network using the network offering created + self.debug("Creating network with network offering: %s" % + self.network_offering.id) + self.network = Network.create( + self.apiclient, + self.services["network"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + networkofferingid=self.network_offering.id, + zoneid=self.zone.id + ) + self.debug("Created network with ID: %s" % self.network.id) + + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Spawn an instance in that network + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + networkids=[str(self.network.id)] + ) + self.debug("Deployed VM in network: %s" % self.network.id) + src_nat_list = PublicIPAddress.list( + self.apiclient, + associatednetworkid=self.network.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True, + issourcenat=True, + ) + self.assertEqual( + isinstance(src_nat_list, list), + True, + "List Public IP should return a valid source NAT" + ) + self.assertNotEqual( + len(src_nat_list), + 0, + "Length of response from listPublicIp should not be 0" + ) + + src_nat = src_nat_list[0] + + self.debug("Trying to create LB rule on source NAT IP: %s" % + src_nat.ipaddress) + # Create Load Balancer rule with source NAT + with self.assertRaises(Exception): + LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=src_nat.id, + accountid=self.account.account.name + ) + + self.debug( + "Trying to create a port forwarding rule in source NAT: %s" % + src_nat.ipaddress) + #Create NAT rule + with self.assertRaises(Exception): + NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=src_nat.id + ) + self.debug("Creating firewall rule on source NAT: %s" % + src_nat.ipaddress) + #Create Firewall rule on source NAT + fw_rule = FireWallRule.create( + self.apiclient, + ipaddressid=src_nat.id, + protocol='TCP', + cidrlist=[self.services["fw_rule"]["cidr"]], + startport=self.services["fw_rule"]["startport"], + endport=self.services["fw_rule"]["endport"] + ) + + self.debug("Created firewall rule: %s" % fw_rule.id) + + fw_rules = FireWallRule.list( + self.apiclient, + id=fw_rule.id + ) + self.assertEqual( + isinstance(fw_rules, list), + True, + "List fw rules should return a valid firewall rules" + ) + + self.assertNotEqual( + len(fw_rules), + 0, + "Length of fw rules response should not be zero" + ) + + self.debug("Associating public IP for network: %s" % self.network.id) + ip_with_nat_rule = PublicIPAddress.create( + self.apiclient, + accountid=self.account.account.name, + zoneid=self.zone.id, + domainid=self.account.account.domainid, + networkid=self.network.id + ) + + self.debug("Associated %s with network %s" % ( + ip_with_nat_rule.ipaddress.ipaddress, + self.network.id + )) + self.debug("Creating PF rule for IP address: %s" % + ip_with_nat_rule.ipaddress.ipaddress) + NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=ip_with_nat_rule.ipaddress.id + ) + + self.debug("Trying to create LB rule on IP with NAT: %s" % + ip_with_nat_rule.ipaddress.ipaddress) + + # Create Load Balancer rule on IP already having NAT rule + with self.assertRaises(Exception): + LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=ip_with_nat_rule.ipaddress.id, + accountid=self.account.account.name + ) + self.debug("Creating PF rule with public port: 66") + + nat_rule = NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule_port_66"], + ipaddressid=ip_with_nat_rule.ipaddress.id + ) + + # Check if NAT rule created successfully + nat_rules = NATRule.list( + self.apiclient, + id=nat_rule.id + ) + + self.assertEqual( + isinstance(nat_rules, list), + True, + "List NAT rules should return valid list" + ) + + self.debug("Associating public IP for network: %s" % self.network.id) + ip_with_lb_rule = PublicIPAddress.create( + self.apiclient, + accountid=self.account.account.name, + zoneid=self.zone.id, + domainid=self.account.account.domainid, + networkid=self.network.id + ) + self.debug("Associated %s with network %s" % ( + ip_with_lb_rule.ipaddress.ipaddress, + self.network.id + )) + self.debug("Creating LB rule for IP address: %s" % + ip_with_lb_rule.ipaddress.ipaddress) + + LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=ip_with_lb_rule.ipaddress.id, + accountid=self.account.account.name, + networkid=self.network.id + ) + + self.debug("Trying to create PF rule on IP with LB rule: %s" % + ip_with_nat_rule.ipaddress.ipaddress) + + with self.assertRaises(Exception): + NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=ip_with_lb_rule.ipaddress.id + ) + + self.debug("Trying to create FW rule on IP with LB rule") + with self.assertRaises(Exception): + FireWallRule.create( + self.apiclient, + ipaddressid=src_nat.id, + protocol='TCP', + cidrlist=[self.services["fw_rule"]["cidr"]], + startport=self.services["fw_rule"]["startport"], + endport=self.services["fw_rule"]["endport"] + ) + + self.debug("Creating LB rule with public port: 2221") + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule_port_2221"], + ipaddressid=ip_with_lb_rule.ipaddress.id, + accountid=self.account.account.name, + networkid=self.network.id + ) + + # Check if NAT rule created successfully + lb_rules = LoadBalancerRule.list( + self.apiclient, + id=lb_rule.id + ) + + self.assertEqual( + isinstance(lb_rules, list), + True, + "List LB rules should return valid list" + ) + + # User should be able to enable VPN on source NAT + self.debug("Enabling VPN on source NAT IP: %s" % src_nat.ipaddress) + # Assign VPN to source NAT + with self.assertRaises(Exception): + Vpn.create( + self.apiclient, + src_nat.id, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + return + + def test_02_network_off_with_conserve_mode_netscaler(self): + """Test NW off with Conserve mode ON, LB-Netscaler and VR-All services + """ + + # Validate the following + # 1. Create a Network from the above network offering and deploy a VM. + # 2. On source NAT ipaddress, we should NOT be allowed to add LB rule + # 3. On source NAT ipaddress, we should be allowed to add PF rule and + # Fierwall rules. + # 4. On an ipaddress that has PF rules, we should NOT be allowed to + # add a LB rules. + # 5. On an ipaddress that has Lb rules , we should NOT allow firewall + # rules to be programmed. + # 6. On an ipaddress that has Lb rules , we should NOT allow PF rules + # to be programmed. + # 7. We should be allowed to program multiple PF rules on the same Ip + # address on different public ports. + # 8. We should be allowed to program multiple LB rules on the same Ip + # address for different public port ranges. + # 9. On source NAT ipaddress, we should be allowed to Enable VPN. + + # Create a network offering with all virtual router services enabled + self.debug( + "Creating n/w offering with all services in VR & conserve mode:ON" + ) + self.network_offering = NetworkOffering.create( + self.api_client, + self.services["network_offering_netscaler"], + conservemode=True + ) + self.cleanup.append(self.network_offering) + + self.debug("Created n/w offering with ID: %s" % + self.network_offering.id) + # Enable Network offering + self.network_offering.update(self.apiclient, state='Enabled') + + # Creating network using the network offering created + self.debug("Creating network with network offering: %s" % + self.network_offering.id) + self.network = Network.create( + self.apiclient, + self.services["network"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + networkofferingid=self.network_offering.id, + zoneid=self.zone.id + ) + self.debug("Created network with ID: %s" % self.network.id) + + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Spawn an instance in that network + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + networkids=[str(self.network.id)] + ) + self.debug("Deployed VM in network: %s" % self.network.id) + src_nat_list = PublicIPAddress.list( + self.apiclient, + associatednetworkid=self.network.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True, + issourcenat=True, + ) + self.assertEqual( + isinstance(src_nat_list, list), + True, + "List Public IP should return a valid source NAT" + ) + self.assertNotEqual( + len(src_nat_list), + 0, + "Length of response from listPublicIp should not be 0" + ) + + src_nat = src_nat_list[0] + + self.debug("Trying to create LB rule on source NAT IP: %s" % + src_nat.ipaddress) + # Create Load Balancer rule with source NAT + with self.assertRaises(Exception): + LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=src_nat.id, + accountid=self.account.account.name + ) + + self.debug( + "Trying to create a port forwarding rule in source NAT: %s" % + src_nat.ipaddress) + #Create NAT rule + nat_rule = NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=src_nat.id + ) + self.debug("Created PF rule on source NAT: %s" % src_nat.ipaddress) + + nat_rules = NATRule.list( + self.apiclient, + id=nat_rule.id + ) + self.assertEqual( + isinstance(nat_rules, list), + True, + "List NAT should return a valid port forwarding rules" + ) + self.assertNotEqual( + len(nat_rules), + 0, + "Length of response from listLbRules should not be 0" + ) + self.debug("Creating firewall rule on source NAT: %s" % + src_nat.ipaddress) + #Create Firewall rule on source NAT + fw_rule = FireWallRule.create( + self.apiclient, + ipaddressid=src_nat.id, + protocol='TCP', + cidrlist=[self.services["fw_rule"]["cidr"]], + startport=self.services["fw_rule"]["startport"], + endport=self.services["fw_rule"]["endport"] + ) + self.debug("Created firewall rule: %s" % fw_rule.id) + + fw_rules = FireWallRule.list( + self.apiclient, + id=fw_rule.id + ) + self.assertEqual( + isinstance(fw_rules, list), + True, + "List fw rules should return a valid firewall rules" + ) + + self.assertNotEqual( + len(fw_rules), + 0, + "Length of fw rules response should not be zero" + ) + self.debug("Associating public IP for network: %s" % self.network.id) + ip_with_nat_rule = PublicIPAddress.create( + self.apiclient, + accountid=self.account.account.name, + zoneid=self.zone.id, + domainid=self.account.account.domainid, + networkid=self.network.id + ) + + self.debug("Associated %s with network %s" % ( + ip_with_nat_rule.ipaddress.ipaddress, + self.network.id + )) + self.debug("Creating PF rule for IP address: %s" % + ip_with_nat_rule.ipaddress.ipaddress) + NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=ip_with_nat_rule.ipaddress.id + ) + + self.debug("Trying to create LB rule on IP with NAT: %s" % + ip_with_nat_rule.ipaddress.ipaddress) + + # Create Load Balancer rule on IP already having NAT rule + with self.assertRaises(Exception): + LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=ip_with_nat_rule.ipaddress.id, + accountid=self.account.account.name + ) + self.debug("Creating PF rule with public port: 66") + + nat_rule = NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule_port_66"], + ipaddressid=ip_with_nat_rule.ipaddress.id + ) + + # Check if NAT rule created successfully + nat_rules = NATRule.list( + self.apiclient, + id=nat_rule.id + ) + + self.assertEqual( + isinstance(nat_rules, list), + True, + "List NAT rules should return valid list" + ) + + self.debug("Associating public IP for network: %s" % self.network.id) + ip_with_lb_rule = PublicIPAddress.create( + self.apiclient, + accountid=self.account.account.name, + zoneid=self.zone.id, + domainid=self.account.account.domainid, + networkid=self.network.id + ) + self.debug("Associated %s with network %s" % ( + ip_with_lb_rule.ipaddress.ipaddress, + self.network.id + )) + self.debug("Creating LB rule for IP address: %s" % + ip_with_lb_rule.ipaddress.ipaddress) + + LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=ip_with_lb_rule.ipaddress.id, + accountid=self.account.account.name, + networkid=self.network.id + ) + + self.debug("Trying to create PF rule on IP with LB rule: %s" % + ip_with_nat_rule.ipaddress.ipaddress) + + with self.assertRaises(Exception): + NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=ip_with_lb_rule.ipaddress.id + ) + + self.debug("Trying to create FW rule on IP with LB rule") + with self.assertRaises(Exception): + FireWallRule.create( + self.apiclient, + ipaddressid=src_nat.id, + protocol='TCP', + cidrlist=[self.services["fw_rule"]["cidr"]], + startport=self.services["fw_rule"]["startport"], + endport=self.services["fw_rule"]["endport"] + ) + + self.debug("Creating LB rule with public port: 2221") + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule_port_2221"], + ipaddressid=ip_with_lb_rule.ipaddress.id, + accountid=self.account.account.name, + networkid=self.network.id + ) + + # Check if NAT rule created successfully + lb_rules = LoadBalancerRule.list( + self.apiclient, + id=lb_rule.id + ) + + self.assertEqual( + isinstance(lb_rules, list), + True, + "List LB rules should return valid list" + ) + + # User should be able to enable VPN on source NAT + self.debug("Created VPN with source NAT IP: %s" % src_nat.ipaddress) + # Assign VPN to source NAT + vpn = Vpn.create( + self.apiclient, + src_nat.id, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + vpns = Vpn.list( + self.apiclient, + publicipid=src_nat.id, + listall=True, + ) + + self.assertEqual( + isinstance(vpns, list), + True, + "List VPNs should return a valid VPN list" + ) + + self.assertNotEqual( + len(vpns), + 0, + "Length of list VNP response should not be zero" + ) + return + + +class TestNetworkUpgrade(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = super( + TestNetworkUpgrade, + cls + ).getClsTestClient().getApiClient() + cls.services = Services().services + # Get Zone, Domain and templates + cls.domain = get_domain(cls.api_client, cls.services) + cls.zone = get_zone(cls.api_client, cls.services) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = cls.template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.network_offering = NetworkOffering.create( + cls.api_client, + cls.services["network_offering"], + conservemode=True + ) + # Enable Network offering + cls.network_offering.update(cls.api_client, state='Enabled') + + cls._cleanup = [ + cls.service_offering, + cls.network_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.account = Account.create( + self.apiclient, + self.services["account"], + admin=True, + domainid=self.domain.id + ) + self.cleanup = [] + return + + def tearDown(self): + try: + self.account.delete(self.apiclient) + interval = list_configurations( + self.apiclient, + name='account.cleanup.interval' + ) + # Sleep to ensure that all resources are deleted + time.sleep(int(interval[0].value) * 2) + #Clean up, terminate the created network offerings + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_nwupgrade_netscaler_conserve_on(self): + """Test Nw upgrade to netscaler lb service and conserve mode ON + """ + + # Validate the following + # 1. Upgrade a network with VR and conserve mode ON TO + # A network that has Lb provided by "Netscaler" and all other + # services provided by "VR" and Conserve mode ON + # 2. Have PF and LB rules on the same ip address. Upgrade network + # should fail. + # 3. Have SourceNat,PF and VPN on the same IP address. Upgrade of + # network should succeed. + + # Creating network using the network offering created + self.debug("Creating network with network offering: %s" % + self.network_offering.id) + self.network = Network.create( + self.apiclient, + self.services["network"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + networkofferingid=self.network_offering.id, + zoneid=self.zone.id + ) + self.debug("Created network with ID: %s" % self.network.id) + + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Spawn an instance in that network + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + networkids=[str(self.network.id)] + ) + self.debug("Deployed VM in network: %s" % self.network.id) + src_nat_list = PublicIPAddress.list( + self.apiclient, + associatednetworkid=self.network.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True, + issourcenat=True, + ) + self.assertEqual( + isinstance(src_nat_list, list), + True, + "List Public IP should return a valid source NAT" + ) + self.assertNotEqual( + len(src_nat_list), + 0, + "Length of response from listPublicIp should not be 0" + ) + + src_nat = src_nat_list[0] + self.debug("Trying to create LB rule on source NAT IP: %s" % + src_nat.ipaddress) + # Create Load Balancer rule with source NAT + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=src_nat.id, + accountid=self.account.account.name + ) + self.debug("Created LB rule on source NAT: %s" % src_nat.ipaddress) + + lb_rules = LoadBalancerRule.list( + self.apiclient, + id=lb_rule.id + ) + self.assertEqual( + isinstance(lb_rules, list), + True, + "List lb rules should return a valid lb rules" + ) + self.assertNotEqual( + len(lb_rules), + 0, + "Length of response from listLbRules should not be 0" + ) + + self.debug( + "Trying to create a port forwarding rule in source NAT: %s" % + src_nat.ipaddress) + #Create NAT rule + nat_rule = NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=src_nat.id + ) + self.debug("Created PF rule on source NAT: %s" % src_nat.ipaddress) + + nat_rules = NATRule.list( + self.apiclient, + id=nat_rule.id + ) + self.assertEqual( + isinstance(nat_rules, list), + True, + "List NAT should return a valid port forwarding rules" + ) + self.assertNotEqual( + len(nat_rules), + 0, + "Length of response from listLbRules should not be 0" + ) + # Create a network offering with all virtual router services enabled + self.debug( + "Creating n/w offering with all services in VR & conserve mode:ON LB- Netscaler" + ) + ns_lb_offering = NetworkOffering.create( + self.api_client, + self.services["network_offering_netscaler"], + conservemode=True + ) + self.cleanup.append(ns_lb_offering) + ns_lb_offering.update(self.apiclient, state='Enabled') + #Stop all the VMs associated with network to update cidr + self.debug("Stopping the VM: %s" % virtual_machine.name) + virtual_machine.stop(self.apiclient) + + self.debug("Updating network offering for network: %s" % + self.network.id) + with self.assertRaises(Exception): + self.network.update( + self.apiclient, + networkofferingid=ns_lb_offering.id, + changecidr=True + ) + + self.debug("Network upgrade failed!") + self.debug("Deleting LB Rule: %s" % lb_rule.id) + lb_rule.delete(self.apiclient) + self.debug("LB rule deleted") + + # Assign VPN to source NAT + self.debug("Enabling VPN on source NAT") + vpn = Vpn.create( + self.apiclient, + src_nat.id, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + vpns = Vpn.list( + self.apiclient, + publicipid=src_nat.id, + listall=True, + ) + + self.assertEqual( + isinstance(vpns, list), + True, + "List VPNs should return a valid VPN list" + ) + + self.assertNotEqual( + len(vpns), + 0, + "Length of list VPN response should not be zero" + ) + self.debug("Upgrading the network: %s" % self.network.id) + self.network.update( + self.apiclient, + networkofferingid=ns_lb_offering.id, + changecidr=True + ) + networks = Network.list( + self.apiclient, + id=self.network.id, + listall=True + ) + self.assertEqual( + isinstance(networks, list), + True, + "List Networks should return a valid list for given network ID" + ) + self.assertNotEqual( + len(networks), + 0, + "Length of list networks should not be 0" + ) + network = networks[0] + self.assertEqual( + network.networkofferingid, + ns_lb_offering.id, + "Network offering ID should match with new offering ID" + ) + return + + def test_02_nwupgrade_netscaler_conserve_off(self): + """Test Nw upgrade to netscaler lb service and conserve mode OFF + """ + + # Validate the following + # 1. Upgrade a network with VR and conserve mode ON TO + # A network that has Lb provided by "Netscaler" and all other + # services provided by "VR" and Conserve mode OFF + # 2. Have PF and LB rules on the same ip address. Upgrade network + # should fail. + # 3. Have SourceNat,PF and VPN on the same IP address. Upgrade of + # network should fail. + + # Creating network using the network offering created + self.debug("Creating network with network offering: %s" % + self.network_offering.id) + self.network = Network.create( + self.apiclient, + self.services["network"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + networkofferingid=self.network_offering.id, + zoneid=self.zone.id + ) + self.debug("Created network with ID: %s" % self.network.id) + + self.debug("Deploying VM in account: %s" % self.account.account.name) + + # Spawn an instance in that network + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id, + networkids=[str(self.network.id)] + ) + self.debug("Deployed VM in network: %s" % self.network.id) + src_nat_list = PublicIPAddress.list( + self.apiclient, + associatednetworkid=self.network.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + listall=True, + issourcenat=True, + ) + self.assertEqual( + isinstance(src_nat_list, list), + True, + "List Public IP should return a valid source NAT" + ) + self.assertNotEqual( + len(src_nat_list), + 0, + "Length of response from listPublicIp should not be 0" + ) + + src_nat = src_nat_list[0] + self.debug("Trying to create LB rule on source NAT IP: %s" % + src_nat.ipaddress) + # Create Load Balancer rule with source NAT + lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + ipaddressid=src_nat.id, + accountid=self.account.account.name + ) + self.debug("Created LB rule on source NAT: %s" % src_nat.ipaddress) + + lb_rules = LoadBalancerRule.list( + self.apiclient, + id=lb_rule.id + ) + self.assertEqual( + isinstance(lb_rules, list), + True, + "List lb rules should return a valid lb rules" + ) + self.assertNotEqual( + len(lb_rules), + 0, + "Length of response from listLbRules should not be 0" + ) + + self.debug( + "Trying to create a port forwarding rule in source NAT: %s" % + src_nat.ipaddress) + #Create NAT rule + nat_rule = NATRule.create( + self.apiclient, + virtual_machine, + self.services["natrule"], + ipaddressid=src_nat.id + ) + self.debug("Created PF rule on source NAT: %s" % src_nat.ipaddress) + + nat_rules = NATRule.list( + self.apiclient, + id=nat_rule.id + ) + self.assertEqual( + isinstance(nat_rules, list), + True, + "List NAT should return a valid port forwarding rules" + ) + self.assertNotEqual( + len(nat_rules), + 0, + "Length of response from listLbRules should not be 0" + ) + # Create a network offering with all virtual router services enabled + self.debug( + "Creating n/w offering with all services in VR & conserve mode:ON LB- Netscaler" + ) + ns_lb_offering = NetworkOffering.create( + self.api_client, + self.services["network_offering_netscaler"], + conservemode=False + ) + self.cleanup.append(ns_lb_offering) + ns_lb_offering.update(self.apiclient, state='Enabled') + #Stop all the VMs associated with network to update cidr + self.debug("Stopping the VM: %s" % virtual_machine.name) + virtual_machine.stop(self.apiclient) + + self.debug("Updating network offering for network: %s" % + self.network.id) + with self.assertRaises(Exception): + self.network.update( + self.apiclient, + networkofferingid=ns_lb_offering.id, + changecidr=True + ) + + self.debug("Network upgrade failed!") + self.debug("Deleting LB Rule: %s" % lb_rule.id) + lb_rule.delete(self.apiclient) + self.debug("LB rule deleted") + + # Assign VPN to source NAT + self.debug("Enabling VPN on source NAT") + vpn = Vpn.create( + self.apiclient, + src_nat.id, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + vpns = Vpn.list( + self.apiclient, + publicipid=src_nat.id, + listall=True, + ) + + self.assertEqual( + isinstance(vpns, list), + True, + "List VPNs should return a valid VPN list" + ) + + self.assertNotEqual( + len(vpns), + 0, + "Length of list VPN response should not be zero" + ) + self.debug("Upgrading the network: %s" % self.network.id) + with self.assertRaises(Exception): + self.network.update( + self.apiclient, + networkofferingid=ns_lb_offering.id, + changecidr=True + ) + return diff --git a/tools/testClient/testcase/P1-tests/test_project_configs.py b/tools/testClient/testcase/P1-tests/test_project_configs.py index e7abab519bd..eb3a36c4260 100644 --- a/tools/testClient/testcase/P1-tests/test_project_configs.py +++ b/tools/testClient/testcase/P1-tests/test_project_configs.py @@ -9,7 +9,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# +# # Automatically generated by addcopyright.py at 04/03/2012 """ P1 tests for Project """ @@ -40,7 +40,7 @@ class Services: "ipaddress": '192.168.100.21', "username": 'root', "password": 'fr3sca', - "port": 22, + "port": 22, }, "account": { "email": "administrator@clogeny.com", @@ -52,7 +52,7 @@ class Services: "password": "fr3sca", }, "user": { - "email": "mayur.dhande@clogeny.com", + "email": "administrator@clogeny.com", "firstname": "User", "lastname": "User", "username": "User", @@ -118,7 +118,7 @@ class TestUserProjectCreation(cloudstackTestCase): cls.services = Services().services # Get Zone cls.zone = get_zone(cls.api_client, cls.services) - + # Create domains, account etc. cls.domain = Domain.create( cls.api_client, @@ -131,14 +131,14 @@ class TestUserProjectCreation(cloudstackTestCase): admin=True, domainid=cls.domain.id ) - + cls.user = Account.create( cls.api_client, cls.services["account"], admin=True, domainid=cls.domain.id ) - + cls._cleanup = [cls.account, cls.user, cls.domain] return @@ -174,7 +174,7 @@ class TestUserProjectCreation(cloudstackTestCase): # 1. Check if 'allow.user.create.projects' configuration is true # 2. Create a Project as domain admin # 3. Create a Project as domain user - # 4. In both 2 and 3 project creation should be successful + # 4. In both 2 and 3 project creation should be successful configs = Configurations.list( self.apiclient, diff --git a/tools/testClient/testcase/P1-tests/test_project_limits.py b/tools/testClient/testcase/P1-tests/test_project_limits.py index dbc677b2417..cfb4a247e2e 100644 --- a/tools/testClient/testcase/P1-tests/test_project_limits.py +++ b/tools/testClient/testcase/P1-tests/test_project_limits.py @@ -447,7 +447,7 @@ class TestResourceLimitsProject(cloudstackTestCase): @classmethod def setUpClass(cls): - cls.api_client = super(TestResourceLimitsDomain, cls).getClsTestClient().getApiClient() + cls.api_client = super(TestResourceLimitsProject, cls).getClsTestClient().getApiClient() cls.services = Services().services # Get Zone, Domain and templates cls.zone = get_zone(cls.api_client, cls.services) diff --git a/tools/testClient/testcase/P1-tests/test_projects.py b/tools/testClient/testcase/P1-tests/test_projects.py index 55fa2f95ec0..aee9070e583 100644 --- a/tools/testClient/testcase/P1-tests/test_projects.py +++ b/tools/testClient/testcase/P1-tests/test_projects.py @@ -87,7 +87,7 @@ class Services: "publicport": 22, "protocol": 'TCP', }, - "ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9', + "ostypeid": '8531d1df-faac-4895-a741-238d3b10e6e6', # Cent OS 5.3 (64 bit) "sleep": 60, "timeout": 10, @@ -164,6 +164,23 @@ class TestMultipleProjectCreation(cloudstackTestCase): # 2. add one account to multiple project. Verify at step 2 an account # is allowed to added to multiple project + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + # Create project as a domain admin project_1 = Project.create( self.apiclient, @@ -181,7 +198,6 @@ class TestMultipleProjectCreation(cloudstackTestCase): id=project_1.id, listall=True ) - self.assertEqual( isinstance(list_projects_reponse, list), True, @@ -369,6 +385,23 @@ class TestCrossDomainAccountAdd(cloudstackTestCase): # 2. Add different domain account to the project. Add account should # fail + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + # Create project as a domain admin project = Project.create( self.apiclient, @@ -479,6 +512,23 @@ class TestDeleteAccountWithProject(cloudstackTestCase): # 2. Delete account who is owner of the project. Delete account should # fail + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + # Create project as a domain admin project = Project.create( self.apiclient, @@ -582,6 +632,23 @@ class TestDeleteDomainWithProject(cloudstackTestCase): # 2. Delete domain forcefully. Verify that project is also deleted as # as part of domain cleanup + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + # Create project as a domain admin project = Project.create( self.apiclient, @@ -718,6 +785,23 @@ class TestProjectOwners(cloudstackTestCase): # owner. verify new user is project owner and old account is # regular user of the project. + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + # Create project as a domain admin project = Project.create( self.apiclient, @@ -859,6 +943,23 @@ class TestProjectOwners(cloudstackTestCase): # owner. # 3. Update project to add another account as an owner + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + # Create project as a domain admin project = Project.create( self.apiclient, @@ -1140,6 +1241,23 @@ class TestProjectResources(cloudstackTestCase): # 3. Delete the account. Verify resources are still there after # account deletion. + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + # Create project as a domain admin project = Project.create( self.apiclient, @@ -1256,6 +1374,23 @@ class TestProjectResources(cloudstackTestCase): # account deletion. # 4. Verify all accounts are unassigned from project. + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + # Create project as a domain admin project = Project.create( self.apiclient, @@ -1460,6 +1595,23 @@ class TestProjectSuspendActivate(cloudstackTestCase): # 3. Delete the account. Verify resources are still there after # account deletion. + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + self.debug("Adding %s user to project: %s" % ( self.user.account.name, self.project.name @@ -1594,6 +1746,23 @@ class TestProjectSuspendActivate(cloudstackTestCase): # 1. Activate the project # 2. Verify project is activated and we are able to add resources + # Verify 'project.invite.required' is set to false + configs = Configurations.list( + self.apiclient, + name='project.invite.required' + ) + self.assertEqual( + isinstance(configs, list), + True, + "Check for a valid list configurations response" + ) + config = configs[0] + self.assertEqual( + (config.value).lower(), + 'false', + "'project.invite.required' should be set to false" + ) + # Activating the project self.debug("Activating project: %s" % self.project.name) self.project.activate(self.apiclient) diff --git a/tools/testClient/testcase/libs/base.py b/tools/testClient/testcase/libs/base.py index 854f06c2ce6..638ff8cd577 100644 --- a/tools/testClient/testcase/libs/base.py +++ b/tools/testClient/testcase/libs/base.py @@ -878,7 +878,7 @@ class ServiceOffering: self.__dict__.update(items) @classmethod - def create(cls, apiclient, services, domainid=None): + def create(cls, apiclient, services, domainid=None, **kwargs): """Create Service offering""" cmd = createServiceOffering.createServiceOfferingCmd() cmd.cpunumber = services["cpunumber"] @@ -891,6 +891,7 @@ class ServiceOffering: if domainid: cmd.domainid = domainid + [setattr(cmd, k, v) for k, v in kwargs.items()] return ServiceOffering(apiclient.createServiceOffering(cmd).__dict__) def delete(self, apiclient): @@ -953,12 +954,12 @@ class NetworkOffering: self.__dict__.update(items) @classmethod - def create(cls, apiclient, services, serviceProviderList=None, **kwargs): + def create(cls, apiclient, services, **kwargs): """Create network offering""" cmd = createNetworkOffering.createNetworkOfferingCmd() - cmd.displaytext = services["displaytext"] - cmd.name = services["name"] + cmd.displaytext = "-".join([services["displaytext"], random_gen()]) + cmd.name = "-".join([services["name"], random_gen()]) cmd.guestiptype = services["guestiptype"] cmd.supportedservices = services["supportedservices"] cmd.traffictype = services["traffictype"] @@ -1039,7 +1040,7 @@ class LoadBalancerRule: @classmethod def create(cls, apiclient, services, ipaddressid, accountid=None, - projectid=None): + networkid=None, projectid=None): """Create Load balancing Rule""" cmd = createLoadBalancerRule.createLoadBalancerRuleCmd() @@ -1055,9 +1056,14 @@ class LoadBalancerRule: cmd.privateport = services["privateport"] cmd.publicport = services["publicport"] + if "openfirewall" in services: + cmd.openfirewall = services["openfirewall"] + if projectid: cmd.projectid = projectid - + + if networkid: + cmd.networkid = networkid return LoadBalancerRule(apiclient.createLoadBalancerRule(cmd).__dict__) def delete(self, apiclient): @@ -1322,6 +1328,14 @@ class Network: cmd.id = self.id apiclient.deleteNetwork(cmd) + def update(self, apiclient, **kwargs): + """Updates network with parameters passed""" + + cmd = updateNetwork.updateNetworkCmd() + cmd.id = self.id + [setattr(cmd, k, v) for k, v in kwargs.items()] + return(apiclient.updateNetwork(cmd)) + @classmethod def list(cls, apiclient, **kwargs): """List all Networks matching criteria""" @@ -1358,6 +1372,14 @@ class Vpn: cmd.publicipid = self.publicipid apiclient.deleteRemoteAccessVpn(cmd) + @classmethod + def list(cls, apiclient, **kwargs): + """List all VPN matching criteria""" + + cmd = listRemoteAccessVpns.listRemoteAccessVpnsCmd() + [setattr(cmd, k, v) for k, v in kwargs.items()] + return(apiclient.listRemoteAccessVpns(cmd)) + class VpnUser: """Manage VPN user""" @@ -1390,6 +1412,14 @@ class VpnUser: cmd.domainid = self.domainid apiclient.removeVpnUser(cmd) + @classmethod + def list(cls, apiclient, **kwargs): + """List all VPN Users matching criteria""" + + cmd = listVpnUsers.listVpnUsersCmd() + [setattr(cmd, k, v) for k, v in kwargs.items()] + return(apiclient.listVpnUsers(cmd)) + class Zone: """Manage Zone""" @@ -1640,7 +1670,48 @@ class SecurityGroup: cmd=revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() cmd.id=id return apiclient.revokeSecurityGroupIngress(cmd) + + def authorizeEgress(self, apiclient, services, account=None, domainid=None, + projectid=None, user_secgrp_list = {}): + """Authorize Egress Rule""" + + cmd=authorizeSecurityGroupEgress.authorizeSecurityGroupEgressCmd() + + if domainid: + cmd.domainid = domainid + if account: + cmd.account = account + + if projectid: + cmd.projectid = projectid + cmd.securitygroupid=self.id + cmd.protocol=services["protocol"] + + if services["protocol"] == 'ICMP': + cmd.icmptype = -1 + cmd.icmpcode = -1 + else: + cmd.startport = services["startport"] + cmd.endport = services["endport"] + + cmd.cidrlist = services["cidrlist"] + + cmd.usersecuritygrouplist = [] + for account, group in user_secgrp_list.items(): + cmd.usersecuritygrouplist.append({ + 'account' : account, + 'group': group + }) + + return (apiclient.authorizeSecurityGroupEgress(cmd).__dict__) + def revokeEgress(self, apiclient, id): + """Revoke Egress rule""" + + cmd=revokeSecurityGroupEgress.revokeSecurityGroupEgressCmd() + cmd.id=id + return apiclient.revokeSecurityGroupEgress(cmd) + @classmethod def list(cls, apiclient, **kwargs): """Lists all security groups.""" @@ -1787,4 +1858,4 @@ class Configurations: cmd = listConfigurations.listConfigurationsCmd() [setattr(cmd, k, v) for k, v in kwargs.items()] - return(apiclient.listConfigurations(cmd)) \ No newline at end of file + return(apiclient.listConfigurations(cmd))