diff --git a/tools/testClient/testcase/P1-tests/__init__.py b/tools/testClient/testcase/P1-tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tools/testClient/testcase/P1-tests/test_accounts.py b/tools/testClient/testcase/P1-tests/test_accounts.py new file mode 100644 index 00000000000..2657a8e2252 --- /dev/null +++ b/tools/testClient/testcase/P1-tests/test_accounts.py @@ -0,0 +1,1139 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" P1 tests for Account +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from testcase.libs.utils import * +from testcase.libs.base import * +from testcase.libs.common import * +import remoteSSHClient +import datetime + + +class Services: + """Test Account Services + """ + + def __init__(self): + self.services = { + "domain": { + "name": "Domain", + }, + "zone": { + "dns1": '121.242.190.180', + "internaldns1": '192.168.100.1', + "name" : "Test Zone", + "networktype" : "Basic", + "dns2": '121.242.190.211', + }, + "pod": { + "name": "Test Pod", + "gateway": '192.168.100.1', + "netmask": '255.255.255.0', + "startip": '192.168.100.136', + "endip": '192.168.100.141', + }, + "public_ip": { + "gateway": '192.168.100.1', + "netmask": '255.255.255.0', + "forvirtualnetwork": False, + "startip": '192.168.100.136', + "endip": '192.168.100.141', + "vlan": "untagged", + }, + "cluster": { + "clustername": "Xen Cluster", + "clustertype": "CloudManaged", + # CloudManaged or ExternalManaged" + "hypervisor": "XenServer", + # Hypervisor type + }, + "host": { + "hypervisor": 'XenServer', + # Hypervisor type + "clustertype": 'CloudManaged', + # CloudManaged or ExternalManaged" + "url": 'http://192.168.100.211', + "username": "root", + "password": "fr3sca", + "port": 22, + "ipaddress": '192.168.100.211' + }, + + "primary_storage": { + "name": "Test Primary", + "url": "nfs://192.168.100.131/Primary3", + # Format: File_System_Type/Location/Path + }, + "sec_storage": { + "url": "nfs://192.168.100.131/SecStorage" + # Format: File_System_Type/Location/Path + + + }, + "mgmt_server": { + "ipaddress": '192.168.100.154', + "port": 22, + "username": 'root', + "password": 'fr3sca', + }, + "sysVM": { + "mnt_dir": '/mnt/test', + "sec_storage": '192.168.100.131', + "path": 'TestSec', + "command": '/usr/lib64/cloud/agent/scripts/storage/secondary/cloud-install-sys-tmplt', + "download_url": 'http://download.cloud.com/releases/2.2.0/systemvm.vhd.bz2', + "hypervisor": "xenserver", + }, + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + # Random characters are appended for unique + # username + "password": "fr3sca", + }, + "user": { + "email": "user@test.com", + "firstname": "User", + "lastname": "User", + "username": "User", + # Random characters are appended for unique + # username + "password": "fr3sca", + }, + "service_offering": { + "name": "Tiny Instance", + "displaytext": "Tiny Instance", + "cpunumber": 1, + "cpuspeed": 100, # in MHz + "memory": 64, # In MBs + }, + "virtual_machine": { + "displayname": "Test VM", + "username": "root", + "password": "password", + "ssh_port": 22, + "hypervisor": 'XenServer', + # Hypervisor type should be same as + # hypervisor type of cluster + "domainid": 1, + "privateport": 22, + "publicport": 22, + "protocol": 'TCP', + }, + "template": { + "displaytext": "Public Template", + "name": "Public template", + "ostypeid": 126, + "url": "http://download.cloud.com/releases/2.0.0/UbuntuServer-10-04-64bit.vhd.bz2", + "hypervisor": 'XenServer', + "format" : 'VHD', + "isfeatured": True, + "ispublic": True, + "isextractable": True, + }, + "domainid": 1, + "ostypeid": 12, + # Cent OS 5.3 (64 bit) + "zoneid": 1, + # Optional, if specified the mentioned zone will be + # used for tests + "sleep": 60, + "mode":'advanced' + } + + +class TestAccounts(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client, cls.services) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = cls.template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls._cleanup = [cls.service_offering] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created accounts, domains etc + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_create_account(self): + """Test Create Account and user for that account + """ + + # Validate the following + # 1. Create an Account. Verify the account is created. + # 2. Create User associated with that account. Verify the created user + + # Create an account + account = Account.create( + self.apiclient, + self.services["account"] + ) + self.cleanup.append(account) + list_accounts_response = list_accounts( + self.apiclient, + id=account.account.id + ) + self.assertNotEqual( + len(list_accounts_response), + 0, + "Check List Account response" + ) + + account_response = list_accounts_response[0] + self.assertEqual( + account.account.accounttype, + account_response.accounttype, + "Check Account Type of Created account" + ) + self.assertEqual( + account.account.name, + account_response.name, + "Check Account Name of Created account" + ) + # Create an User associated with account + user = User.create( + self.apiclient, + self.services["user"], + account=account.account.name, + domainid=account.account.domainid + ) + + list_users_response = list_users( + self.apiclient, + id=user.id + ) + + self.assertNotEqual( + len(list_users_response), + 0, + "Check List User response" + ) + + user_response = list_users_response[0] + self.assertEqual( + user.username, + user_response.username, + "Check username of Created user" + ) + self.assertEqual( + user.state, + user_response.state, + "Check state of created user" + ) + return + + +class TestRemoveUserFromAccount(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client, cls.services) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = cls.template.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + # Create an account + cls.account = Account.create( + cls.api_client, + cls.services["account"] + ) + + cls._cleanup = [ + cls.service_offering, + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created instance, users etc + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_user_remove_VM_running(self): + """Test Remove one user from the account + """ + + # Validate the following + # 1. Create an account with 2 users. + # 2. Start 2 VMs; one for each user of the account + # 3. Remove one user from the account. Verify that account still exists. + # 4. Verify that VM started by the removed user are still running + + # Create an User associated with account and VMs + user_1 = User.create( + self.apiclient, + self.services["user"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + user_2 = User.create( + self.apiclient, + self.services["user"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + self.cleanup.append(user_2) + + vm_1 = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(vm_1) + + vm_2 = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(vm_2) + + # Remove one of the user + user_1.delete(self.apiclient) + + # Account should exist after deleting user + accounts_response = list_accounts( + self.apiclient, + id=self.account.account.id + ) + self.assertNotEqual( + len(accounts_response), + 0, + "Check List Account response" + ) + vm_response = list_virtual_machines( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertNotEqual( + len(vm_response), + 0, + "Check List VM response" + ) + + # VMs associated with that account should be running + for vm in vm_response: + self.assertEqual( + vm.state, + 'Running', + "Check state of VMs associated with account" + ) + return + + def test_02_remove_all_users(self): + """Test Remove both users from the account + """ + + # Validate the following + # 1. Remove both the users from the account. + # 2. Verify account is removed + # 3. Verify all VMs associated with that account got removed + + # Create an User associated with account and VMs + user_1 = User.create( + self.apiclient, + self.services["user"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + user_2 = User.create( + self.apiclient, + self.services["user"], + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + vm_1 = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) + + vm_2 = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) + + # Get users associated with an account + # (Total 3: 2 - Created & 1 default generated while account creation) + users = list_users( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + for user in users: + cmd = deleteUser.deleteUserCmd() + cmd.id = user.id + self.apiclient.deleteUser(cmd) + + interval = list_configurations( + self.apiclient, + name='account.cleanup.interval' + ) + # Sleep to ensure that all resources are deleted + time.sleep(int(interval[0].value)) + + # Account is removed after last user is deleted + account_response = list_accounts( + self.apiclient, + id=self.account.account.id + ) + self.assertEqual( + account_response, + None, + "Check List VM response" + ) + # All VMs associated with account are removed. + vm_response = list_virtual_machines( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + + self.assertEqual( + vm_response, + None, + "Check List VM response" + ) + # DomR associated with account is deleted + with self.assertRaises(Exception): + list_routers( + self.apiclient, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + return + + +class TestNonRootAdminsPrivileges(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + # Get Zone settings + cls.zone = get_zone(cls.api_client, cls.services) + + # Create an account, domain etc + cls.domain = Domain.create( + cls.api_client, + cls.services["domain"], + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.domain.id + ) + cls._cleanup = [ + cls.account, + cls.domain + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created accounts + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_non_root_admin_Privileges(self): + """Test to verify Non Root admin previleges""" + + # Validate the following + # 1. Create few accounts/users in ROOT domain + # 2. Verify listAccounts API gives only accounts associated with new + # domain. + + # Create accounts for ROOT domain + account_1 = Account.create( + self.apiclient, + self.services["account"] + ) + self.cleanup.append(account_1) + account_2 = Account.create( + self.apiclient, + self.services["account"] + ) + self.cleanup.append(account_2) + + accounts_response = list_accounts( + self.apiclient, + domainid=self.domain.id + ) + + self.assertEqual( + len(accounts_response), + 1, + "Check List accounts response" + ) + # Verify only account associated with domain is listed + for account in accounts_response: + self.assertEqual( + account.domainid, + self.domain.id, + "Check domain ID of account" + ) + return + + +class TestServiceOfferingSiblings(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + + # Create Domains, accounts etc + cls.domain_1 = Domain.create( + cls.api_client, + cls.services["domain"] + ) + cls.domain_2 = Domain.create( + cls.api_client, + cls.services["domain"] + ) + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"], + domainid=cls.domain_1.id + ) + # Create account for doamin_1 + cls.account_1 = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.domain_1.id + ) + + # Create an account for domain_2 + cls.account_2 = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.domain_2.id + ) + + cls._cleanup = [ + cls.account_1, + cls.account_2, + cls.service_offering, + cls.domain_1, + cls.domain_2, + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created domains, accounts + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_service_offering_siblings(self): + """Test to verify service offerings at same level in hierarchy""" + + # Validate the following + # 1. Verify service offering is visible for domain_1 + # 2. Verify service offering is not visible for domain_2 + + service_offerings = list_service_offering( + self.apiclient, + domainid=self.domain_1.id + ) + self.assertNotEqual( + len(service_offerings), + 0, + "Check List Service Offerings response" + ) + + for service_offering in service_offerings: + self.assertEqual( + service_offering.id, + self.service_offering.id, + "Check Service offering ID for domain" + str(self.domain_1.name) + ) + # Verify private service offering is not visible to other domain + service_offerings = list_service_offering( + self.apiclient, + domainid=self.domain_2.id + ) + self.assertEqual( + service_offerings, + None, + "Check List Service Offerings response for other domain" + ) + return + + +# TODO: Testing +class TestServiceOfferingHierarchy(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + + # Create domain, service offerings etc + cls.domain_1 = Domain.create( + cls.api_client, + cls.services["domain"] + ) + cls.domain_2 = Domain.create( + cls.api_client, + cls.services["domain"], + parentdomainid=cls.domain_1.id + ) + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"], + domainid=cls.domain_1.id + ) + # Create account for doamin_1 + cls.account_1 = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.domain_1.id + ) + + # Create an account for domain_2 + cls.account_2 = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.domain_2.id + ) + + cls._cleanup = [ + cls.account_1, + cls.account_2, + cls.service_offering, + cls.domain_1, + cls.domain_2, + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created instance, volumes and snapshots + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_service_offering_hierarchy(self): + """Test to verify service offerings at same level in hierarchy""" + + # Validate the following + # 1. Verify service offering is visible for domain_1 + # 2. Verify service offering is also visible for domain_2 + + service_offerings = list_service_offering( + self.apiclient, + domainid=self.domain_1.id + ) + self.assertNotEqual( + len(service_offerings), + 0, + "Check List Service Offerings response" + ) + + for service_offering in service_offerings: + self.assertEqual( + service_offering.id, + self.service_offering.id, + "Check Service offering ID for domain" + str(self.domain_1.name) + ) + + # Verify private service offering is not visible to other domain + service_offerings = list_service_offering( + self.apiclient, + domainid=self.domain_2.id + ) + self.assertNotEqual( + len(service_offerings), + 0, + "Check List Service Offerings response" + ) + + for service_offering in service_offerings: + self.assertEqual( + service_offering.id, + self.service_offering.id, + "Check Service offering ID for domain" + str(self.domain_2.name) + ) + return + +# TODO: Testing + +class TesttemplateHierarchy(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + # Get Zone settings + cls.zone = get_zone(cls.api_client, cls.services) + cls.services["template"]["zoneid"] = cls.zone.id + + # Create domains, accounts and template + cls.domain_1 = Domain.create( + cls.api_client, + cls.services["domain"] + ) + cls.domain_2 = Domain.create( + cls.api_client, + cls.services["domain"], + parentdomainid=cls.domain_1.id + ) + + # Create account for doamin_1 + cls.account_1 = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.domain_1.id + ) + + # Create an account for domain_2 + cls.account_2 = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.domain_2.id + ) + + cls.template = Template.register( + cls.api_client, + cls.services["template"], + account=cls.account_1.account.name, + domainid=cls.domain_1.id + ) + cls._cleanup = [ + cls.template, + cls.account_1, + cls.account_2, + cls.domain_1, + cls.domain_2, + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created instance, volumes and snapshots + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_template_hierarchy(self): + """Test to verify template at same level in hierarchy""" + + # Validate the following + # 1. Verify template is visible for domain_1 + # 2. Verify template is also visible for domain_2 + + # Sleep to ensure that template state is reflected across + time.sleep(self.services["sleep"]) + + templates = list_templates( + self.apiclient, + templatefilter='self', + account=self.account_1.account.name, + domainid=self.domain_1.id + ) + self.assertNotEqual( + len(templates), + 0, + "Check List Template response" + ) + + for template in templates: + self.assertEqual( + template.id, + self.template.id, + "Check Template ID for domain" + str(self.domain_1.name) + ) + + # Verify private service offering is not visible to other domain + templates = list_templates( + self.apiclient, + templatefilter='self', + account=self.account_2.account.name, + domainid=self.domain_2.id + ) + self.assertNotEqual( + len(templates), + 0, + "Check List Service Offerings response" + ) + + for template in templates: + self.assertEqual( + template.id, + self.template.id, + "Check Template ID for domain" + str(self.domain_2.name) + ) + return + +# TODO: Testing +class TestAddVmToSubDomain(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + + # Setup working Environment- Create domain, zone, pod cluster etc. + cls.domain = Domain.create( + cls.api_client, + cls.services["domain"] + ) + cls.zone = Zone.create( + cls.api_client, + cls.services["zone"], + domainid=cls.domain.id + ) + cls.services["pod"]["zoneid"] = cls.zone.id + + cls.pod = Pod.create( + cls.api_client, + cls.services["pod"] + ) + cls.services["public_ip"]["zoneid"] = cls.zone.id + cls.services["public_ip"]["podid"] = cls.pod.id + + cls.public_ip_range = PublicIp.create( + cls.api_client, + cls.services["public_ip"] + ) + cls.services["cluster"]["zoneid"] = cls.zone.id + cls.services["cluster"]["podid"] = cls.pod.id + + cls.cluster = Cluster.create( + cls.api_client, + cls.services["cluster"] + ) + + cls.services["host"]["zoneid"] = cls.zone.id + cls.services["host"]["podid"] = cls.pod.id + + cls.host = Host.create( + cls.api_client, + cls.cluster, + cls.services["host"] + ) + + cls.services["primary_storage"]["zoneid"] = cls.zone.id + cls.services["primary_storage"]["podid"] = cls.pod.id + + cls.primary_storage = StoragePool.create( + cls.api_client, + cls.services["primary_storage"], + cls.cluster.id + ) + + # before adding Sec Storage, First download System Templates on it + download_systemplates_sec_storage( + cls.services["mgmt_server"], + cls.services["sysVM"] + ) + + cls.services["sec_storage"]["zoneid"] = cls.zone.id + cls.services["sec_storage"]["podid"] = cls.pod.id + + cls.secondary_storage = SecondaryStorage.create( + cls.api_client, + cls.services["sec_storage"] + ) + # After adding Host, Clusters wait for SSVMs to come up + wait_for_ssvms( + cls.api_client, + cls.zone.id, + cls.pod.id + ) + + ssvm_response = list_ssvms( + cls.api_client, + systemvmtype='secondarystoragevm', + hostid=cls.host.id + ) + ssvm = ssvm_response[0] + # Download BUILTIN templates + download_builtin_templates( + cls.api_client, + cls.zone.id, + cls.services["cluster"]["hypervisor"], + cls.services["host"], + ssvm.linklocalip + ) + cls.sub_domain = Domain.create( + cls.api_client, + cls.services["domain"], + parentdomainid=cls.domain.id + ) + + # Create account for doamin_1 + cls.account_1 = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.domain.id + ) + + # Create an account for domain_2 + cls.account_2 = Account.create( + cls.api_client, + cls.services["account"], + admin=True, + domainid=cls.sub_domain.id + ) + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"], + domainid=cls.domain.id + ) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + + cls.vm_1 = VirtualMachine.create( + cls.api_client, + cls.services["virtual_machine"], + templateid=cls.template.id, + accountid=cls.account_1.account.name, + domainid=cls.account_1.account.domainid, + serviceofferingid=cls.service_offering.id + ) + + cls.vm_2 = VirtualMachine.create( + cls.api_client, + cls.services["virtual_machine"], + templateid=cls.template.id, + accountid=cls.account_2.account.name, + domainid=cls.account_2.account.domainid, + serviceofferingid=cls.service_offering.id + ) + cls._cleanup = [ + cls.account_1, + cls.account_2, + cls.service_offering, + cls.sub_domain, + cls.secondary_storage, + cls.primary_storage, + cls.host, + cls.cluster, + cls.pod, + cls.zone + ] + return + + @classmethod + def tearDownClass(cls): + try: + # Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created resources + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_add_vm_to_subdomain(self): + """ Test Sub domain allowed to launch VM when a Domain level zone is + created""" + + # Validate the following + # 1. Verify VM created by Account_1 is in Running state + # 2. Verify VM created by Account_2 is in Running state + + vm_response = list_virtual_machines( + self.apiclient, + id=self.vm_1.id + ) + self.assertNotEqual( + len(vm_response), + 0, + "Check List Template response" + ) + + for vm in vm_response: + self.assertEqual( + vm.state, + 'Running', + "Check State of Virtual machine" + ) + + vm_response = list_virtual_machines( + self.apiclient, + id=self.vm_2.id + ) + self.assertNotEqual( + len(vm_response), + 0, + "Check List Template response" + ) + + for vm in vm_response: + self.assertEqual( + vm.state, + 'Running', + "Check State of Virtual machine" + ) + return diff --git a/tools/testClient/testcase/P1-tests/test_resource_limits.py b/tools/testClient/testcase/P1-tests/test_resource_limits.py new file mode 100644 index 00000000000..0f39a3048e7 --- /dev/null +++ b/tools/testClient/testcase/P1-tests/test_resource_limits.py @@ -0,0 +1,1124 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" P1 tests for Resource limits +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from testcase.libs.utils import * +from testcase.libs.base import * +from testcase.libs.common import * +import datetime + +class Services: + """Test Resource Limits Services + """ + + def __init__(self): + self.services = { + "domain": { + "name": "Domain", + }, + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + # Random characters are appended for unique + # username + "password": "fr3sca", + }, + "service_offering": { + "name": "Tiny Instance", + "displaytext": "Tiny Instance", + "cpunumber": 1, + "cpuspeed": 100, # in MHz + "memory": 64, # In MBs + }, + "disk_offering": { + "displaytext": "Small", + "name": "Small", + "disksize": 1 + }, + "volume": { + "diskname": "TestDiskServ", + "domainid": 1, + }, + "server": { + "displayname": "TestVM", + "username": "root", + "password": "password", + "ssh_port": 22, + "hypervisor": 'XenServer', + "domainid": 1, + "privateport": 22, + "publicport": 22, + "protocol": 'TCP', + }, + "template": { + "displaytext": "Cent OS Template", + "name": "Cent OS Template", + "ostypeid": 12, + "templatefilter": 'self', + }, + "domainid": 1, + "ostypeid": 12, + # Cent OS 5.3 (64 bit) + "zoneid": 1, + # Optional, if specified the mentioned zone will be + # used for tests + "sleep": 60, + "mode":'advanced' + } + +class TestResourceLimitsAccount(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client, cls.services) + + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["server"]["zoneid"] = cls.zone.id + + # Create Account, VMs etc + cls.account_1 = Account.create( + cls.api_client, + cls.services["account"], + ) + # Create Account, VMs etc + cls.account_2 = Account.create( + cls.api_client, + cls.services["account"], + ) + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + cls._cleanup = [ + cls.disk_offering, + cls.service_offering, + cls.account_1, + cls.account_2 + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created instance, volumes and snapshots + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_vm_per_account(self): + """Test VM limit per account + """ + + # Validate the following + # 1. Set user_vm=1 limit for account 1. + # 2. Try to start 2 VMs account 1. Verify start of second VM is denied + # for this account. + # 3. Try to start 2 VMs account 2. Verify 2 SM are started properly + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 0, # Instance + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + max=1 + ) + + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_1.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine) + + # Verify VM state + self.assertEqual( + virtual_machine.state, + 'Running', + "Check VM state is Running or not" + ) + + # Exception should be raised for second instance (account_1) + with self.assertRaises(Exception): + VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_1.account.name, + serviceofferingid=self.service_offering.id + ) + # Start 2 instances for account_2 + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_2.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + + virtual_machine_2 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_2.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_2) + # Verify VM state + self.assertEqual( + virtual_machine_2.state, + 'Running', + "Check VM state is Running or not" + ) + return + + def test_02_publicip_per_account(self): + """Test Public IP limit per account + """ + + # Validate the following + # 1. Set Public_IP= 2 limit for account 1. + # 2. start 1 VMs account 1 + # 3. start 1 VMs account 2 + # 4. Acquire 2 IP in account 1. Verify account with limit should be + # denied to acquire more than one IP. + # 5. Acquire 2 IP in account 2. Verify account 2 should be able to + # Acquire IP without any warning + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 1, # Public Ip + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + max=2 + ) + + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_1.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + + # Create VM for second account + virtual_machine_2 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_2.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_2) + # Verify VM state + self.assertEqual( + virtual_machine_2.state, + 'Running', + "Check VM state is Running or not" + ) + + public_ip_1 = PublicIPAddress.create( + self.apiclient, + virtual_machine_1.account, + virtual_machine_1.zoneid, + virtual_machine_1.domainid, + self.services["server"] + ) + self.cleanup.append(public_ip_1) + + # Sleep to ensure that state is reflected across all the calls + time.sleep(self.services["sleep"]) + + # Verify Public IP state + self.assertEqual( + public_ip_1.ipaddress.state in [ + 'Allocated', + 'Allocating' + ], + True, + "Check Public IP state is allocated or not" + ) + + # Exception should be raised for second instance (account_1) + with self.assertRaises(Exception): + public_ip_2 = PublicIPAddress.create( + self.apiclient, + virtual_machine_1.account, + virtual_machine_1.zoneid, + virtual_machine_1.domainid, + self.services["server"] + ) + + # Assign Public IP for account 2 + public_ip_3 = PublicIPAddress.create( + self.apiclient, + virtual_machine_2.account, + virtual_machine_2.zoneid, + virtual_machine_2.domainid, + self.services["server"] + ) + self.cleanup.append(public_ip_3) + + # Verify Public IP state + self.assertEqual( + public_ip_3.ipaddress.state in [ + 'Allocated', + 'Allocating' + ], + True, + "Check Public IP state is allocated or not" + ) + public_ip_4 = PublicIPAddress.create( + self.apiclient, + virtual_machine_2.account, + virtual_machine_2.zoneid, + virtual_machine_2.domainid, + self.services["server"] + ) + self.cleanup.append(public_ip_4) + # Verify Public IP state + self.assertEqual( + public_ip_4.ipaddress.state in [ + 'Allocated', + 'Allocating' + ], + True, + "Check Public IP state is allocated or not" + ) + return + + def test_03_snapshots_per_account(self): + """Test Snapshot limit per account + """ + + # Validate the following + # 1. Set snapshot= 2 limit for account 1. + # 2. start 1 VMs account 1 + # 3. start 1 VMs account 2 + # 4. Create 2 snapshot in account 1. Verify account with limit should + # be denied to create more than one snapshot. + # 5. Create 2 snapshot in account 2. Verify account 2 should be able to + # create snapshots without any warning + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 3, # Snapshot + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + max=1 + ) + + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_1.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + + # Create VM for second account + virtual_machine_2 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_2.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_2) + # Verify VM state + self.assertEqual( + virtual_machine_2.state, + 'Running', + "Check VM state is Running or not" + ) + + # Get the Root disk of VM + volumes = list_volumes( + self.apiclient, + virtualmachineid=virtual_machine_1.id, + type='ROOT' + ) + volume = volumes[0] + + # Create a snapshot from the ROOTDISK (Account 1) + snapshot_1 = Snapshot.create(self.apiclient, + volumes[0].id, + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + ) + self.cleanup.append(snapshot_1) + # Verify Snapshot state + self.assertEqual( + snapshot_1.state in [ + 'BackedUp', + 'CreatedOnPrimary' + ], + True, + "Check Snapshot state is Running or not" + ) + + # Exception should be raised for second snapshot (account_1) + with self.assertRaises(Exception): + Snapshot.create(self.apiclient, + volumes[0].id, + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + ) + + # Get the Root disk of VM + volumes = list_volumes( + self.apiclient, + virtualmachineid=virtual_machine_2.id, + type='ROOT' + ) + volume = volumes[0] + + # Create a snapshot from the ROOTDISK (Account 2) + snapshot_2 = Snapshot.create(self.apiclient, + volumes[0].id, + account=self.account_2.account.name, + domainid=self.account_2.account.domainid, + ) + self.cleanup.append(snapshot_2) + # Verify Snapshot state + self.assertEqual( + snapshot_2.state in [ + 'BackedUp', + 'CreatedOnPrimary' + ], + True, + "Check Snapshot state is Running or not" + ) + # Create a second snapshot from the ROOTDISK (Account 2) + snapshot_3 = Snapshot.create(self.apiclient, + volumes[0].id, + account=self.account_2.account.name, + domainid=self.account_2.account.domainid, + ) + self.cleanup.append(snapshot_3) + # Verify Snapshot state + self.assertEqual( + snapshot_3.state in [ + 'BackedUp', + 'CreatedOnPrimary' + ], + True, + "Check Snapshot state is Running or not" + ) + return + + def test_04_volumes_per_account(self): + """Test Volumes limit per account + """ + + # Validate the following + # 1. Set volumes=2 limit for account 1. + # 2. Start 1 VMs account 1 + # 3. Start 1 VMs account 2 + # 4. Create 2 volumes in account 1. Verify account with limit should be + # denied to create more than one volume. + # 5. Create 2 volumes in account 2. Verify account 2 should be able to + # create Volume without any warning + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 2, # Volume + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + max=3 + ) + + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_1.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + + # Create VM for second account + virtual_machine_2 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_2.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_2) + # Verify VM state + self.assertEqual( + virtual_machine_2.state, + 'Running', + "Check VM state is Running or not" + ) + + volume_1 = Volume.create( + self.apiclient, + self.services["volume"], + zoneid=self.zone.id, + account=self.account_1.account.name, + domainid=self.account_2.account.domainid, + diskofferingid=self.disk_offering.id + ) + self.cleanup.append(volume_1) + # Verify Volume state + self.assertEqual( + volume_1.state in [ + 'Allocated', + 'Ready' + ], + True, + "Check Volume state is Ready or not" + ) + + # Exception should be raised for second snapshot (account_1) + with self.assertRaises(Exception): + Volume.create( + self.apiclient, + self.services["volume"], + zoneid=self.zone.id, + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + diskofferingid=self.disk_offering.id + ) + + # Create volume for Account 2 + volume_2 = Volume.create( + self.apiclient, + self.services["volume"], + zoneid=self.zone.id, + account=self.account_2.account.name, + domainid=self.account_2.account.domainid, + diskofferingid=self.disk_offering.id + ) + self.cleanup.append(volume_2) + # Verify Volume state + self.assertEqual( + volume_2.state in [ + 'Allocated', + 'Ready' + ], + True, + "Check Volume state is Ready or not" + ) + + # Create a second volume from the ROOTDISK (Account 2) + volume_3 = Volume.create( + self.apiclient, + self.services["volume"], + zoneid=self.zone.id, + account=self.account_2.account.name, + domainid=self.account_2.account.domainid, + diskofferingid=self.disk_offering.id + ) + self.cleanup.append(volume_3) + # Verify Volume state + self.assertEqual( + volume_3.state in [ + 'Allocated', + 'Ready' + ], + True, + "Check Volume state is Ready or not" + ) + return + + def test_05_templates_per_account(self): + """Test Templates limit per account + """ + + # Validate the following + # 1. Set templates=1 limit for account 1. + # 2. Try to create 2 templates in account 1. Verify account with limit + # should be denied to create more than 1 template. + # 3. Try to create 2 templates in account 2. Verify account 2 should be + # able to create template without any error + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 4, # Template + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + max=1 + ) + + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_1.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + + # Create VM for second account + virtual_machine_2 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_2.account.name, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_2) + # Verify VM state + self.assertEqual( + virtual_machine_2.state, + 'Running', + "Check VM state is Running or not" + ) + + virtual_machine_1.stop(self.apiclient) + # Get the Root disk of VM + volumes = list_volumes( + self.apiclient, + virtualmachineid=virtual_machine_1.id, + type='ROOT' + ) + volume = volumes[0] + + # Create a snapshot from the ROOTDISK (Account 1) + template_1 = Template.create( + self.apiclient, + self.services["template"], + volumeid=volume.id, + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + ) + + self.cleanup.append(template_1) + # Verify Template state + self.assertEqual( + template_1.isready, + True, + "Check Template is in ready state or not" + ) + + # Exception should be raised for second snapshot (account_1) + with self.assertRaises(Exception): + Template.create( + self.apiclient, + self.services["template"], + volumeid=volume.id, + account=self.account_1.account.name, + domainid=self.account_1.account.domainid, + ) + virtual_machine_2.stop(self.apiclient) + # Get the Root disk of VM + volumes = list_volumes( + self.apiclient, + virtualmachineid=virtual_machine_2.id, + type='ROOT' + ) + volume = volumes[0] + + # Create a snapshot from the ROOTDISK (Account 1) + template_2 = Template.create( + self.apiclient, + self.services["template"], + volumeid=volume.id, + account=self.account_2.account.name, + domainid=self.account_2.account.domainid, + ) + + self.cleanup.append(template_2) + # Verify Template state + self.assertEqual( + template_2.isready, + True, + "Check Template is in ready state or not" + ) + # Create a second volume from the ROOTDISK (Account 2) + template_3 = Template.create( + self.apiclient, + self.services["template"], + volumeid=volume.id, + account=self.account_2.account.name, + domainid=self.account_2.account.domainid, + ) + + self.cleanup.append(template_3) + # Verify Template state + self.assertEqual( + template_3.isready, + True, + "Check Template is in ready state or not" + ) + return + + +class TestResourceLimitsDomain(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client, cls.services) + + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["server"]["zoneid"] = cls.zone.id + + # Create Domains, Account etc + cls.domain = Domain.create( + cls.api_client, + cls.services["domain"] + ) + + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + + cls.services["account"] = cls.account.account.name + + # Create Service offering and disk offerings etc + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + cls._cleanup = [ + cls.service_offering, + cls.disk_offering, + cls.account, + cls.domain + ] + return + + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created instance, volumes and snapshots + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_vm_per_domain(self): + """Test VM limit per domain + """ + + # Validate the following + # 1. Set max VM per domain to 2 + # 2. Create account and start 2 VMs. Verify VM state is Up and Running + # 3. Try to create 3rd VM instance. The appropriate error or alert + # should be raised + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 0, # Instance + domainid=self.account.account.domainid, + max=2 + ) + + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + virtual_machine_2 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_2) + # Verify VM state + self.assertEqual( + virtual_machine_2.state, + 'Running', + "Check VM state is Running or not" + ) + # Exception should be raised for second instance + with self.assertRaises(Exception): + VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account_1.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + return + + def test_01_publicip_per_domain(self): + """Test Public IP limit per domain + """ + + # Validate the following + # 1. set max no of IPs per domain to 2. + # 2. Create an account in this domain + # 3. Create 1 VM in this domain + # 4. Acquire 1 IP in the domain. IP should be successfully acquired + # 5. Try to acquire 3rd IP in this domain. It should give the user an + # appropriate error and an alert should be generated. + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 1, # Public Ip + domainid=self.account.account.domainid, + max=2 + ) + + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + + public_ip_1 = PublicIPAddress.create( + self.apiclient, + virtual_machine_1.account, + virtual_machine_1.zoneid, + virtual_machine_1.domainid, + self.services["server"] + ) + self.cleanup.append(public_ip_1) + # Verify Public IP state + self.assertEqual( + public_ip_1.ipaddress.state in [ + 'Allocated', + 'Allocating' + ], + True, + "Check Public IP state is allocated or not" + ) + + # Exception should be raised for second Public IP + with self.assertRaises(Exception): + public_ip_2 = PublicIPAddress.create( + self.apiclient, + virtual_machine_1.account, + virtual_machine_1.zoneid, + virtual_machine_1.domainid, + self.services["server"] + ) + return + + def test_03_snapshots_per_domain(self): + """Test Snapshot limit per domain + """ + + # Validate the following + # 1. set max no of snapshots per domain to 1. + # 2. Create an account in this domain + # 3. Create 1 VM in this domain + # 4. Create one snapshot in the domain. Snapshot should be successfully + # created + # 5. Try to create another snapshot in this domain. It should give the + # user an appropriate error and an alert should be generated. + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 3, # Snapshot + domainid=self.account.account.domainid, + max=1 + ) + + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + + # Get the Root disk of VM + volumes = list_volumes( + self.apiclient, + virtualmachineid=virtual_machine_1.id, + type='ROOT' + ) + volume = volumes[0] + + # Create a snapshot from the ROOTDISK + snapshot_1 = Snapshot.create(self.apiclient, + volumes[0].id, + account=self.account.account.name, + domainid=self.account.account.domainid, + ) + self.cleanup.append(snapshot_1) + # Verify Snapshot state + self.assertEqual( + snapshot_1.state in [ + 'BackedUp', + 'CreatedOnPrimary' + ], + True, + "Check Snapshot state is Running or not" + ) + + # Exception should be raised for second snapshot + with self.assertRaises(Exception): + Snapshot.create(self.apiclient, + volumes[0].id, + account=self.account.account.name, + domainid=self.account.account.domainid, + ) + return + + def test_04_volumes_per_domain(self): + """Test Volumes limit per domain + """ + + # Validate the following + # 1. set max no of volume per domain to 1. + # 2. Create an account in this domain + # 3. Create 1 VM in this domain + # 4. Try to Create another VM in the domain. It should give the user an + # appropriate error that Volume limit is exhausted and an alert + # should be generated. + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 2, # Volume + domainid=self.account.account.domainid, + max=1 + ) + + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + + # Exception should be raised for second volume + with self.assertRaises(Exception): + Volume.create( + self.apiclient, + self.services["volume"], + zoneid=self.zone.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + diskofferingid=self.disk_offering.id + ) + return + + def test_05_templates_per_domain(self): + """Test Templates limit per domain + """ + + # Validate the following + # 1. set max no of templates per domain to 2. + # 2. Create an account in this domain + # 3. Create 2 templates in this domain. Both template should be in + # ready state + # 4. Try create 3rd template in the domain. It should give the user an + # appropriate error and an alert should be generated. + + # Set usage_vm=1 for Account 1 + update_resource_limit( + self.apiclient, + 4, # Template + domainid=self.account.account.domainid, + max=2 + ) + + virtual_machine_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=self.template.id, + accountid=self.account.account.name, + domainid=self.account.account.domainid, + serviceofferingid=self.service_offering.id + ) + self.cleanup.append(virtual_machine_1) + # Verify VM state + self.assertEqual( + virtual_machine_1.state, + 'Running', + "Check VM state is Running or not" + ) + virtual_machine_1.stop(self.apiclient) + # Get the Root disk of VM + volumes = list_volumes( + self.apiclient, + virtualmachineid=virtual_machine_1.id, + type='ROOT' + ) + volume = volumes[0] + + # Create a template from the ROOTDISK + template_1 = Template.create( + self.apiclient, + self.services["template"], + volumeid=volume.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + ) + + self.cleanup.append(template_1) + # Verify Template state + self.assertEqual( + template_1.isready, + True, + "Check Template is in ready state or not" + ) + + # Create a template from the ROOTDISK + template_2 = Template.create( + self.apiclient, + self.services["template"], + volumeid=volume.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + ) + + self.cleanup.append(template_2) + # Verify Template state + self.assertEqual( + template_2.isready, + True, + "Check Template is in ready state or not" + ) + + # Exception should be raised for second template + with self.assertRaises(Exception): + Template.create( + self.apiclient, + self.services["template"], + volumeid=volume.id, + account=self.account.account.name, + domainid=self.account.account.domainid, + ) + return diff --git a/tools/testClient/testcase/P1-tests/test_templates.py b/tools/testClient/testcase/P1-tests/test_templates.py new file mode 100644 index 00000000000..823cf3635a5 --- /dev/null +++ b/tools/testClient/testcase/P1-tests/test_templates.py @@ -0,0 +1,523 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" P1 tests for Templates +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from testcase.libs.utils import * +from testcase.libs.base import * +from testcase.libs.common import * +import urllib +from random import random +#Import System modules +import time + + +class Services: + """Test Templates Services + """ + + def __init__(self): + self.services = { + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + # Random characters are appended for unique + # username + "password": "fr3sca", + }, + "service_offering": { + "name": "Tiny Instance", + "displaytext": "Tiny Instance", + "cpunumber": 1, + "cpuspeed": 100, # in MHz + "memory": 64, # In MBs + }, + "disk_offering": { + "displaytext": "Small", + "name": "Small", + "disksize": 1 + }, + "virtual_machine": { + "displayname": "testVM", + "hypervisor": 'XenServer', + "domainid": 1, + "protocol": 'TCP', + "ssh_port": 22, + "username": "root", + "password": "password", + "privateport": 22, + "publicport": 22, + }, + "volume": { + "diskname": "Test Volume", + }, + "templates": { + # Configs for different Template formats + # For Eg. raw image, zip etc + 0:{ + "displaytext": "Public Template", + "name": "Public template", + "ostypeid": 126, + "url": "http://download.cloud.com/releases/2.0.0/UbuntuServer-10-04-64bit.vhd.bz2", + "hypervisor": 'XenServer', + "format" : 'VHD', + "isfeatured": True, + "ispublic": True, + "isextractable": True, + }, + }, + "template": { + "displaytext": "Cent OS Template", + "name": "Cent OS Template", + "ostypeid": 12, + "templatefilter": 'self', + }, + "templatefilter": 'self', + "destzoneid": 13, # For Copy template (Destination zone) + "ostypeid": 12, + "zoneid": 1, + # Optional, if specified the mentioned zone will be + # used for tests + "mode": 'advanced', # Networking mode: Advanced, basic + } + + +# TODO : Testing +class TestCreateTemplate(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + @classmethod + def setUpClass(cls): + cls.services = Services().services + cls.api_client = fetch_api_client() + + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client, cls.services) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.account = Account.create( + cls.api_client, + cls.services["account"] + ) + cls.services["account"] = cls.account.account.name + + cls._cleanup = [ + cls.account, + cls.service_offering + ] + return + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_01_create_template(self): + """Test create public & private template + """ + + # Validate the following: + # 1. Upload a templates in raw img format. Create a Vm instances from + # raw img template. + # 2. Upload a templates in zip file format. Create a Vm instances from + # zip template. + # 3. Upload a templates in tar format.Create a Vm instances from tar + # template. + # 4. Upload a templates in tar gzip format.Create a Vm instances from + # tar gzip template. + # 5. Upload a templates in tar bzip format. Create a Vm instances from + # tar bzip template. + # 6. Verify VMs & Templates is up and in ready state + + for k, v in self.services["templates"].items(): + + # Register new template + template = Template.register( + self.apiclient, + v, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + # Wait for template to download + template.download(self.apiclient) + self.cleanup.append(template) + + # Wait for template status to be changed across + time.sleep(60) + list_template_response = list_templates( + self.apiclient, + templatefilter=\ + self.services["templatefilter"], + id=template.id, + zoneid=self.zone.id + ) + + #Verify template response to check whether template added successfully + self.assertNotEqual( + len(list_template_response), + 0, + "Check template available in List Templates" + ) + template_response = list_template_response[0] + + self.assertEqual( + template_response.isready, + True, + "Check display text of newly created template" + ) + + # Deploy new virtual machine using template + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + templateid=template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id, + mode=self.services["mode"] + ) + vm_response = list_virtual_machines( + self.apiclient, + id=virtual_machine.id, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + vm = vm_response[0] + self.assertEqual( + vm.state, + 'Running', + "Check the state of VM created from Template" + ) + return + + +class TestTemplates(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + + cls.services = Services().services + cls.api_client = fetch_api_client() + + # Get Zone, templates etc + cls.zone = get_zone(cls.api_client, cls.services) + + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.account = Account.create( + cls.api_client, + cls.services["account"] + ) + + cls.services["account"] = cls.account.account.name + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + + # create virtual machine + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + cls.services["virtual_machine"], + templateid=template.id, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id, + ) + #Stop virtual machine + cls.virtual_machine.stop(cls.api_client) + + #Wait before server has be successfully stopped + time.sleep(30) + list_volume = list_volumes( + cls.api_client, + virtualmachineid=cls.virtual_machine.id, + type='ROOT' + ) + cls.volume = list_volume[0] + + #Create template from volume + cls.template = Template.create( + cls.api_client, + cls.services["template"], + cls.volume.id + ) + cls._cleanup = [ + cls.service_offering, + cls.account, + ] + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + #Cleanup created resources such as templates and VMs + cleanup_resources(cls.api_client, cls._cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def test_01_create_template_volume(self): + """Test Create template from volume + """ + + # Validate the following: + # 1. Deploy new VM using the template created from Volume + # 2. VM should be in Up and Running state + + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + templateid=self.template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id, + ) + + self.cleanup.append(virtual_machine) + vm_response = list_virtual_machines( + self.apiclient, + id=virtual_machine.id, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + vm = vm_response[0] + self.assertEqual( + vm.state, + 'Running', + "Check the state of VM created from Template" + ) + return + + def test_02_copy_template(self): + """Test for copy template from one zone to another""" + + # Validate the following + # 1. copy template should be successful and + # secondary storage should contain new copied template. + + cmd = copyTemplate.copyTemplateCmd() + cmd.id = self.template.id + cmd.destzoneid = self.services["destzoneid"] + cmd.sourcezoneid = self.zone.id + self.apiclient.copyTemplate(cmd) + + # Verify template is copied to another zone using ListTemplates + list_template_response = list_templates( + self.apiclient, + templatefilter=\ + self.services["templatefilter"], + id=self.template.id, + zoneid=self.services["destzoneid"] + ) + self.assertNotEqual( + len(list_template_response), + 0, + "Check template extracted in List Templates" + ) + + template_response = list_template_response[0] + self.assertEqual( + template_response.id, + self.template.id, + "Check ID of the downloaded template" + ) + self.assertEqual( + template_response.zoneid, + self.services["destzoneid"], + "Check zone ID of the copied template" + ) + + # Cleanup- Delete the copied template + cmd = deleteTemplate.deleteTemplateCmd() + cmd.id = self.template.id + cmd.zoneid = self.services["destzoneid"] + self.apiclient.deleteTemplate(cmd) + return + + + def test_03_delete_template(self): + """Test Delete template + """ + + # Validate the following: + # 1. Create a template and verify it is shown in list templates response + # 2. Delete the created template and again verify list template response + + # Verify template response for updated attributes + list_template_response = list_templates( + self.apiclient, + templatefilter=\ + self.services["template"]["templatefilter"], + id=self.template.id, + zoneid=self.zone.id + ) + self.assertNotEqual( + len(list_template_response), + 0, + "Check template available in List Templates" + ) + template_response = list_template_response[0] + + self.assertEqual( + template_response.id, + self.template.id, + "Check display text of updated template" + ) + self.template.delete(self.apiclient) + + list_template_response = list_templates( + self.apiclient, + templatefilter=\ + self.services["template"]["templatefilter"], + id=self.template.id, + zoneid=self.zone.id + ) + self.assertEqual( + list_template_response, + None, + "Check template available in List Templates" + ) + return + + def test_04_template_from_snapshot(self): + """Create Template from snapshot + """ + + # Validate the following + # 2. Snapshot the Root disk + # 3. Create Template from snapshot + # 4. Deploy Virtual machine using this template + # 5. VM should be in running state + + volumes = list_volumes( + self.apiclient, + virtualmachineid=self.virtual_machine.id, + type='ROOT' + ) + volume = volumes[0] + + #Create a snapshot of volume + snapshot = Snapshot.create(self.apiclient, volume.id) + self.cleanup.append(snapshot) + + # Generate template from the snapshot + template = Template.create_from_snapshot( + self.apiclient, + snapshot, + self.services["template"] + ) + self.cleanup.append(template) + # Verify created template + templates = list_templates( + self.apiclient, + templatefilter=\ + self.services["template"]["templatefilter"], + id=template.id + ) + self.assertNotEqual( + templates, + None, + "Check if result exists in list item call" + ) + + self.assertEqual( + templates[0].id, + template.id, + "Check new template id in list resources call" + ) + + # Deploy new virtual machine using template + virtual_machine = VirtualMachine.create( + self.apiclient, + self.services["virtual_machine"], + templateid=template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id, + ) + self.cleanup.append(virtual_machine) + + vm_response = list_virtual_machines( + self.apiclient, + id=virtual_machine.id, + account=self.account.account.name, + domainid=self.account.account.domainid + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + vm = vm_response[0] + self.assertEqual( + vm.state, + 'Running', + "Check the state of VM created from Template" + ) + return diff --git a/tools/testClient/testcase/P1-tests/test_volumes.py b/tools/testClient/testcase/P1-tests/test_volumes.py new file mode 100644 index 00000000000..7ad18d9383b --- /dev/null +++ b/tools/testClient/testcase/P1-tests/test_volumes.py @@ -0,0 +1,849 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" P1 tests for Volumes +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from testcase.libs.utils import * +from testcase.libs.base import * +from testcase.libs.common import * +import remoteSSHClient +#Import System modules +import os +import urllib +import time +import tempfile + + +class Services: + """Test Volume Services + """ + + def __init__(self): + self.services = { + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + # Random characters are appended for unique + # username + "password": "fr3sca", + }, + "service_offering": { + "name": "Tiny Instance", + "displaytext": "Tiny Instance", + "cpunumber": 1, + "cpuspeed": 100, # in MHz + "memory": 64, # In MBs + }, + "disk_offering": { + "displaytext": "Small", + "name": "Small", + "disksize": 1 + }, + "volume": { + "diskname": "TestDiskServ", + "domainid": 1, + "max": 6, + }, + "virtual_machine": { + "displayname": "testVM", + "hypervisor": 'XenServer', + "domainid": 1, + "protocol": 'TCP', + "ssh_port": 22, + "username": "root", + "password": "password", + "privateport": 22, + "publicport": 22, + }, + "iso": # ISO settings for Attach/Detach ISO tests + { + "displaytext": "Test ISO", + "name": "testISO", + "url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso", + # Source URL where ISO is located + "ostypeid": 76, + }, + "sleep": 90, + "domainid": 1, + "ostypeid": 12, + "zoneid": 1, + # Optional, if specified the mentioned zone will be + # used for tests + "mode": 'advanced', + } + + +class TestAttachVolume(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client, cls.services) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + # Create VMs, NAT Rules etc + cls.account = Account.create( + cls.api_client, + cls.services["account"] + ) + + cls.services["account"] = cls.account.account.name + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + cls.services["virtual_machine"], + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id, + ) + cls._cleanup = [ + cls.service_offering, + cls.disk_offering, + cls.account + ] + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + + def test_01_volume_attach(self): + """Test Attach volumes (max capacity) + """ + + # Validate the following + # 1. Deploy a vm and create 5 data disk + # 2. Attach all the created Volume to the vm. + # 3. Reboot the VM. VM should be successfully rebooted + # 4. Stop the VM. Stop VM should be successful + # 5. Start The VM. Start VM should be successful + + # Create 5 volumes and attach to VM + for i in range(self.services["volume"]["max"]): + volume = Volume.create( + self.apiclient, + self.services["volume"], + zoneid=self.zone.id, + account=self.account.account.name, + diskofferingid=self.disk_offering.id + ) + # Check List Volume response for newly created volume + list_volume_response = list_volumes( + self.apiclient, + id=volume.id + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + # Attach volume to VM + self.virtual_machine.attach_volume( + self.apiclient, + volume + ) + + # Check all volumes attached to same VM + list_volume_response = list_volumes( + self.apiclient, + virtualmachineid=self.virtual_machine.id, + type='DATADISK' + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + self.assertEqual( + len(list_volume_response), + self.services["volume"]["max"], + "Check number of data volumes attached to VM" + ) + # Reboot VM + self.virtual_machine.reboot(self.apiclient) + # Sleep to ensure that VM is in ready state + time.sleep(self.services["sleep"]) + + vm_response = list_virtual_machines( + self.apiclient, + id=self.virtual_machine.id, + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + vm = vm_response[0] + self.assertEqual( + vm.state, + 'Running', + "Check the state of VM" + ) + + # Stop VM + self.virtual_machine.stop(self.apiclient) + # Sleep to ensure that VM is in ready state + time.sleep(self.services["sleep"]) + vm_response = list_virtual_machines( + self.apiclient, + id=self.virtual_machine.id, + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + + vm = vm_response[0] + self.assertEqual( + vm.state, + 'Stopped', + "Check the state of VM" + ) + + # Start VM + self.virtual_machine.start(self.apiclient) + # Sleep to ensure that VM is in ready state + time.sleep(self.services["sleep"]) + + vm_response = list_virtual_machines( + self.apiclient, + id=self.virtual_machine.id, + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + + vm = vm_response[0] + self.assertEqual( + vm.state, + 'Running', + "Check the state of VM" + ) + return + + def test_02_volume_attach_max(self): + """Test attach volumes (more than max) to an instance + """ + + # Validate the following + # 1. Attach one more data volume to VM (Already 5 attached) + # 2. Attach volume should fail + + # Create a volume and attach to VM + volume = Volume.create( + self.apiclient, + self.services["volume"], + zoneid=self.zone.id, + account=self.account.account.name, + diskofferingid=self.disk_offering.id + ) + # Check List Volume response for newly created volume + list_volume_response = list_volumes( + self.apiclient, + id=volume.id + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + # Attach volume to VM + with self.assertRaises(Exception): + self.virtual_machine.attach_volume( + self.apiclient, + volume + ) + return + + def tearDown(self): + #Clean up, terminate the created volumes + cleanup_resources(self.apiclient, self.cleanup) + return + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + +class TestAttachDetachVolume(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client, cls.services) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + # Create VMs, NAT Rules etc + cls.account = Account.create( + cls.api_client, + cls.services["account"] + ) + + cls.services["account"] = cls.account.account.name + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + cls.services["virtual_machine"], + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id, + ) + cls._cleanup = [ + cls.service_offering, + cls.disk_offering, + cls.account + ] + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + + def tearDown(self): + #Clean up, terminate the created volumes + cleanup_resources(self.apiclient, self.cleanup) + return + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + def test_01_volume_attach_detach(self): + """Test Volume attach/detach to VM (5 data volumes) + """ + + # Validate the following + # 1. Deploy a vm and create 5 data disk + # 2. Attach all the created Volume to the vm. + # 3. Detach all the volumes attached. + # 4. Reboot the VM. VM should be successfully rebooted + # 5. Stop the VM. Stop VM should be successful + # 6. Start The VM. Start VM should be successful + + volumes = [] + # Create 5 volumes and attach to VM + for i in range(self.services["volume"]["max"]): + volume = Volume.create( + self.apiclient, + self.services["volume"], + zoneid=self.zone.id, + account=self.account.account.name, + diskofferingid=self.disk_offering.id + ) + self.cleanup.append(volume) + volumes.append(volume) + + # Check List Volume response for newly created volume + list_volume_response = list_volumes( + self.apiclient, + id=volume.id + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + # Attach volume to VM + self.virtual_machine.attach_volume( + self.apiclient, + volume + ) + + # Check all volumes attached to same VM + list_volume_response = list_volumes( + self.apiclient, + virtualmachineid=self.virtual_machine.id, + type='DATADISK' + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + self.assertEqual( + len(list_volume_response), + self.services["volume"]["max"], + "Check number of data volumes attached to VM" + ) + + # Detach all volumes from VM + for volume in volumes: + self.virtual_machine.detach_volume( + self.apiclient, + volume + ) + # Reboot VM + self.virtual_machine.reboot(self.apiclient) + # Sleep to ensure that VM is in ready state + time.sleep(self.services["sleep"]) + + vm_response = list_virtual_machines( + self.apiclient, + id=self.virtual_machine.id, + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + vm = vm_response[0] + self.assertEqual( + vm.state, + 'Running', + "Check the state of VM" + ) + + # Stop VM + self.virtual_machine.stop(self.apiclient) + # Sleep to ensure that VM is in ready state + time.sleep(self.services["sleep"]) + + vm_response = list_virtual_machines( + self.apiclient, + id=self.virtual_machine.id, + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + vm = vm_response[0] + self.assertEqual( + vm.state, + 'Stopped', + "Check the state of VM" + ) + + # Start VM + self.virtual_machine.start(self.apiclient) + # Sleep to ensure that VM is in ready state + time.sleep(self.services["sleep"]) + + vm_response = list_virtual_machines( + self.apiclient, + id=self.virtual_machine.id, + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + vm = vm_response[0] + self.assertEqual( + vm.state, + 'Running', + "Check the state of VM" + ) + return + + +class TestAttachVolumeISO(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client, cls.services) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["iso"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + + # Create VMs, NAT Rules etc + cls.account = Account.create( + cls.api_client, + cls.services["account"] + ) + + cls.services["account"] = cls.account.account.name + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + cls.services["virtual_machine"], + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id, + ) + cls._cleanup = [ + cls.service_offering, + cls.disk_offering, + cls.account + ] + + @classmethod + def tearDownClass(cls): + try: + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + + def tearDown(self): + try: + #Clean up, terminate the created instance, volumes and snapshots + cleanup_resources(self.apiclient, self.cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return + + def test_01_volume_iso_attach(self): + """Test Volumes and ISO attach + """ + + # Validate the following + # 1. Create and attach 5 data volumes to VM + # 2. Create an ISO. Attach it to VM instance + # 3. Verify that attach ISO is successful + + # Create 5 volumes and attach to VM + for i in range(self.services["volume"]["max"]): + volume = Volume.create( + self.apiclient, + self.services["volume"], + zoneid=self.zone.id, + account=self.account.account.name, + diskofferingid=self.disk_offering.id + ) + # Check List Volume response for newly created volume + list_volume_response = list_volumes( + self.apiclient, + id=volume.id + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + # Attach volume to VM + self.virtual_machine.attach_volume( + self.apiclient, + volume + ) + + # Check all volumes attached to same VM + list_volume_response = list_volumes( + self.apiclient, + virtualmachineid=self.virtual_machine.id, + type='DATADISK' + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + self.assertEqual( + len(list_volume_response), + self.services["volume"]["max"], + "Check number of data volumes attached to VM" + ) + # Create an ISO and attach it to VM + iso = Iso.create( + self.apiclient, + self.services["iso"], + account=self.account.account.name, + domainid=self.account.account.domainid, + ) + self.cleanup.append(iso) + iso.download(self.apiclient) + + #Attach ISO to virtual machine + cmd = attachIso.attachIsoCmd() + cmd.id = iso.id + cmd.virtualmachineid = self.virtual_machine.id + self.apiclient.attachIso(cmd) + + # Verify ISO is attached to VM + vm_response = list_virtual_machines( + self.apiclient, + id=self.virtual_machine.id, + ) + #Verify VM response to check whether VM deployment was successful + self.assertNotEqual( + len(vm_response), + 0, + "Check VMs available in List VMs response" + ) + vm = vm_response[0] + self.assertEqual( + vm.isoid, + iso.id, + "Check ISO is attached to VM or not" + ) + return + + +class TestVolumes(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client, cls.services) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["virtual_machine"]["template"] = template.id + cls.services["virtual_machine"]["diskofferingid"] = cls.disk_offering.id + + # Create VMs, VMs etc + cls.account = Account.create( + cls.api_client, + cls.services["account"] + ) + + cls.services["account"] = cls.account.account.name + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + cls.services["virtual_machine"], + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id, + ) + + cls.volume = Volume.create( + cls.api_client, + cls.services["volume"], + zoneid=cls.zone.id, + account=cls.account.account.name, + diskofferingid=cls.disk_offering.id + ) + cls._cleanup = [ + cls.service_offering, + cls.disk_offering, + cls.account + ] + + @classmethod + def tearDownClass(cls): + try: + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + + def tearDown(self): + #Clean up, terminate the created volumes + cleanup_resources(self.apiclient, self.cleanup) + return + + def test_01_attach_volume(self): + """Attach a created Volume to a Running VM + """ + # Validate the following + # 1. Create a data volume. + # 2. List Volumes should not have vmname and virtualmachineid fields in + # response before volume attach (to VM) + # 3. Attch volume to VM. Attach volume should be successful. + # 4. List Volumes should have vmname and virtualmachineid fields in + # response before volume attach (to VM) + + # Check the list volumes response for vmname and virtualmachineid + list_volume_response = list_volumes( + self.apiclient, + id=self.volume.id + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + + volume = list_volume_response[0] + + self.assertEqual( + volume.type, + 'DATADISK', + "Check volume type from list volume response" + ) + + self.assertEqual( + hasattr(volume, 'vmname'), + True, + "Check whether volume has vmname field" + ) + self.assertEqual( + hasattr(volume, 'virtualmachineid'), + True, + "Check whether volume has virtualmachineid field" + ) + + # Attach ISO to VM + self.virtual_machine.attach_volume(self.apiclient, self.volume) + + # Check all volumes attached to same VM + list_volume_response = list_volumes( + self.apiclient, + virtualmachineid=self.virtual_machine.id, + type='DATADISK' + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + volume = list_volume_response[0] + self.assertEqual( + volume.vmname, + self.virtual_machine.name, + "Check virtual machine name in list volumes response" + ) + self.assertEqual( + volume.virtualmachineid, + self.virtual_machine.id, + "Check VM ID in list Volume response" + ) + return + + def test_02_detach_volume(self): + """Detach a Volume attached to a VM + """ + + # Validate the following + # 1. Data disk should be detached from instance + # 2. Listvolumes should not have vmname and virtualmachineid fields for + # that volume. + + self.virtual_machine.detach_volume(self.apiclient, self.volume) + + #Sleep to ensure the current state will reflected in other calls + time.sleep(self.services["sleep"]) + + list_volume_response = list_volumes( + self.apiclient, + id=self.volume.id + ) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + volume = list_volume_response[0] + self.assertEqual( + volume.virtualmachineid, + None, + "Check if volume state (detached) is reflected" + ) + + self.assertEqual( + volume.vmname, + None, + "Check if volume state (detached) is reflected" + ) + return + + def test_03_delete_detached_volume(self): + """Delete a Volume unattached to an VM + """ + # Validate the following + # 1. volume should be deleted successfully and listVolume should not + # contain the deleted volume details. + + cmd = deleteVolume.deleteVolumeCmd() + cmd.id = self.volume.id + self.apiclient.deleteVolume(cmd) + + #Sleep to ensure the current state will reflected in other calls + time.sleep(self.services["sleep"]) + + list_volume_response = list_volumes( + self.apiclient, + id=self.volume.id, + type='DATADISK' + ) + self.assertEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + return