From 4c447d92bc85d5d1c606659293655900feb88904 Mon Sep 17 00:00:00 2001 From: Chirag Jog Date: Wed, 25 Jan 2012 07:57:05 -0800 Subject: [PATCH] 1. Remove complete dependency from settings/environment 2. Add account creation/deletion to all relevant tests. --- tools/testClient/testcase/BVT-tests/base.py | 11 +- .../testcase/BVT-tests/test_hosts.py | 104 ++- .../testClient/testcase/BVT-tests/test_iso.py | 243 ++--- .../testcase/BVT-tests/test_network.py | 865 ++++++++++++------ .../BVT-tests/test_primary_storage.py | 175 ++-- .../testcase/BVT-tests/test_routers.py | 122 ++- .../BVT-tests/test_secondary_storage.py | 92 +- .../testcase/BVT-tests/test_snapshots.py | 398 +++++--- .../testcase/BVT-tests/test_ssvm.py | 135 +-- .../testcase/BVT-tests/test_templates.py | 423 ++++----- .../testcase/BVT-tests/test_vm_life_cycle.py | 20 +- .../testcase/BVT-tests/test_volumes.py | 306 +++++-- 12 files changed, 1730 insertions(+), 1164 deletions(-) diff --git a/tools/testClient/testcase/BVT-tests/base.py b/tools/testClient/testcase/BVT-tests/base.py index 3ec76796850..28f7b1c590d 100644 --- a/tools/testClient/testcase/BVT-tests/base.py +++ b/tools/testClient/testcase/BVT-tests/base.py @@ -284,7 +284,7 @@ class Iso: if "ispublic" in services: cmd.ispublic = services["ispublic"] - return Iso(apiclient.createTemplate(cmd)[0].__dict__) + return Iso(apiclient.registerIso(cmd)[0].__dict__) def delete(self, apiclient): cmd = deleteIso.deleteIsoCmd() @@ -608,7 +608,14 @@ def get_zone(apiclient): "Returns a default zone" cmd = listZones.listZonesCmd() - return apiclient.listZones(cmd)[1] + return apiclient.listZones(cmd)[0] + +def get_pod(apiclient, zoneid): + "Returns a default pod for specified zone" + + cmd = listPods.listPodsCmd() + cmd.zoneid = zoneid + return apiclient.listPods(cmd)[0] def get_template(apiclient, zoneid, ostypeid=12): "Returns a template" diff --git a/tools/testClient/testcase/BVT-tests/test_hosts.py b/tools/testClient/testcase/BVT-tests/test_hosts.py index 70a4bd19efd..1ab83017112 100644 --- a/tools/testClient/testcase/BVT-tests/test_hosts.py +++ b/tools/testClient/testcase/BVT-tests/test_hosts.py @@ -22,58 +22,60 @@ class Services: "clusters": { 0: { "clustername": "Xen Cluster", - "clustertype": "ExternalManaged", # CloudManaged or ExternalManaged" - "hypervisor": "XenServer", # Hypervisor type - "zoneid": 3, - "podid": 3, + "clustertype": "ExternalManaged", + # CloudManaged or ExternalManaged" + "hypervisor": "XenServer", + # Hypervisor type }, 1: { "clustername": "KVM Cluster", - "clustertype": "CloudManaged", # CloudManaged or ExternalManaged" - "hypervisor": "KVM", # Hypervisor type - "zoneid": 3, - "podid": 3, + "clustertype": "CloudManaged", + # CloudManaged or ExternalManaged" + "hypervisor": "KVM", + # Hypervisor type }, 2: { - "hypervisor": 'VMware', # Hypervisor type - "clustertype": 'ExternalManaged', # CloudManaged or ExternalManaged" - "zoneid": 3, - "podid": 3, + "hypervisor": 'VMware', + # Hypervisor type + "clustertype": 'ExternalManaged', + # CloudManaged or ExternalManaged" "username": 'administrator', "password": 'fr3sca', "url": 'http://192.168.100.17/CloudStack-Clogeny-Pune/Pune-1', - # Format: http:// vCenter Host / Datacenter / Cluster + # Format:http://vCenter Host/Datacenter/Cluster "clustername": '192.168.100.17/CloudStack-Clogeny-Pune/Pune-1', - # Format: http:// IP_Address / Datacenter / Cluster + # Format: http://IP_Address/Datacenter/Cluster }, }, "hosts": { - "xenserver": { #Must be name of corresponding Hypervisor type in cluster in small letters - "zoneid": 3, - "podid": 3, + "xenserver": { + # Must be name of corresponding Hypervisor type + # in cluster in small letters "clusterid": 16, - "hypervisor": 'XenServer', # Hypervisor type - "clustertype": 'ExternalManaged', # CloudManaged or ExternalManaged" + "hypervisor": 'XenServer', + # Hypervisor type + "clustertype": 'ExternalManaged', + # CloudManaged or ExternalManaged" "url": 'http://192.168.100.210', "username": "administrator", "password": "fr3sca", }, "kvm": { - "zoneid": 3, - "podid": 3, "clusterid": 35, - "hypervisor": 'KVM', # Hypervisor type - "clustertype": 'CloudManaged', # CloudManaged or ExternalManaged" + "hypervisor": 'KVM', + # Hypervisor type + "clustertype": 'CloudManaged', + # CloudManaged or ExternalManaged" "url": 'http://192.168.100.212', "username": "root", "password": "fr3sca", }, "vmware": { - "zoneid": 3, - "podid": 3, "clusterid": 16, - "hypervisor": 'VMware', # Hypervisor type - "clustertype": 'ExternalManaged', # CloudManaged or ExternalManaged" + "hypervisor": 'VMware', + # Hypervisor type + "clustertype": 'ExternalManaged', + # CloudManaged or ExternalManaged" "url": 'http://192.168.100.203', "username": "administrator", "password": "fr3sca", @@ -89,6 +91,27 @@ class TestHosts(cloudstackTestCase): self.dbclient = self.testClient.getDbConnection() self.services = Services().services self.cleanup = [] + + # Get Zone and pod + self.zone = get_zone(self.apiclient) + self.pod = get_pod(self.apiclient, self.zone.id) + + self.services["clusters"][0]["zoneid"] = self.zone.id + self.services["clusters"][1]["zoneid"] = self.zone.id + self.services["clusters"][2]["zoneid"] = self.zone.id + + self.services["clusters"][0]["podid"] = self.pod.id + self.services["clusters"][1]["podid"] = self.pod.id + self.services["clusters"][2]["podid"] = self.pod.id + + self.services["hosts"]["xenserver"]["zoneid"] = self.zone.id + self.services["hosts"]["kvm"]["zoneid"] = self.zone.id + self.services["hosts"]["vmware"]["zoneid"] = self.zone.id + + self.services["hosts"]["xenserver"]["podid"] = self.pod.id + self.services["hosts"]["kvm"]["podid"] = self.pod.id + self.services["hosts"]["vmware"]["podid"] = self.pod.id + return def tearDown(self): @@ -108,22 +131,23 @@ class TestHosts(cloudstackTestCase): # Validate the following: # 1. Verify hypervisortype returned by API is Xen/KVM/VWare # 2. Verify that the cluster is in 'Enabled' allocation state - # 3. Verify that the host is added successfully and in Up state with listHosts API response + # 3. Verify that the host is added successfully and in Up state + # with listHosts API response #Create clusters with Hypervisor type XEN/KVM/VWare for k, v in self.services["clusters"].items(): cluster = Cluster.create(self.apiclient, v) self.assertEqual( - cluster.hypervisortype, - v["hypervisor"], - "Check hypervisor type of created cluster is " + v["hypervisor"] + " or not" - ) + cluster.hypervisortype, + v["hypervisor"], + "Check hypervisor type is " + v["hypervisor"] + " or not" + ) self.assertEqual( - cluster.allocationstate, - 'Enabled', - "Check whether allocation state of cluster is enabled" - ) + cluster.allocationstate, + 'Enabled', + "Check whether allocation state of cluster is enabled" + ) #If host is externally managed host is already added with cluster cmd = listHosts.listHostsCmd() @@ -177,8 +201,8 @@ class TestHosts(cloudstackTestCase): "Check cluster ID with list clusters response" ) self.assertEqual( - cluster_response.hypervisortype, - cluster.hypervisortype, - "Check hypervisor type with list clusters response is " + v["hypervisor"] + " or not" - ) + cluster_response.hypervisortype, + cluster.hypervisortype, + "Check hypervisor type with is " + v["hypervisor"] + " or not" + ) return diff --git a/tools/testClient/testcase/BVT-tests/test_iso.py b/tools/testClient/testcase/BVT-tests/test_iso.py index 0e5890596bc..c47b7e9038f 100644 --- a/tools/testClient/testcase/BVT-tests/test_iso.py +++ b/tools/testClient/testcase/BVT-tests/test_iso.py @@ -14,50 +14,55 @@ from random import random #Import System modules import time + class Services: """Test ISO Services """ def __init__(self): self.services = { - "iso_1": - { - "displaytext": "Test ISO type 1", - "name": "testISOType_1", - "url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso", - # Source URL where ISO is located - "zoneid": 1, - "isextractable": True, - "isfeatured": True, - "ispublic": True, - "ostypeid": 12, - }, - "iso_2": - { - "displaytext": "Test ISO type 2", - "name": "testISOType_2", - "url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso", - # Source URL where ISO is located - "zoneid": 1, - "isextractable": True, - "isfeatured": True, - "ispublic": True, - "ostypeid": 12, - "mode": 'HTTP_DOWNLOAD', - # Used in Extract template, value must be HTTP_DOWNLOAD - "ostypeid": 12, - }, - "destzoneid": 2, # Copy ISO from one zone to another (Destination Zone) - "sourcezoneid": 1, # Copy ISO from one zone to another (Source Zone) - "isfeatured": True, - "ispublic": True, - "isextractable": True, - "bootable": True, # For edit template - "passwordenabled": True, - "ostypeid": 15, - "account": 'bhavin333', # Normal user, no admin rights - "domainid": 1, - } + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + "password": "password", + }, + "iso_1": + { + "displaytext": "Test ISO type 1", + "name": "testISOType_1", + "url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso", + # Source URL where ISO is located + "isextractable": True, + "isfeatured": True, + "ispublic": True, + "ostypeid": 12, + }, + "iso_2": + { + "displaytext": "Test ISO type 2", + "name": "testISOType_2", + "url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso", + # Source URL where ISO is located + "isextractable": True, + "isfeatured": True, + "ispublic": True, + "ostypeid": 12, + "mode": 'HTTP_DOWNLOAD', + # Used in Extract template, value must be HTTP_DOWNLOAD + }, + "destzoneid": 2, + # Copy ISO from one zone to another (Destination Zone) + "isfeatured": True, + "ispublic": True, + "isextractable": True, + "bootable": True, # For edit template + "passwordenabled": True, + "ostypeid": 12, + "domainid": 1, + } + class TestCreateIso(cloudstackTestCase): @@ -65,6 +70,9 @@ class TestCreateIso(cloudstackTestCase): self.services = Services().services self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() + # Get Zone, Domain and templates + self.zone = get_zone(self.apiclient) + self.services["iso_2"]["zoneid"] = self.zone.id self.cleanup = [] return @@ -85,7 +93,8 @@ class TestCreateIso(cloudstackTestCase): """ # Validate the following: - # 1. database (vm_template table) should be updated with newly created ISO + # 1. database (vm_template table) should be + # updated with newly created ISO # 2. UI should show the newly added ISO # 3. listIsos API should show the newly added ISO @@ -120,55 +129,40 @@ class TestCreateIso(cloudstackTestCase): self.services["iso_2"]["zoneid"], "Check zone ID of newly created ISO" ) - - #Verify the database entry for ISO - self.debug( - "select name, display_text from vm_template where id = %s and format='ISO';" - % iso.id - ) - qresultset = self.dbclient.execute( - "select name, display_text from vm_template where id = %s and format='ISO';" - % iso.id - ) - - self.assertNotEqual( - len(qresultset), - 0, - "Check DB Query result set" - ) - - qresult = qresultset[0] - - self.assertEqual( - qresult[0], - self.services["iso_2"]["name"], - "Compare ISO name with database record" - ) - - self.assertEqual( - qresult[1], - self.services["iso_2"]["displaytext"], - "Compare ISO display text with database record" - ) return + class TestISO(cloudstackTestCase): @classmethod def setUpClass(cls): cls.services = Services().services cls.api_client = fetch_api_client() + + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + cls.services["iso_1"]["zoneid"] = cls.zone.id + cls.services["iso_2"]["zoneid"] = cls.zone.id + cls.services["sourcezoneid"] = cls.zone.id + #Create an account, network, VM and IP addresses + cls.account = Account.create( + cls.api_client, + cls.services["account"], + ) + cls.services["account"] = cls.account.account.name cls.iso_1 = Iso.create(cls.api_client, cls.services["iso_1"]) cls.iso_1.download(cls.api_client) cls.iso_2 = Iso.create(cls.api_client, cls.services["iso_2"]) cls.iso_2.download(cls.api_client) + cls._cleanup = [cls.iso_2, cls.account] return @classmethod def tearDownClass(cls): try: cls.api_client = fetch_api_client() - cls.iso_2.delete(cls.api_client) + #Clean up, terminate the created templates + cleanup_resources(cls.api_client, cls.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) @@ -247,47 +241,6 @@ class TestISO(cloudstackTestCase): self.services["ostypeid"], "Check OSTypeID of updated ISO" ) - - #Verify database entry for updateIso - self.debug( - "select name, display_text, bootable, guest_os_id from vm_template where id = %s and format='ISO';" - % self.iso_1.id - ) - qresultset = self.dbclient.execute( - "select name, display_text, bootable, guest_os_id from vm_template where id = %s and format='ISO';" - % self.iso_1.id - ) - - self.assertNotEqual( - len(qresultset), - 0, - "Check DB Query result set" - ) - - qresult = qresultset[0] - - self.assertEqual( - qresult[0], - new_name, - "Compare ISO name with database record" - ) - - self.assertEqual( - qresult[1], - new_displayText, - "Compare ISO display text with database record" - ) - self.assertEqual( - qresult[2], - int(self.services["bootable"]), - "Compare template enable_password field with database record" - ) - - self.assertEqual( - qresult[3], - self.services["ostypeid"], - "Compare template guest OS ID with database record" - ) return def test_03_delete_iso(self): @@ -305,23 +258,11 @@ class TestISO(cloudstackTestCase): cmd.id = self.iso_1.id list_iso_response = self.apiclient.listIsos(cmd) - self.assertEqual(list_iso_response, None, "Check if ISO exists in ListIsos") - - #Verify whether database entry is deleted or not - self.debug( - "select name, display_text from vm_template where id = %s and format='ISO';" - % self.iso_1.id - ) - qresultset = self.dbclient.execute( - "select name, display_text from vm_template where id = %s and format='ISO';" - % self.iso_1.id - ) - self.assertEqual( - len(qresultset), - 1, - "Check DB Query result set" - ) + list_iso_response, + None, + "Check if ISO exists in ListIsos" + ) return def test_04_extract_Iso(self): @@ -329,7 +270,8 @@ class TestISO(cloudstackTestCase): # Validate the following # 1. Admin should able extract and download the ISO - # 2. ListIsos should display all the public templates for all kind of users + # 2. ListIsos should display all the public templates + # for all kind of users # 3 .ListIsos should not display the system templates cmd = extractIso.extractIsoCmd() @@ -370,7 +312,8 @@ class TestISO(cloudstackTestCase): # validate the following # 1. listIsos returns valid permissions set for ISO - # 2. permission changes should be reflected in vm_template table in database + # 2. permission changes should be reflected in vm_template + # table in database cmd = updateIsoPermissions.updateIsoPermissionsCmd() cmd.id = self.iso_2.id @@ -383,8 +326,8 @@ class TestISO(cloudstackTestCase): #Verify ListIsos have updated permissions for the ISO for normal user cmd = listIsos.listIsosCmd() cmd.id = self.iso_2.id - cmd.account = self.services["account"] - cmd.domainid = self.services["domainid"] + cmd.account = self.account.account.name + cmd.domainid = self.account.account.domainid list_iso_response = self.apiclient.listIsos(cmd) iso_response = list_iso_response[0] @@ -405,48 +348,14 @@ class TestISO(cloudstackTestCase): self.services["isfeatured"], "Check isfeatured permission of ISO" ) - - #Verify database entry for updated ISO permissions - self.debug( - "select public, featured, extractable from vm_template where id = %s and format='ISO';" - % self.iso_2.id - ) - qresultset = self.dbclient.execute( - "select public, featured, extractable from vm_template where id = %s and format='ISO';" - % self.iso_2.id - ) - - self.assertNotEqual( - len(qresultset), - 0, - "Check DB Query result set" - ) - - qresult = qresultset[0] - - self.assertEqual( - qresult[0], - int(self.services["ispublic"]), - "Compare ispublic permission with database record" - ) - - self.assertEqual( - qresult[1], - int(self.services["isfeatured"]), - "Compare isfeatured permission with database record" - ) - self.assertEqual( - qresult[2], - int(self.services["isextractable"]), - "Compare extractable permission with database record" - ) return def test_06_copy_iso(self): """Test for copy ISO from one zone to another""" #Validate the following - #1. copy ISO should be successful and secondary storage should contain new copied ISO. + #1. copy ISO should be successful and secondary storage + # should contain new copied ISO. cmd = copyIso.copyIsoCmd() cmd.id = self.iso_2.id diff --git a/tools/testClient/testcase/BVT-tests/test_network.py b/tools/testClient/testcase/BVT-tests/test_network.py index 8eeaaa4abf0..9e9b888b729 100644 --- a/tools/testClient/testcase/BVT-tests/test_network.py +++ b/tools/testClient/testcase/BVT-tests/test_network.py @@ -14,22 +14,28 @@ from base import * #Import System modules import time + class Services: """Test Network Services """ def __init__(self): self.services = { - "admin_account": "admin", - "user_account": "testuser", - "zoneid": 1, - "domainid": 1, + "ostypeid": 12, + # Cent OS 5.3 (64 bit) + "network": { + "name": "Test Network", + "displaytext": "Test Network", + "networkoffering": 6, + }, "service_offering": { - "name": "Tiny Service Offering", - "displaytext": "Tiny service offering", + "name": "Tiny Instance", + "displaytext": "Tiny Instance", "cpunumber": 1, - "cpuspeed": 100, # in MHz - "memory": 64, # In MBs + "cpuspeed": 100, + # in MHz + "memory": 64, + # In MBs }, "account": { "email": "test@test.com", @@ -37,17 +43,13 @@ class Services: "lastname": "User", "username": "test", "password": "password", - "zoneid": 1, }, "server": { - "template": 7, # Template used for VM creation - "zoneid": 3, - "displayname": "testserver", + "displayname": "Small Instance", "username": "root", - "password": "fr3sca", - "hypervisor": 'VMWare', - "account": 'testuser', + "password": "password", + "hypervisor": 'XenServer', "domainid": 1, "privateport": 22, "publicport": 22, @@ -63,67 +65,85 @@ class Services: "lbrule": { "name": "SSH", - "alg": "roundrobin", # Algorithm used for load balancing + "alg": "roundrobin", + # Algorithm used for load balancing "privateport": 80, "publicport": 80, } } + class TestPublicIP(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.services = Services().services - def test_public_ip_admin_account(self): - """Test for Associate/Disassociate public IP address for admin account""" + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.services = Services().services + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) - # Validate the following: - # 1. listPubliIpAddresses API returns the list of acquired addresses - # 2. the returned list should contain our acquired IP address + # Create Accounts & networks + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True + ) - ip_address = PublicIPAddress.create( - self.apiclient, - self.services["admin_account"], - self.services["zoneid"], - self.services["domainid"] + cls.user = Account.create( + cls.api_client, + cls.services["account"], + ) + cls.services["network"]["zoneid"] = cls.zone.id + cls.account_network = Network.create( + cls.api_client, + cls.services["network"], + cls.account.account.name, + cls.account.account.domainid + ) + cls.user_network = Network.create( + cls.api_client, + cls.services["network"], + cls.user.account.name, + cls.user.account.domainid + ) + + # Create Source NAT IP addresses + account_src_nat_ip = PublicIPAddress.create( + cls.api_client, + cls.account.account.name, + cls.zone.id, + cls.account.account.domainid ) - cmd = listPublicIpAddresses.listPublicIpAddressesCmd() - cmd.id = ip_address.ipaddress.id - - list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) - - #listPublicIpAddresses should return newly created public IP - self.assertNotEqual( - len(list_pub_ip_addr_resp), - 0, - "Check if new IP Address is associated" - ) - self.assertEqual( - list_pub_ip_addr_resp[0].id, - ip_address.ipaddress.id, - "Check Correct IP Address is returned in the List Cacls" - ) - - ip_address.delete(self.apiclient) - - # Validate the following: - #1. listPublicIpAddresses API should no more return the released address - - cmd = listPublicIpAddresses.listPublicIpAddressesCmd() - cmd.id = ip_address.ipaddress.id - list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) - - self.assertEqual( - list_pub_ip_addr_resp, - None, - "Check if disassociated IP Address is no longer available" - ) + user_src_nat_ip = PublicIPAddress.create( + cls.api_client, + cls.user.account.name, + cls.zone.id, + cls.user.account.domainid + ) + cls._cleanup = [ + cls.account_network, + cls.user_network, + cls.account, + cls.user, + ] return + @classmethod + def tearDownClass(cls): + try: + #Cleanup resources used + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + return - def test_public_ip_user_account(self): - """Test for Associate/Disassociate public IP address for user account""" + def test_public_ip_admin_account(self): + """Test for Associate/Disassociate + public IP address for admin account""" # Validate the following: # 1. listPubliIpAddresses API returns the list of acquired addresses @@ -131,9 +151,56 @@ class TestPublicIP(cloudstackTestCase): ip_address = PublicIPAddress.create( self.apiclient, - self.services["user_account"], - self.services["zoneid"], - self.services["domainid"] + self.account.account.name, + self.zone.id, + self.account.account.domainid + ) + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.id = ip_address.ipaddress.id + + list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) + + #listPublicIpAddresses should return newly created public IP + self.assertNotEqual( + len(list_pub_ip_addr_resp), + 0, + "Check if new IP Address is associated" + ) + self.assertEqual( + list_pub_ip_addr_resp[0].id, + ip_address.ipaddress.id, + "Check Correct IP Address is returned in the List Cacls" + ) + + ip_address.delete(self.apiclient) + + # Validate the following: + # 1.listPublicIpAddresses should no more return the released address + + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.id = ip_address.ipaddress.id + list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) + + self.assertEqual( + list_pub_ip_addr_resp, + None, + "Check if disassociated IP Address is no longer available" + ) + return + + def test_public_ip_user_account(self): + """Test for Associate/Disassociate + public IP address for user account""" + + # Validate the following: + # 1. listPubliIpAddresses API returns the list of acquired addresses + # 2. the returned list should contain our acquired IP address + + ip_address = PublicIPAddress.create( + self.apiclient, + self.user.account.name, + self.zone.id, + self.user.account.domainid ) cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.id = ip_address.ipaddress.id @@ -147,9 +214,9 @@ class TestPublicIP(cloudstackTestCase): "Check if new IP Address is associated" ) self.assertEqual( - list_pub_ip_addr_resp[0].id, - ip_address.ipaddress.id, - "Check Correct IP Address is returned in the List Call" + list_pub_ip_addr_resp[0].id, + ip_address.ipaddress.id, + "Check Correct IP Address is returned in the List Call" ) ip_address.delete(self.apiclient) @@ -159,10 +226,10 @@ class TestPublicIP(cloudstackTestCase): list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) self.assertEqual( - list_pub_ip_addr_resp, - None, - "Check if disassociated IP Address is no longer available" - ) + list_pub_ip_addr_resp, + None, + "Check if disassociated IP Address is no longer available" + ) return @@ -173,17 +240,37 @@ class TestPortForwarding(cloudstackTestCase): cls.api_client = fetch_api_client() cls.services = Services().services - #Create an account, network, VM and IP addresses - cls.account = Account.create(cls.api_client, cls.services["account"], admin = True) - cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"]) - cls.virtual_machine = VirtualMachine.create( - cls.api_client, - cls.services["server"], - accountid = cls.account.account.name, - serviceofferingid = cls.service_offering.id - ) - cls._cleanup = [cls.virtual_machine, cls.account, cls.service_offering] + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + #Create an account, network, VM and IP addresses + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True + ) + cls.services["server"]["zoneid"] = cls.zone.id + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + cls.services["server"], + templateid=template.id, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) + cls._cleanup = [ + cls.virtual_machine, + cls.account, + cls.service_offering + ] def setUp(self): self.apiclient = self.testClient.getApiClient() @@ -192,11 +279,14 @@ class TestPortForwarding(cloudstackTestCase): @classmethod def tearDownClass(cls): - #cleanup_resources(cls.api_client, cls._cleanup) - return + try: + cls.api_client = fetch_api_client() + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) def tearDown(self): - #cleanup_resources(self.apiclient, self.cleanup) + cleanup_resources(self.apiclient, self.cleanup) return def test_01_port_fwd_on_src_nat(self): @@ -208,12 +298,16 @@ class TestPortForwarding(cloudstackTestCase): cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["server"]["domainid"] + cmd.domainid = self.account.account.domainid src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] #Create NAT rule - nat_rule = NATRule.create(self.apiclient, self.virtual_machine, self.services["natrule"], src_nat_ip_addr.id) - time.sleep(60) + nat_rule = NATRule.create( + self.apiclient, + self.virtual_machine, + self.services["natrule"], + src_nat_ip_addr.id + ) cmd = listPortForwardingRules.listPortForwardingRulesCmd() cmd.id = nat_rule.id @@ -233,10 +327,12 @@ class TestPortForwarding(cloudstackTestCase): try: self.virtual_machine.get_ssh_client(src_nat_ip_addr.ipaddress) except Exception as e: - self.fail("SSH Access failed for %s: %s" % (self.virtual_machine.ipaddress, e)) + self.fail( + "SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress, e) + ) nat_rule.delete(self.apiclient) - time.sleep(60) cmd = listPortForwardingRules.listPortForwardingRulesCmd() cmd.id = nat_rule.id @@ -267,11 +363,11 @@ class TestPortForwarding(cloudstackTestCase): ip_address = PublicIPAddress.create( self.apiclient, self.account.account.name, - self.services["zoneid"], - self.services["domainid"], + self.zone.id, + self.account.account.domainid, self.services["server"] ) - self.clean_up.append(ip_address) + self.cleanup.append(ip_address) #Create NAT rule nat_rule = NATRule.create( self.apiclient, @@ -279,8 +375,6 @@ class TestPortForwarding(cloudstackTestCase): self.services["natrule"], ip_address.ipaddress.id ) - time.sleep(60) - #Validate the following: #1. listPortForwardingRules should not return the deleted rule anymore #2. attempt to do ssh should now fail @@ -303,16 +397,17 @@ class TestPortForwarding(cloudstackTestCase): try: self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress) except Exception as e: - self.fail("SSH Access failed for %s: %s" % (self.virtual_machine.ipaddress.ipaddress, e)) + self.fail( + "SSH Access failed for %s: %s" % \ + (self.virtual_machine.ipaddress.ipaddress, e) + ) - nat_rule.delete(apiclient) - time.sleep(60) + nat_rule.delete(self.apiclient) cmd = listPortForwardingRules.listPortForwardingRulesCmd() cmd.id = nat_rule.id list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd) - self.assertEqual( len(list_nat_rule_response), None, @@ -336,29 +431,53 @@ class TestLoadBalancingRule(cloudstackTestCase): cls.api_client = fetch_api_client() cls.services = Services().services + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["server"]["zoneid"] = cls.zone.id + #Create an account, network, VM and IP addresses - cls.account = Account.create(cls.api_client, cls.services["account"], admin = True) - cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"]) + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True + ) + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) cls.vm_1 = VirtualMachine.create( - cls.api_client, - cls.services["server"], - accountid = cls.account.account.name, - serviceofferingid = cls.service_offering.id - ) + cls.api_client, + cls.services["server"], + templateid=template.id, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) cls.vm_2 = VirtualMachine.create( - cls.api_client, - cls.services["server"], - accountid = cls.account.account.name, - serviceofferingid = cls.service_offering.id - ) + cls.api_client, + cls.services["server"], + templateid=template.id, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) cls.non_src_nat_ip = PublicIPAddress.create( - cls.api_client, - cls.account.account.name, - cls.services["zoneid"], - cls.services["domainid"], - cls.services["server"] - ) - cls._cleanup = [cls.vm_1, cls.vm_2, cls.non_src_nat_ip, cls.account, cls.service_offering] + cls.api_client, + cls.account.account.name, + cls.zone.id, + cls.account.account.domainid, + cls.services["server"] + ) + cls._cleanup = [ + cls.vm_1, + cls.vm_2, + cls.non_src_nat_ip, + cls.account, + cls.service_offering + ] def setUp(self): self.apiclient = self.testClient.getApiClient() @@ -380,11 +499,12 @@ class TestLoadBalancingRule(cloudstackTestCase): # Validate the Following: #1. listLoadBalancerRules should return the added rule #2. attempt to ssh twice on the load balanced IP - #3. verify using the hostname of the VM that round robin is indeed happening as expected + #3. verify using the hostname of the VM + # that round robin is indeed happening as expected cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["server"]["domainid"] + cmd.domainid = self.account.account.domainid src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] #Create Load Balancer rule and assign VMs to rule @@ -392,7 +512,7 @@ class TestLoadBalancingRule(cloudstackTestCase): self.apiclient, self.services["lbrule"], src_nat_ip_addr.id, - accountid = self.account.account.name + accountid=self.account.account.name ) self.cleanup.append(lb_rule) @@ -413,7 +533,8 @@ class TestLoadBalancingRule(cloudstackTestCase): "Check List Load Balancer Rules returns valid Rule" ) - # listLoadBalancerRuleInstances should list all instances associated with that LB rule + # listLoadBalancerRuleInstances should list all + # instances associated with that LB rule cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd() cmd.id = lb_rule.id lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd) @@ -425,43 +546,55 @@ class TestLoadBalancingRule(cloudstackTestCase): ) self.assertEqual( - lb_instance_rules[0].id, - self.vm_2.id, - "Check List Load Balancer instances Rules returns valid VM ID associated with it" - ) + lb_instance_rules[0].id, + self.vm_2.id, + "Check List Load Balancer instances Rules returns valid VM ID" + ) self.assertEqual( - lb_instance_rules[1].id, - self.vm_1.id, - "Check List Load Balancer instances Rules returns valid VM ID associated with it" - ) + lb_instance_rules[1].id, + self.vm_1.id, + "Check List Load Balancer instances Rules returns valid VM ID" + ) ssh_1 = remoteSSHClient.remoteSSHClient( - src_nat_ip_addr.ipaddress, - self.services['natrule']["publicport"], - self.vm_1.username, - self.vm_1.password - ) + src_nat_ip_addr.ipaddress, + self.services['natrule']["publicport"], + self.vm_1.username, + self.vm_1.password + ) - #If Round Robin Algorithm is chosen, each ssh command should alternate between VMs + # If Round Robin Algorithm is chosen, + # each ssh command should alternate between VMs hostnames = [ssh_1.execute("hostname")[0]] time.sleep(20) ssh_2 = remoteSSHClient.remoteSSHClient( - src_nat_ip_addr.ipaddress, - self.services['natrule']["publicport"], - self.vm_1.username, - self.vm_1.password - ) - + src_nat_ip_addr.ipaddress, + self.services['natrule']["publicport"], + self.vm_1.username, + self.vm_1.password + ) hostnames.append(ssh_2.execute("hostname")[0]) - self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") - self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2") + self.assertIn( + self.vm_1.name, + hostnames, + "Check if ssh succeeded for server1" + ) + self.assertIn( + self.vm_2.name, + hostnames, + "Check if ssh succeeded for server2" + ) #SSH should pass till there is a last VM associated with LB rule lb_rule.remove(self.apiclient, [self.vm_2]) hostnames.append(ssh_1.execute("hostname")[0]) - self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") + self.assertIn( + self.vm_1.name, + hostnames, + "Check if ssh succeeded for server1" + ) lb_rule.remove(self.apiclient, [self.vm_1]) with self.assertRaises(Exception): @@ -474,14 +607,15 @@ class TestLoadBalancingRule(cloudstackTestCase): # Validate the Following: #1. listLoadBalancerRules should return the added rule #2. attempt to ssh twice on the load balanced IP - #3. verify using the hostname of the VM that round robin is indeed happening as expected + #3. verify using the hostname of the VM that + # round robin is indeed happening as expected #Create Load Balancer rule and assign VMs to rule lb_rule = LoadBalancerRule.create( self.apiclient, self.services["lbrule"], self.non_src_nat_ip.ipaddress.id, - accountid = self.account.account.name + accountid=self.account.account.name ) self.cleanup.append(lb_rule) @@ -501,7 +635,8 @@ class TestLoadBalancingRule(cloudstackTestCase): lb_rule.id, "Check List Load Balancer Rules returns valid Rule" ) - # listLoadBalancerRuleInstances should list all instances associated with that LB rule + # listLoadBalancerRuleInstances should list + # all instances associated with that LB rule cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd() cmd.id = lb_rule.id lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd) @@ -513,43 +648,55 @@ class TestLoadBalancingRule(cloudstackTestCase): ) self.assertEqual( - lb_instance_rules[0].id, - self.vm_2.id, - "Check List Load Balancer instances Rules returns valid VM ID associated with it" + lb_instance_rules[0].id, + self.vm_2.id, + "Check List Load Balancer instances Rules returns valid VM ID" ) self.assertEqual( - lb_instance_rules[1].id, - self.vm_1.id, - "Check List Load Balancer instances Rules returns valid VM ID associated with it" - ) + lb_instance_rules[1].id, + self.vm_1.id, + "Check List Load Balancer instances Rules returns valid VM ID" + ) ssh_1 = remoteSSHClient.remoteSSHClient( - self.non_src_nat_ip.ipaddress.ipaddress, - self.services['natrule']["publicport"], - self.vm_1.username, - self.vm_1.password - ) + self.non_src_nat_ip.ipaddress.ipaddress, + self.services['natrule']["publicport"], + self.vm_1.username, + self.vm_1.password + ) - #If Round Robin Algorithm is chosen, each ssh command should alternate between VMs + # If Round Robin Algorithm is chosen, + # each ssh command should alternate between VMs hostnames = [ssh_1.execute("hostname")[0]] time.sleep(20) ssh_2 = remoteSSHClient.remoteSSHClient( - self.non_src_nat_ip.ipaddress.ipaddress, - self.services['natrule']["publicport"], - self.vm_1.username, - self.vm_1.password - ) - + self.non_src_nat_ip.ipaddress.ipaddress, + self.services['natrule']["publicport"], + self.vm_1.username, + self.vm_1.password + ) hostnames.append(ssh_2.execute("hostname")[0]) - self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") - self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2") + self.assertIn( + self.vm_1.name, + hostnames, + "Check if ssh succeeded for server1" + ) + self.assertIn( + self.vm_2.name, + hostnames, + "Check if ssh succeeded for server2" + ) #SSH should pass till there is a last VM associated with LB rule lb_rule.remove(self.apiclient, [self.vm_2]) hostnames.append(ssh_1.execute("hostname")[0]) - self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") + self.assertIn( + self.vm_1.name, + hostnames, + "Check if ssh succeeded for server1" + ) lb_rule.remove(self.apiclient, [self.vm_1]) with self.assertRaises(Exception): @@ -563,21 +710,47 @@ class TestRebootRouter(cloudstackTestCase): self.apiclient = self.testClient.getApiClient() self.services = Services().services + + # Get Zone, Domain and templates + self.zone = get_zone(self.apiclient) + template = get_template( + self.apiclient, + self.zone.id, + self.services["ostypeid"] + ) + self.services["server"]["zoneid"] = self.zone.id + #Create an account, network, VM and IP addresses - self.account = Account.create(self.apiclient, self.services["account"], admin = True) - self.service_offering = ServiceOffering.create(self.apiclient, self.services["service_offering"]) - self.vm_1 = VirtualMachine.create( + self.account = Account.create( + self.apiclient, + self.services["account"], + admin=True + ) + self.service_offering = ServiceOffering.create( self.apiclient, - self.services["server"], - accountid = self.account.account.name, - serviceofferingid = self.service_offering.id - ) + self.services["service_offering"] + ) + self.vm_1 = VirtualMachine.create( + self.apiclient, + self.services["server"], + templateid=template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.account = self.account.account.name cmd.domainid = self.account.account.domainid src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] + self.public_ip = PublicIPAddress.create( + self.apiclient, + self.vm_1.account, + self.vm_1.zoneid, + self.vm_1.domainid, + self.services["server"] + ) + lb_rule = LoadBalancerRule.create( self.apiclient, self.services["lbrule"], @@ -585,8 +758,19 @@ class TestRebootRouter(cloudstackTestCase): self.account.account.name ) lb_rule.assign(self.apiclient, [self.vm_1]) - self.nat_rule = NATRule.create(self.apiclient, self.vm_1, self.services["natrule"], ipaddressid = src_nat_ip_addr.id) - self.cleanup = [self.vm_1, lb_rule, self.service_offering, self.account, self.nat_rule] + self.nat_rule = NATRule.create( + self.apiclient, + self.vm_1, + self.services["natrule"], + ipaddressid=self.public_ip.ipaddress.id + ) + self.cleanup = [ + self.vm_1, + lb_rule, + self.service_offering, + self.nat_rule, + self.account, + ] return def test_reboot_router(self): @@ -594,7 +778,8 @@ class TestRebootRouter(cloudstackTestCase): #Validate the Following #1. Post restart PF and LB rules should still function - #2. verify if the ssh into the virtual machine still works through the sourceNAT Ip + #2. verify if the ssh into the virtual machine + # still works through the sourceNAT Ip #Retrieve router for the user account cmd = listRouters.listRoutersCmd() @@ -609,21 +794,19 @@ class TestRebootRouter(cloudstackTestCase): #Sleep to ensure router is rebooted properly time.sleep(60) - cmd = listPublicIpAddresses.listPublicIpAddressesCmd() - cmd.account = self.account.account.name - cmd.domainid = self.services["server"]["domainid"] - src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] - #we should be able to SSH after successful reboot try: remoteSSHClient.remoteSSHClient( - src_nat_ip_addr.ipaddress, - self.services["natrule"]["publicport"], - self.vm_1.username, - self.vm_1.password - ) + self.nat_rule.ipaddress, + self.services["natrule"]["publicport"], + self.vm_1.username, + self.vm_1.password + ) except Exception as e: - self.fail("SSH Access failed for %s: %s" % (self.vm_1.ipaddress, e)) + self.fail( + "SSH Access failed for %s: %s" % \ + (self.vm_1.ipaddress, e) + ) return def tearDown(self): @@ -636,89 +819,130 @@ class TestAssignRemoveLB(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.services = Services().services + # Get Zone, Domain and templates + self.zone = get_zone(self.apiclient) + template = get_template( + self.apiclient, + self.zone.id, + self.services["ostypeid"] + ) + self.services["server"]["zoneid"] = self.zone.id + #Create VMs, accounts - self.account = Account.create(self.apiclient, self.services["account"], admin = True) - self.service_offering = ServiceOffering.create(self.apiclient, self.services["service_offering"]) + self.account = Account.create( + self.apiclient, + self.services["account"], + admin=True + ) + self.service_offering = ServiceOffering.create( + self.apiclient, + self.services["service_offering"] + ) self.vm_1 = VirtualMachine.create( - self.apiclient, - self.services["server"], - accountid = self.account.account.name, - serviceofferingid = self.service_offering.id - ) + self.apiclient, + self.services["server"], + templateid=template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) self.vm_2 = VirtualMachine.create( - self.apiclient, - self.services["server"], - accountid = self.account.account.name, - serviceofferingid = self.service_offering.id - ) + self.apiclient, + self.services["server"], + templateid=template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) self.vm_3 = VirtualMachine.create( - self.apiclient, - self.services["server"], - accountid = self.account.account.name, - serviceofferingid = self.service_offering.id - ) + self.apiclient, + self.services["server"], + templateid=template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) - self.cleanup = [self.vm_1, self.vm_2, self.vm_3, self.account, self.service_offering] + self.cleanup = [ + self.vm_1, + self.vm_2, + self.vm_3, + self.account, + self.service_offering + ] return - def test_assign_and_removal_elb(self): """Test for assign & removing load balancing rule""" - #Validate: - #1. Verify list API - listLoadBalancerRules lists all the rules with the relevant ports - #2. listLoadBalancerInstances will list the instances associated with the corresponding rule. - #3. verify ssh attempts should pass as long as there is at least one instance associated with the rule + # Validate: + #1. Verify list API - listLoadBalancerRules lists + # all the rules with the relevant ports + #2. listLoadBalancerInstances will list + # the instances associated with the corresponding rule. + #3. verify ssh attempts should pass as long as there + # is at least one instance associated with the rule cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["server"]["domainid"] + cmd.domainid = self.account.account.domainid self.non_src_nat_ip = self.apiclient.listPublicIpAddresses(cmd)[0] lb_rule = LoadBalancerRule.create( - self.apiclient, - self.services["lbrule"], - self.non_src_nat_ip.id, - self.account.account.name - ) + self.apiclient, + self.services["lbrule"], + self.non_src_nat_ip.id, + self.account.account.name + ) self.cleanup.append(lb_rule) lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2]) #Create SSH client for each VM ssh_1 = remoteSSHClient.remoteSSHClient( - self.non_src_nat_ip.ipaddress, - self.services["natrule"]["publicport"], - self.vm_1.username, - self.vm_1.password - ) + self.non_src_nat_ip.ipaddress, + self.services["natrule"]["publicport"], + self.vm_1.username, + self.vm_1.password + ) ssh_2 = remoteSSHClient.remoteSSHClient( - self.non_src_nat_ip.ipaddress, - self.services["natrule"]["publicport"], - self.vm_2.username, - self.vm_2.password - ) + self.non_src_nat_ip.ipaddress, + self.services["natrule"]["publicport"], + self.vm_2.username, + self.vm_2.password + ) ssh_3 = remoteSSHClient.remoteSSHClient( - self.non_src_nat_ip.ipaddress, - self.services["natrule"]["publicport"], - self.vm_3.username, - self.vm_3.password - ) - #If Round Robin Algorithm is chosen, each ssh command should alternate between VMs + self.non_src_nat_ip.ipaddress, + self.services["natrule"]["publicport"], + self.vm_3.username, + self.vm_3.password + ) + # If Round Robin Algorithm is chosen, + # each ssh command should alternate between VMs res_1 = ssh_1.execute("hostname")[0] time.sleep(20) res_2 = ssh_2.execute("hostname")[0] - self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1") - self.assertIn(self.vm_2.name, res_2, "Check if ssh succeeded for server2") + self.assertIn( + self.vm_1.name, + res_1, + "Check if ssh succeeded for server1" + ) + self.assertIn( + self.vm_2.name, + res_2, + "Check if ssh succeeded for server2" + ) #Removing VM and assigning another VM to LB rule lb_rule.remove(self.apiclient, [self.vm_2]) res_1 = ssh_1.execute("hostname")[0] - self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1") + + self.assertIn( + self.vm_1.name, + res_1, + "Check if ssh succeeded for server1" + ) lb_rule.assign(self.apiclient, [self.vm_3]) @@ -726,8 +950,16 @@ class TestAssignRemoveLB(cloudstackTestCase): time.sleep(20) res_3 = ssh_3.execute("hostname")[0] - self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1") - self.assertIn(self.vm_3.name, res_3, "Check if ssh succeeded for server3") + self.assertIn( + self.vm_1.name, + res_1, + "Check if ssh succeeded for server1" + ) + self.assertIn( + self.vm_3.name, + res_3, + "Check if ssh succeeded for server3" + ) return def teardown(self): @@ -740,33 +972,64 @@ class TestReleaseIP(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.services = Services().services - #Create an account, network, VM, Port forwarding rule, LB rules and IP addresses - self.account = Account.create(self.apiclient, self.services["account"], admin = True) - self.service_offering = ServiceOffering.create(self.apiclient, self.services["service_offering"]) + # Get Zone, Domain and templates + self.zone = get_zone(self.apiclient) + template = get_template( + self.apiclient, + self.zone.id, + self.services["ostypeid"] + ) + self.services["server"]["zoneid"] = self.zone.id + + #Create an account, network, VM, Port forwarding rule, LB rules + self.account = Account.create( + self.apiclient, + self.services["account"], + admin=True + ) + + self.service_offering = ServiceOffering.create( + self.apiclient, + self.services["service_offering"] + ) self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["server"], - accountid = self.account.account.name, - serviceofferingid = self.service_offering.id - ) + self.apiclient, + self.services["server"], + templateid=template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) self.ip_address = PublicIPAddress.create( self.apiclient, self.account.account.name, - self.services["zoneid"], + self.zone.id, self.account.account.domainid ) cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["server"]["domainid"] + cmd.domainid = self.account.account.domainid self.ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] - self.nat_rule = NATRule.create(self.apiclient, self.virtual_machine, self.services["natrule"], self.ip_addr.id) - self.lb_rule = LoadBalancerRule.create(self.apiclient, self.services["lbrule"], self.ip_addr.id, accountid = self.account.account.name) - self.cleanup = [self.virtual_machine, self.account] + self.nat_rule = NATRule.create( + self.apiclient, + self.virtual_machine, + self.services["natrule"], + self.ip_addr.id + ) + self.lb_rule = LoadBalancerRule.create( + self.apiclient, + self.services["lbrule"], + self.ip_addr.id, + accountid=self.account.account.name + ) + self.cleanup = [ + self.virtual_machine, + self.account + ] return def teardown(self): @@ -783,41 +1046,43 @@ class TestReleaseIP(cloudstackTestCase): list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) self.assertEqual( - list_pub_ip_addr_resp, - None, - "Check if disassociated IP Address is no longer available" - ) + list_pub_ip_addr_resp, + None, + "Check if disassociated IP Address is no longer available" + ) - # ListPortForwardingRules should not list associated rules with Public IP address + # ListPortForwardingRules should not list + # associated rules with Public IP address cmd = listPortForwardingRules.listPortForwardingRulesCmd() cmd.id = self.nat_rule.id list_nat_rules = self.apiclient.listPortForwardingRules(cmd) self.assertEqual( - list_nat_rules, - None, - "Check if Port forwarding rules for disassociated IP Address are no longer available" - ) + list_nat_rules, + None, + "Check if PF rules are no longer available for IP address" + ) - # listLoadBalancerRules should not list associated rules with Public IP address + # listLoadBalancerRules should not list + # associated rules with Public IP address cmd = listLoadBalancerRules.listLoadBalancerRulesCmd() cmd.id = self.lb_rule.id list_lb_rules = self.apiclient.listLoadBalancerRules(cmd) self.assertEqual( - list_lb_rules, - None, - "Check if LB rules for disassociated IP Address are no longer available" - ) + list_lb_rules, + None, + "Check if LB rules for IP Address are no longer available" + ) # SSH Attempt though public IP should fail with self.assertRaises(Exception): ssh_2 = remoteSSHClient.remoteSSHClient( - self.ip_addr.ipaddress, - self.services["natrule"]["publicport"], - self.virtual_machine.username, - self.virtual_machine.password - ) + self.ip_addr.ipaddress, + self.services["natrule"]["publicport"], + self.virtual_machine.username, + self.virtual_machine.password + ) return @@ -827,19 +1092,37 @@ class TestDeleteAccount(cloudstackTestCase): self.apiclient = self.testClient.getApiClient() self.services = Services().services + + # Get Zone, Domain and templates + self.zone = get_zone(self.apiclient) + template = get_template( + self.apiclient, + self.zone.id, + self.services["ostypeid"] + ) + self.services["server"]["zoneid"] = self.zone.id + #Create an account, network, VM and IP addresses - self.account = Account.create(self.apiclient, self.services["account"], admin = True) - self.service_offering = ServiceOffering.create(self.apiclient, self.services["service_offering"]) + self.account = Account.create( + self.apiclient, + self.services["account"], + admin=True + ) + self.service_offering = ServiceOffering.create( + self.apiclient, + self.services["service_offering"] + ) self.vm_1 = VirtualMachine.create( - self.apiclient, - self.services["server"], - accountid = self.account.account.name, - serviceofferingid = self.service_offering.id - ) + self.apiclient, + self.services["server"], + templateid=template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["server"]["domainid"] + cmd.domainid = self.account.account.domainid src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] self.lb_rule = LoadBalancerRule.create( @@ -849,7 +1132,12 @@ class TestDeleteAccount(cloudstackTestCase): self.account.account.name ) self.lb_rule.assign(self.apiclient, [self.vm_1]) - self.nat_rule = NATRule.create(self.apiclient, self.vm_1, self.services["natrule"], src_nat_ip_addr.id) + self.nat_rule = NATRule.create( + self.apiclient, + self.vm_1, + self.services["natrule"], + src_nat_ip_addr.id + ) self.cleanup = [] return @@ -857,37 +1145,40 @@ class TestDeleteAccount(cloudstackTestCase): """Test for delete account""" #Validate the Following - # 1. after account.cleanup.interval (global setting) time all the PF/LB rules should be deleted - # 2. verify that list(LoadBalancer/PortForwarding)Rules API does not return any rules for the account + # 1. after account.cleanup.interval (global setting) + # time all the PF/LB rules should be deleted + # 2. verify that list(LoadBalancer/PortForwarding)Rules + # API does not return any rules for the account # 3. The domR should have been expunged for this account self.account.delete(self.apiclient) + # Sleep to ensure that all resources are deleted time.sleep(120) - # ListLoadBalancerRules should not list associated rules with deleted account + # ListLoadBalancerRules should not list + # associated rules with deleted account cmd = listLoadBalancerRules.listLoadBalancerRulesCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["server"]["domainid"] + cmd.domainid = self.account.account.domainid # Unable to find account testuser1 in domain 1 : Exception with self.assertRaises(Exception): self.apiclient.listLoadBalancerRules(cmd) - # ListPortForwardingRules should not list associated rules with deleted account + # ListPortForwardingRules should not + # list associated rules with deleted account cmd = listPortForwardingRules.listPortForwardingRulesCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["server"]["domainid"] + cmd.domainid = self.account.account.domainid with self.assertRaises(Exception): self.apiclient.listPortForwardingRules(cmd) - #Retrieve router for the user account cmd = listRouters.listRoutersCmd() cmd.account = self.account.account.name cmd.domainid = self.account.account.domainid with self.assertRaises(Exception): routers = self.apiclient.listRouters(cmd) - return def tearDown(self): diff --git a/tools/testClient/testcase/BVT-tests/test_primary_storage.py b/tools/testClient/testcase/BVT-tests/test_primary_storage.py index 099e40bd60d..a515575c20e 100644 --- a/tools/testClient/testcase/BVT-tests/test_primary_storage.py +++ b/tools/testClient/testcase/BVT-tests/test_primary_storage.py @@ -19,53 +19,43 @@ class Services: def __init__(self): self.services = { - "nfs": { - 0: { - "url": "nfs://192.168.100.131/Primary", - # Format: File_System_Type/Location/Path - "name": "Primary XEN", - "podid": 3, - "clusterid": 1, # XEN Cluster - "zoneid": 3, - "hypervisor": 'XEN', - }, - 1: { - "url": "nfs://192.168.100.131/Primary", - "name": "Primary KVM", - "podid": 3, - "clusterid": 40, # KVM Cluster - "zoneid": 3, - "hypervisor": 'KVM', - }, - 2: { - "url": "nfs://192.168.100.131/Primary", - "name": "Primary VMWare", - "podid": 3, - "clusterid": 33, # VMWare Cluster - "zoneid": 3, - "hypervisor": 'VMWare', - }, - }, - "iscsi": { - 0: { - "url": "iscsi://192.168.100.21/iqn.2012-01.localdomain.clo-cstack-cos6:iser/1", - # Format : iscsi : // IP Address/ IQN number/ LUN # - "name": "Primary iSCSI", - "podid": 1, - "clusterid": 1, # XEN Cluster - "zoneid": 1, - "hypervisor": 'XEN', - }, - 1: { - "url": "iscsi://192.168.100.21/export", - "name": "Primary KVM", - "podid": 3, - "clusterid": 1, # KVM Cluster - "zoneid": 3, - "hypervisor": 'KVM', - }, - }, - } + "nfs": { + 0: { + "url": "nfs://192.168.100.131/Primary", + # Format: File_System_Type/Location/Path + "name": "Primary XEN", + "clusterid": 1, # XEN Cluster + "hypervisor": 'XEN', + }, + 1: { + "url": "nfs://192.168.100.131/Primary", + "name": "Primary KVM", + "clusterid": 40, # KVM Cluster + "hypervisor": 'KVM', + }, + 2: { + "url": "nfs://192.168.100.131/Primary", + "name": "Primary VMWare", + "clusterid": 33, # VMWare Cluster + "hypervisor": 'VMWare', + }, + }, + "iscsi": { + 0: { + "url": "iscsi://192.168.100.21/iqn.2012-01.localdomain.clo-cstack-cos6:iser/1", + # Format : iscsi://IP Address/IQN number/LUN# + "name": "Primary iSCSI", + "clusterid": 1, # XEN Cluster + "hypervisor": 'XEN', + }, + 1: { + "url": "iscsi://192.168.100.21/export", + "name": "Primary KVM", + "clusterid": 1, # KVM Cluster + "hypervisor": 'KVM', + }, + }, + } class TestPrimaryStorageServices(cloudstackTestCase): @@ -74,6 +64,24 @@ class TestPrimaryStorageServices(cloudstackTestCase): self.apiclient = self.testClient.getApiClient() self.services = Services().services self.cleanup = [] + # Get Zone and pod + self.zone = get_zone(self.apiclient) + self.pod = get_pod(self.apiclient, self.zone.id) + + self.services["nfs"][0]["zoneid"] = self.zone.id + self.services["nfs"][1]["zoneid"] = self.zone.id + self.services["nfs"][2]["zoneid"] = self.zone.id + + self.services["nfs"][0]["podid"] = self.pod.id + self.services["nfs"][1]["podid"] = self.pod.id + self.services["nfs"][2]["podid"] = self.pod.id + + self.services["iscsi"][0]["zoneid"] = self.zone.id + self.services["iscsi"][1]["zoneid"] = self.zone.id + + self.services["iscsi"][0]["podid"] = self.pod.id + self.services["iscsi"][1]["podid"] = self.pod.id + return def tearDown(self): @@ -92,7 +100,8 @@ class TestPrimaryStorageServices(cloudstackTestCase): # Validate the following: # 1. verify hypervisortype returned by api is Xen/KVM/VMWare # 2. verify that the cluster is in 'Enabled' allocation state - # 3. verify that the host is added successfully and in Up state with listHosts api response + # 3. verify that the host is added successfully and + # in Up state with listHosts api response #Create NFS storage pools with on XEN/KVM/VMWare clusters for k, v in self.services["nfs"].items(): @@ -103,25 +112,25 @@ class TestPrimaryStorageServices(cloudstackTestCase): list_hosts_response = self.apiclient.listHosts(cmd) self.assertNotEqual( - len(list_hosts_response), - 0, - "Check list Hosts response for hypervisor type : " + v["hypervisor"] - ) + len(list_hosts_response), + 0, + "Check list Hosts for hypervisor: " + v["hypervisor"] + ) storage = StoragePool.create(self.apiclient, v) self.cleanup.append(storage) self.assertEqual( - storage.state, - 'Up', - "Check state of primary storage is Up or not for hypervisor type : " + v["hypervisor"] - ) + storage.state, + 'Up', + "Check primary storage state for hypervisor: " + v["hypervisor"] + ) self.assertEqual( - storage.type, - 'NetworkFilesystem', - "Check type of the storage pool created for hypervisor type : " + v["hypervisor"] - ) + storage.type, + 'NetworkFilesystem', + "Check storage pool type for hypervisor : " + v["hypervisor"] + ) #Verify List Storage pool Response has newly added storage pool cmd = listStoragePools.listStoragePoolsCmd() @@ -136,15 +145,15 @@ class TestPrimaryStorageServices(cloudstackTestCase): storage_response = storage_pools_response[0] self.assertEqual( - storage_response.id, - storage.id, - "Check storage pool ID with list storage pools response for hypervisor type : " + v["hypervisor"] - ) + storage_response.id, + storage.id, + "Check storage pool ID for hypervisor: " + v["hypervisor"] + ) self.assertEqual( - storage.type, - storage_response.type, - "Check type of the storage pool for hypervisor type : " + v["hypervisor"] - ) + storage.type, + storage_response.type, + "Check storage pool type for hypervisor: " + v["hypervisor"] + ) # Call cleanup for reusing primary storage cleanup_resources(self.apiclient, self.cleanup) self.cleanup = [] @@ -155,10 +164,10 @@ class TestPrimaryStorageServices(cloudstackTestCase): self.cleanup.append(storage) self.assertEqual( - storage.state, - 'Up', - "Check state of primary storage is Up or not for hypervisor type : " + v["hypervisor"] - ) + storage.state, + 'Up', + "Check primary storage state for hypervisor: " + v["hypervisor"] + ) #Verify List Storage pool Response has newly added storage pool cmd = listStoragePools.listStoragePoolsCmd() @@ -166,22 +175,22 @@ class TestPrimaryStorageServices(cloudstackTestCase): storage_pools_response = self.apiclient.listStoragePools(cmd) self.assertNotEqual( - len(storage_pools_response), - 0, - "Check list Hosts response for hypervisor type : " + v["hypervisor"] + len(storage_pools_response), + 0, + "Check Hosts response for hypervisor: " + v["hypervisor"] ) storage_response = storage_pools_response[0] self.assertEqual( - storage_response.id, - storage.id, - "Check storage pool ID with list storage pools response for hypervisor type : " + v["hypervisor"] - ) + storage_response.id, + storage.id, + "Check storage pool ID for hypervisor: " + v["hypervisor"] + ) self.assertEqual( - storage.type, - storage_response.type, - "Check type of the storage pool for hypervisor type : " + v["hypervisor"] - ) + storage.type, + storage_response.type, + "Check storage pool type hypervisor: " + v["hypervisor"] + ) # Call cleanup for reusing primary storage cleanup_resources(self.apiclient, self.cleanup) diff --git a/tools/testClient/testcase/BVT-tests/test_routers.py b/tools/testClient/testcase/BVT-tests/test_routers.py index bfc7a78c1e2..acb3ea6c28e 100644 --- a/tools/testClient/testcase/BVT-tests/test_routers.py +++ b/tools/testClient/testcase/BVT-tests/test_routers.py @@ -14,6 +14,7 @@ from base import * #Import System modules import time + class Services: """Test router Services """ @@ -29,11 +30,9 @@ class Services: }, "virtual_machine": { - "template": 7, # Template used for VM creation - "zoneid": 3, - "displayname": "testserver", + "displayname": "Test VM", "username": "root", - "password": "fr3sca", + "password": "password", "ssh_port": 22, "hypervisor": 'VMWare', "domainid": 1, @@ -47,30 +46,49 @@ class Services: "lastname": "User", "username": "testuser", "password": "fr3sca", - "zoneid": 1, }, - "sleep_time": 300, + "ostypeid":12, } + class TestRouterServices(cloudstackTestCase): @classmethod def setUpClass(cls): cls.api_client = fetch_api_client() - cls.services = Services().services - #Create an account, network, VM and IP addresses - cls.account = Account.create(cls.api_client, cls.services["account"], admin = True) - cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"]) - cls.vm_1 = VirtualMachine.create( - cls.api_client, - cls.services["virtual_machine"], - accountid = cls.account.account.name, - serviceofferingid = cls.service_offering.id - ) + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id - cls.cleanup = [cls.vm_1, cls.account, cls.service_offering] + #Create an account, network, VM and IP addresses + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True + ) + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.vm_1 = VirtualMachine.create( + cls.api_client, + cls.services["virtual_machine"], + templateid=template.id, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) + cls.cleanup = [ + cls.vm_1, + cls.account, + cls.service_offering + ] return @classmethod @@ -150,7 +168,7 @@ class TestRouterServices(cloudstackTestCase): cmd = listRouters.listRoutersCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["virtual_machine"]["domainid"] + cmd.domainid = self.account.account.domainid list_router_response = self.apiclient.listRouters(cmd) self.assertNotEqual( @@ -212,7 +230,7 @@ class TestRouterServices(cloudstackTestCase): cmd = listRouters.listRoutersCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["virtual_machine"]["domainid"] + cmd.domainid = self.account.account.domainid router = self.apiclient.listRouters(cmd)[0] #Stop the router @@ -221,7 +239,6 @@ class TestRouterServices(cloudstackTestCase): self.apiclient.stopRouter(cmd) #List routers to check state of router - cmd = listRouters.listRoutersCmd() cmd.id = router.id router_response = self.apiclient.listRouters(cmd)[0] @@ -243,7 +260,7 @@ class TestRouterServices(cloudstackTestCase): cmd = listRouters.listRoutersCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["virtual_machine"]["domainid"] + cmd.domainid = self.account.account.domainid router = self.apiclient.listRouters(cmd)[0] #Start the router @@ -252,7 +269,6 @@ class TestRouterServices(cloudstackTestCase): self.apiclient.startRouter(cmd) #List routers to check state of router - cmd = listRouters.listRoutersCmd() cmd.id = router.id router_response = self.apiclient.listRouters(cmd)[0] @@ -274,7 +290,7 @@ class TestRouterServices(cloudstackTestCase): cmd = listRouters.listRoutersCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["virtual_machine"]["domainid"] + cmd.domainid = self.account.account.domainid router = self.apiclient.listRouters(cmd)[0] public_ip = router.publicip @@ -315,7 +331,7 @@ class TestRouterServices(cloudstackTestCase): cmd = listVirtualMachines.listVirtualMachinesCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["virtual_machine"]["domainid"] + cmd.domainid = self.account.account.domainid list_vms = self.apiclient.listVirtualMachines(cmd) self.assertNotEqual( @@ -340,13 +356,13 @@ class TestRouterServices(cloudstackTestCase): #Check status of network router cmd = listRouters.listRoutersCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["virtual_machine"]["domainid"] + cmd.domainid = self.account.account.domainid router = self.apiclient.listRouters(cmd)[0] self.assertEqual( - router.state, - 'Stopped', - "Check state of the router after stopping all VMs associated with that account" + router.state, + 'Stopped', + "Check state of the router after stopping all VMs associated" ) return @@ -355,12 +371,13 @@ class TestRouterServices(cloudstackTestCase): """ # Validate the following # 1. Router only does dhcp - # 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP by checking status of dnsmasq process + # 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP + # by checking status of dnsmasq process # Find router associated with user account cmd = listRouters.listRoutersCmd() cmd.account = self.account.account.name - cmd.domainid = self.services["virtual_machine"]["domainid"] + cmd.domainid = self.account.account.domainid router = self.apiclient.listRouters(cmd)[0] cmd = listHosts.listHostsCmd() @@ -371,12 +388,13 @@ class TestRouterServices(cloudstackTestCase): #SSH to the machine ssh = remoteSSHClient.remoteSSHClient( - host.ipaddress, - self.services['virtual_machine']["publicport"], - self.services['virtual_machine']["username"], - self.services['virtual_machine']["password"] - ) - ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % router.linklocalip + host.ipaddress, + self.services['virtual_machine']["publicport"], + self.vm_1.username, + self.vm_1.password + ) + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s "\ + % router.linklocalip # Double hop into router timeout = 5 @@ -419,12 +437,13 @@ class TestRouterServices(cloudstackTestCase): #SSH to the machine ssh = remoteSSHClient.remoteSSHClient( - host.ipaddress, - self.services['virtual_machine']["publicport"], - self.services['virtual_machine']["username"], - self.services['virtual_machine']["password"] - ) - ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % router.linklocalip + host.ipaddress, + self.services['virtual_machine']["publicport"], + self.vm_1.username, + self.vm_1.password + ) + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s "\ + % router.linklocalip # Double hop into router timeout = 5 @@ -468,7 +487,8 @@ class TestRouterServices(cloudstackTestCase): # Validate the following # 1. When cleanup = true, router is destroyed and a new one created - # 2. New router will have new publicIp and linkLocalIp and all it's services should resume + # 2. New router will have new publicIp and linkLocalIp and + # all it's services should resume # Find router associated with user account cmd = listRouters.listRoutersCmd() @@ -507,7 +527,8 @@ class TestRouterServices(cloudstackTestCase): """ # Validate the following - # 1. When cleanup = false, router is restarted and all services inside the router are restarted + # 1. When cleanup = false, router is restarted and + # all services inside the router are restarted # 2. check 'uptime' to see if the actual restart happened cmd = listNetworks.listNetworksCmd() @@ -534,12 +555,13 @@ class TestRouterServices(cloudstackTestCase): #SSH to the machine ssh = remoteSSHClient.remoteSSHClient( - host.ipaddress, - self.services['virtual_machine']["publicport"], - self.services['virtual_machine']["username"], - self.services['virtual_machine']["password"] - ) - ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s uptime" % router.linklocalip + host.ipaddress, + self.services['virtual_machine']["publicport"], + self.vm_1.username, + self.vm_1.password + ) + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s uptime"\ + % router.linklocalip # Double hop into router to check router uptime timeout = 5 diff --git a/tools/testClient/testcase/BVT-tests/test_secondary_storage.py b/tools/testClient/testcase/BVT-tests/test_secondary_storage.py index aa158faa380..3a067b2502e 100644 --- a/tools/testClient/testcase/BVT-tests/test_secondary_storage.py +++ b/tools/testClient/testcase/BVT-tests/test_secondary_storage.py @@ -19,35 +19,27 @@ class Services: def __init__(self): self.services = { - "storage": { - "zoneid": 3, - "podid": 3, - "url": "nfs://192.168.100.131/SecStorage" - # Format: File_System_Type/Location/Path - }, - "hypervisors": { - 0: { - "hypervisor": "XenServer", - "zoneid": 3, - "podid": 3, - "templatefilter": "self", - }, - 1: { - "hypervisor": "KVM", - "zoneid": 3, - "podid": 3, - "templatefilter": "self", - }, - 2: { - "hypervisor": "VMWare", - "zoneid": 3, - "podid": 3, - "templatefilter": "self", - }, - }, - "sleep": 180, - "timeout": 5, - } + "storage": { + "url": "nfs://192.168.100.131/SecStorage" + # Format: File_System_Type/Location/Path + }, + "hypervisors": { + 0: { + "hypervisor": "XenServer", + "templatefilter": "self", + }, + 1: { + "hypervisor": "KVM", + "templatefilter": "self", + }, + 2: { + "hypervisor": "VMWare", + "templatefilter": "self", + }, + }, + "sleep": 180, + "timeout": 5, + } class TestSecStorageServices(cloudstackTestCase): @@ -56,6 +48,20 @@ class TestSecStorageServices(cloudstackTestCase): self.apiclient = self.testClient.getApiClient() self.cleanup = [] self.services = Services().services + # Get Zone and pod + self.zone = get_zone(self.apiclient) + self.pod = get_pod(self.apiclient, self.zone.id) + + self.services["storage"]["zoneid"] = self.zone.id + self.services["storage"]["podid"] = self.pod.id + + self.services["hypervisors"][0]["zoneid"] = self.zone.id + self.services["hypervisors"][1]["zoneid"] = self.zone.id + self.services["hypervisors"][2]["zoneid"] = self.zone.id + + self.services["hypervisors"][0]["podid"] = self.pod.id + self.services["hypervisors"][1]["podid"] = self.pod.id + self.services["hypervisors"][2]["podid"] = self.pod.id return def tearDown(self): @@ -75,13 +81,13 @@ class TestSecStorageServices(cloudstackTestCase): # 2. Verify with listHosts and type secondarystorage cmd = addSecondaryStorage.addSecondaryStorageCmd() - cmd.zoneid = self.services["storage"]["zoneid"] + cmd.zoneid = self.zone.id cmd.url = self.services["storage"]["url"] sec_storage = self.apiclient.addSecondaryStorage(cmd) self.assertEqual( sec_storage.zoneid, - self.services["storage"]["zoneid"], + self.zone.id, "Check zoneid where sec storage is added" ) @@ -115,13 +121,14 @@ class TestSecStorageServices(cloudstackTestCase): """ # 1. verify listHosts has all 'routing' hosts in UP state - # 2. verify listStoragePools shows all primary storage pools in UP state + # 2. verify listStoragePools shows all primary storage pools + # in UP state # 3. verify that secondary storage was added successfully cmd = listHosts.listHostsCmd() cmd.type = 'Routing' - cmd.zoneid = self.services["storage"]["zoneid"] - cmd.podid = self.services["storage"]["podid"] + cmd.zoneid = self.zone.id + cmd.podid = self.pod.id list_hosts_response = self.apiclient.listHosts(cmd) # ListHosts has all 'routing' hosts in UP state self.assertNotEqual( @@ -139,8 +146,8 @@ class TestSecStorageServices(cloudstackTestCase): # ListStoragePools shows all primary storage pools in UP state cmd = listStoragePools.listStoragePoolsCmd() cmd.name = 'Primary' - cmd.zoneid = self.services["storage"]["zoneid"] - cmd.podid = self.services["storage"]["podid"] + cmd.zoneid = self.zone.id + cmd.podid = self.pod.id list_storage_response = self.apiclient.listStoragePools(cmd) self.assertNotEqual( @@ -159,7 +166,7 @@ class TestSecStorageServices(cloudstackTestCase): # Secondary storage is added successfully cmd = listHosts.listHostsCmd() cmd.type = 'SecondaryStorage' - cmd.zoneid = self.services["storage"]["zoneid"] + cmd.zoneid = self.zone.id timeout = self.services["timeout"] while True: @@ -187,8 +194,8 @@ class TestSecStorageServices(cloudstackTestCase): cmd = listSystemVms.listSystemVmsCmd() cmd.systemvmtype = 'secondarystoragevm' - cmd.zoneid = self.services["storage"]["zoneid"] - cmd.podid = self.services["storage"]["podid"] + cmd.zoneid = self.zone.id + cmd.podid = self.pod.id timeout = self.services["timeout"] @@ -222,13 +229,14 @@ class TestSecStorageServices(cloudstackTestCase): # Validate the following # If SSVM is in UP state and running - # 1. wait for listTemplates to show all builtin templates downloaded for all added hypervisors and in “Ready” state" + # 1. wait for listTemplates to show all builtin templates + # downloaded for all added hypervisors and in “Ready” state" for k, v in self.services["hypervisors"].items(): cmd = listTemplates.listTemplatesCmd() cmd.hypervisor = v["hypervisor"] - cmd.zoneid = v["zoneid"] + cmd.zoneid = self.zone.id cmd.templatefilter = v["templatefilter"] list_templates = self.apiclient.listTemplates(cmd) @@ -241,7 +249,7 @@ class TestSecStorageServices(cloudstackTestCase): cmd = listTemplates.listTemplatesCmd() cmd.id = templateid cmd.templatefilter = v["templatefilter"] - cmd.zoneid = v["zoneid"] + cmd.zoneid = self.zone.id while True and (templateid != None): diff --git a/tools/testClient/testcase/BVT-tests/test_snapshots.py b/tools/testClient/testcase/BVT-tests/test_snapshots.py index e242ed359e8..55c75679053 100644 --- a/tools/testClient/testcase/BVT-tests/test_snapshots.py +++ b/tools/testClient/testcase/BVT-tests/test_snapshots.py @@ -17,81 +17,83 @@ class Services: def __init__(self): self.services = { - "service_offering": { - "name": "Tiny Service Offering", - "displaytext": "Tiny service offering", + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + # Random characters are appended for unique + # username + "password": "fr3sca", + }, + "service_offering": { + "name": "Tiny Instance", + "displaytext": "Tiny Instance", "cpunumber": 1, - "cpuspeed": 100, # in MHz - "memory": 64, # In MBs - }, - "server_with_disk": + "cpuspeed": 200, # in MHz + "memory": 256, # In MBs + }, + "disk_offering": { + "displaytext": "Small", + "name": "Small", + "disksize": 1 + }, + "server_with_disk": { - "template": 256, # Template used for VM creation - "zoneid": 1, - "diskoffering": 3, # Optional, if specified data disk will be allocated to VM "displayname": "testserver", "username": "root", "password": "password", "ssh_port": 22, "hypervisor": 'XenServer', - "account": 'testuser', "domainid": 1, "privateport": 22, "publicport": 22, "protocol": 'TCP', }, - "server_without_disk": + "server_without_disk": { - "template": 256, # Template used for VM creation - "zoneid": 1, "displayname": "testserver", "username": "root", "password": "password", "ssh_port": 22, "hypervisor": 'XenServer', - "account": 'testuser', "domainid": 1, - "privateport": 22, # For NAT rule creation + "privateport": 22, + # For NAT rule creation "publicport": 22, "protocol": 'TCP', }, - "recurring_snapshot": + "recurring_snapshot": { - "intervaltype": 'HOURLY', # Frequency of snapshots + "intervaltype": 'HOURLY', + # Frequency of snapshots "maxsnaps": 2, # Should be min 2 "schedule": 1, "timezone": 'US/Arizona', # Timezone Formats - http://cloud.mindtouch.us/CloudStack_Documentation/Developer's_Guide%3A_CloudStack }, - "templates": + "templates": { - "displaytext": 'Test template snapshot', - "name": 'template_from_snapshot', + "displaytext": 'Template from snapshot', + "name": 'Template from snapshot', "ostypeid": 12, "templatefilter": 'self', }, - "small_instance": - { - "zoneid": 1, - "serviceofferingid": 1, - }, + "ostypeid": 12, + # Cent OS 5.3 (64 bit) "diskdevice": "/dev/xvda", - "template": 256, - "zoneid": 1, - "diskoffering": 3, "diskname": "TestDiskServ", "size": 1, # GBs - "account": 'testuser', "domainid": 1, "mount_dir": "/mnt/tmp", "sub_dir": "test", "sub_lvl_dir1": "test1", "sub_lvl_dir2": "test2", "random_data": "random.data", - "exportpath": 'SecondaryStorage', + "exportpath": 'SecStorage', "sec_storage": '192.168.100.131', # IP address of Sec storage where snapshots are stored "mgmt_server_ip": '192.168.100.154', @@ -107,11 +109,55 @@ class TestSnapshots(cloudstackTestCase): def setUpClass(cls): cls.api_client = fetch_api_client() cls.services = Services().services - cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"]) + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["server_with_disk"]["zoneid"] = cls.zone.id + cls.services["server_with_disk"]["diskoffering"] = cls.disk_offering.id + + cls.services["server_without_disk"]["zoneid"] = cls.zone.id + + cls.services["template"] = template.id + cls.services["zoneid"] = cls.zone.id + cls.services["diskoffering"] = cls.disk_offering.id + + # Create VMs, NAT Rules etc + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True + ) + + cls.services["account"] = cls.account.account.name + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) cls.virtual_machine = cls.virtual_machine_with_disk = \ - VirtualMachine.create(cls.api_client, cls.services["server_with_disk"], serviceofferingid = cls.service_offering.id) + VirtualMachine.create( + cls.api_client, + cls.services["server_with_disk"], + templateid=template.id, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) cls.virtual_machine_without_disk = \ - VirtualMachine.create(cls.api_client, cls.services["server_without_disk"], serviceofferingid = cls.service_offering.id) + VirtualMachine.create( + cls.api_client, + cls.services["server_without_disk"], + templateid=template.id, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) cls.public_ip = PublicIPAddress.create( cls.api_client, @@ -120,8 +166,20 @@ class TestSnapshots(cloudstackTestCase): cls.virtual_machine.domainid, cls.services["server_with_disk"] ) - cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services["server_with_disk"], ipaddressid = cls.public_ip.ipaddress.id) - cls._cleanup = [cls.virtual_machine, cls.nat_rule, cls.virtual_machine_without_disk, cls.public_ip, cls.service_offering] + cls.nat_rule = NATRule.create( + cls.api_client, + cls.virtual_machine, + cls.services["server_with_disk"], + ipaddressid=cls.public_ip.ipaddress.id + ) + cls._cleanup = [ + cls.virtual_machine, + cls.nat_rule, + cls.public_ip, + cls.service_offering, + cls.disk_offering, + cls.account, + ] return @classmethod @@ -152,9 +210,11 @@ class TestSnapshots(cloudstackTestCase): """ # Validate the following - # 1.listSnapshots should list the snapshot that was created. - # 2.verify that secondary storage NFS share contains the reqd volume under /secondary/snapshots/$volumeid/$snapshot_uuid - # 3.verify backup_snap_id was non null in the `snapshots` table + # 1. listSnapshots should list the snapshot that was created. + # 2. verify that secondary storage NFS share contains + # the reqd volume under + # /secondary/snapshots/$volumeid/$snapshot_uuid + # 3. verify backup_snap_id was non null in the `snapshots` table cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine_with_disk.id @@ -167,21 +227,30 @@ class TestSnapshots(cloudstackTestCase): cmd.id = snapshot.id list_snapshots = self.apiclient.listSnapshots(cmd) - self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call") + self.assertNotEqual( + list_snapshots, + None, + "Check if result exists in list item call" + ) self.assertEqual( list_snapshots[0].id, snapshot.id, "Check resource id in list resources call" ) - self.debug("select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" % snapshot.id) - qresultset = self.dbclient.execute("select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" % snapshot.id) + self.debug( + "select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" \ + % snapshot.id + ) + qresultset = self.dbclient.execute( + "select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" \ + % snapshot.id + ) self.assertNotEqual( len(qresultset), 0, "Check DB Query result set" ) - qresult = qresultset[0] self.assertNotEqual( str(qresult[0]), @@ -190,25 +259,29 @@ class TestSnapshots(cloudstackTestCase): ) # Login to management server to check snapshot present on sec disk - ssh_client = remoteSSHClient.remoteSSHClient( - self.services["mgmt_server_ip"], - self.services["ssh_port"], - self.services["username"], - self.services["password"] - ) - + ssh_client = self.virtual_machine_without_disk.get_ssh_client( + self.nat_rule.ipaddress + ) cmds = [ "mkdir -p %s" % self.services["mount_dir"], - "mount %s:/%s %s" % (self.services["sec_storage"], self.services["exportpath"], self.services["mount_dir"]), - "ls %s/snapshots/%s/%s" % (self.services["mount_dir"], qresult[1], qresult[2]), + "mount %s:/%s %s" % ( + self.services["sec_storage"], + self.services["exportpath"], + self.services["mount_dir"] + ), + "ls %s/snapshots/%s/%s" % ( + self.services["mount_dir"], + qresult[1], + qresult[2] + ), ] for c in cmds: result = ssh_client.execute(c) self.assertEqual( - result[0], - str(qresult[3]) + ".vhd", - "Check snapshot UUID in secondary storage and database" + result[0], + str(qresult[3]) + ".vhd", + "Check snapshot UUID in secondary storage and database" ) return @@ -226,14 +299,24 @@ class TestSnapshots(cloudstackTestCase): cmd.id = snapshot.id list_snapshots = self.apiclient.listSnapshots(cmd) - self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call") + self.assertNotEqual( + list_snapshots, + None, + "Check if result exists in list item call" + ) self.assertEqual( list_snapshots[0].id, snapshot.id, "Check resource id in list resources call" ) - self.debug("select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" % 6)#snapshot.id - qresultset = self.dbclient.execute("select backup_snap_id, account_id, volume_id, backup_snap_id from snapshots where id = %s;" % 6) # snapshot.id + self.debug( + "select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" \ + % snapshot.id + ) + qresultset = self.dbclient.execute( + "select backup_snap_id, account_id, volume_id, backup_snap_id from snapshots where id = %s;" \ + % snapshot.id + ) self.assertNotEqual( len(qresultset), 0, @@ -249,16 +332,20 @@ class TestSnapshots(cloudstackTestCase): ) # Login to management server to check snapshot present on sec disk - ssh_client = remoteSSHClient.remoteSSHClient( - self.services["mgmt_server_ip"], - self.services["ssh_port"], - self.services["username"], - self.services["password"] - ) - + ssh_client = self.virtual_machine_with_disk.get_ssh_client( + self.nat_rule.ipaddress + ) cmds = [ "mkdir -p %s" % self.services["mount_dir"], - "mount %s:/%s %s" % (self.services["sec_storage"], self.services["exportpath"], self.services["mount_dir"]), - "ls %s/snapshots/%s/%s" % (self.services["mount_dir"], qresult[1], qresult[2]), + "mount %s:/%s %s" % ( + self.services["sec_storage"], + self.services["exportpath"], + self.services["mount_dir"] + ), + "ls %s/snapshots/%s/%s" % ( + self.services["mount_dir"], + qresult[1], + qresult[2] + ), ] for c in cmds: result = ssh_client.execute(c) @@ -281,11 +368,19 @@ class TestSnapshots(cloudstackTestCase): random_data_0 = random_gen(100) random_data_1 = random_gen(100) - ssh_client = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress) + ssh_client = self.virtual_machine.get_ssh_client( + self.nat_rule.ipaddress + ) #Format partition using ext3 - format_volume_to_ext3(ssh_client, self.services["diskdevice"]) + format_volume_to_ext3( + ssh_client, + self.services["diskdevice"] + ) cmds = [ "mkdir -p %s" % self.services["mount_dir"], - "mount %s1 %s" % (self.services["diskdevice"], self.services["mount_dir"]), + "mount %s1 %s" % ( + self.services["diskdevice"], + self.services["mount_dir"] + ), "pushd %s" % self.services["mount_dir"], "mkdir -p %s/{%s,%s} " % ( self.services["sub_dir"], @@ -317,7 +412,11 @@ class TestSnapshots(cloudstackTestCase): snapshot = Snapshot.create(self.apiclient, volume.id) self.cleanup.append(snapshot) #Create volume from snapshot - volume = Volume.create_from_snapshot(self.apiclient, snapshot.id, self.services) + volume = Volume.create_from_snapshot( + self.apiclient, + snapshot.id, + self.services + ) self.cleanup.append(volume) cmd = listVolumes.listVolumesCmd() cmd.id = volume.id @@ -344,35 +443,37 @@ class TestSnapshots(cloudstackTestCase): ssh = new_virtual_machine.get_ssh_client(self.nat_rule.ipaddress) cmds = [ "mkdir %s" % self.services["mount_dir"], - "mount %s1 %s" % (self.services["diskdevice"], self.services["mount_dir"]) + "mount %s1 %s" % ( + self.services["diskdevice"], + self.services["mount_dir"] + ) ] for c in cmds: self.debug(ssh.execute(c)) returned_data_0 = ssh.execute("cat %s/%s/%s" % ( - self.services["sub_dir"], - self.services["sub_lvl_dir1"], - self.services["random_data"] - ) - ) + self.services["sub_dir"], + self.services["sub_lvl_dir1"], + self.services["random_data"] + )) returned_data_1 = ssh.execute("cat %s/%s/%s" % ( - self.services["sub_dir"], - self.services["sub_lvl_dir2"], - self.services["random_data"] + self.services["sub_dir"], + self.services["sub_lvl_dir2"], + self.services["random_data"] )) #Verify returned data self.assertEqual( - random_data_0, - returned_data_0[0], - "Verify newly attached volume contents with existing one" + random_data_0, + returned_data_0[0], + "Verify newly attached volume contents with existing one" ) self.assertEqual( - random_data_1, - returned_data_1[0], - "Verify newly attached volume contents with existing one" - ) + random_data_1, + returned_data_1[0], + "Verify newly attached volume contents with existing one" + ) #detach volume for cleanup cmd = detachVolume.detachVolumeCmd() @@ -398,7 +499,11 @@ class TestSnapshots(cloudstackTestCase): cmd = listSnapshots.listSnapshotsCmd() cmd.id = snapshot.id list_snapshots = self.apiclient.listSnapshots(cmd) - self.assertEqual(list_snapshots, None, "Check if result exists in list item call") + self.assertEqual( + list_snapshots, + None, + "Check if result exists in list item call" + ) return def test_05_recurring_snapshot_root_disk(self): @@ -412,7 +517,11 @@ class TestSnapshots(cloudstackTestCase): cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'ROOT' volume = self.apiclient.listVolumes(cmd) - recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, self.services["recurring_snapshot"]) + recurring_snapshot = SnapshotPolicy.create( + self.apiclient, + volume[0].id, + self.services["recurring_snapshot"] + ) self.cleanup.append(recurring_snapshot) #ListSnapshotPolicy should return newly created policy cmd = listSnapshotPolicies.listSnapshotPoliciesCmd() @@ -420,20 +529,27 @@ class TestSnapshots(cloudstackTestCase): cmd.volumeid = volume[0].id list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd) - self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call") + self.assertNotEqual( + list_snapshots_policy, + None, + "Check if result exists in list item call" + ) snapshots_policy = list_snapshots_policy[0] self.assertEqual( - snapshots_policy.id, - recurring_snapshot.id, - "Check recurring snapshot id in list resources call" + snapshots_policy.id, + recurring_snapshot.id, + "Check recurring snapshot id in list resources call" ) self.assertEqual( - snapshots_policy.maxsnaps, - self.services["recurring_snapshot"]["maxsnaps"], - "Check interval type in list resources call" + snapshots_policy.maxsnaps, + self.services["recurring_snapshot"]["maxsnaps"], + "Check interval type in list resources call" ) - #Sleep for (maxsnaps+1) hours to verify only maxsnaps snapshots are retained - time.sleep(((self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600) + # Sleep for (maxsnaps+1) hours to verify + # only maxsnaps snapshots are retained + time.sleep( + ((self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600 + ) cmd = listSnapshots.listSnapshotsCmd() cmd.volumeid = volume.id cmd.intervaltype = self.services["recurring_snapshot"]["intervaltype"] @@ -457,7 +573,11 @@ class TestSnapshots(cloudstackTestCase): cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'DATADISK' volume = self.apiclient.listVolumes(cmd) - recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, self.services["recurring_snapshot"]) + recurring_snapshot = SnapshotPolicy.create( + self.apiclient, + volume[0].id, + self.services["recurring_snapshot"] + ) self.cleanup.append(recurring_snapshot) #ListSnapshotPolicy should return newly created policy cmd = listSnapshotPolicies.listSnapshotPoliciesCmd() @@ -465,12 +585,16 @@ class TestSnapshots(cloudstackTestCase): cmd.volumeid = volume[0].id list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd) - self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call") + self.assertNotEqual( + list_snapshots_policy, + None, + "Check if result exists in list item call" + ) snapshots_policy = list_snapshots_policy[0] self.assertEqual( - snapshots_policy.id, - recurring_snapshot.id, - "Check recurring snapshot id in list resources call" + snapshots_policy.id, + recurring_snapshot.id, + "Check recurring snapshot id in list resources call" ) self.assertEqual( snapshots_policy.maxsnaps, @@ -478,8 +602,11 @@ class TestSnapshots(cloudstackTestCase): "Check interval type in list resources call" ) - #Sleep for (maxsnaps+1) hours to verify only maxsnaps snapshots are retained - time.sleep(((self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600) + # Sleep for (maxsnaps+1) hours to + # verify only maxsnaps snapshots are retained + time.sleep( + ((self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600 + ) cmd = listSnapshots.listSnapshotsCmd() cmd.volumeid = volume.id @@ -510,10 +637,15 @@ class TestSnapshots(cloudstackTestCase): random_data_1 = random_gen(100) #Login to virtual machine - ssh_client = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress) + ssh_client = self.virtual_machine.get_ssh_client( + self.nat_rule.ipaddress + ) cmds = [ "mkdir -p %s" % self.services["mount_dir"], - "mount %s1 %s" % (self.services["diskdevice"], self.services["mount_dir"]), + "mount %s1 %s" % ( + self.services["diskdevice"], + self.services["mount_dir"] + ), "pushd %s" % self.services["mount_dir"], "mkdir -p %s/{%s,%s} " % ( self.services["sub_dir"], @@ -546,14 +678,22 @@ class TestSnapshots(cloudstackTestCase): self.cleanup.append(snapshot) # Generate template from the snapshot - template = Template.create_from_snapshot(self.apiclient, snapshot, self.services["templates"]) + template = Template.create_from_snapshot( + self.apiclient, + snapshot, + self.services["templates"] + ) cmd = listTemplates.listTemplatesCmd() cmd.templatefilter = self.services["templates"]["templatefilter"] cmd.id = template.id list_templates = self.apiclient.listTemplates(cmd) - self.assertNotEqual(list_templates, None, "Check if result exists in list item call") + self.assertNotEqual( + list_templates, + None, + "Check if result exists in list item call" + ) self.assertEqual( list_templates[0].id, @@ -563,34 +703,46 @@ class TestSnapshots(cloudstackTestCase): # Deploy new virtual machine using template new_virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["server_without_disk"], - templateid = template.id, - serviceofferingid = self.service_offering.id - ) + self.apiclient, + self.services["server_without_disk"], + templateid=template.id, + accountid=self.account.account.name, + serviceofferingid=self.service_offering.id + ) self.cleanup.append(new_virtual_machine) #Login to VM & mount directory ssh = new_virtual_machine.get_ssh_client(self.nat_rule.ipaddress) cmds = [ "mkdir %s" % self.services["mount_dir"], - "mount %s1 %s" % (self.services["diskdevice"], self.services["mount_dir"]) + "mount %s1 %s" % ( + self.services["diskdevice"], + self.services["mount_dir"] + ) ] for c in cmds: ssh.execute(c) returned_data_0 = ssh.execute("cat %s/%s/%s" % ( - self.services["sub_dir"], - self.services["sub_lvl_dir1"], - self.services["random_data"] + self.services["sub_dir"], + self.services["sub_lvl_dir1"], + self.services["random_data"] )) returned_data_1 = ssh.execute("cat %s/%s/%s" % ( - self.services["sub_dir"], - self.services["sub_lvl_dir2"], - self.services["random_data"] + self.services["sub_dir"], + self.services["sub_lvl_dir2"], + self.services["random_data"] )) #Verify returned data - self.assertEqual(random_data_0, returned_data_0[0], "Verify newly attached volume contents with existing one") - self.assertEqual(random_data_1, returned_data_1[0], "Verify newly attached volume contents with existing one") + self.assertEqual( + random_data_0, + returned_data_0[0], + "Verify newly attached volume contents with existing one" + ) + self.assertEqual( + random_data_1, + returned_data_1[0], + "Verify newly attached volume contents with existing one" + ) return diff --git a/tools/testClient/testcase/BVT-tests/test_ssvm.py b/tools/testClient/testcase/BVT-tests/test_ssvm.py index b28607dbe27..bce03419e4f 100644 --- a/tools/testClient/testcase/BVT-tests/test_ssvm.py +++ b/tools/testClient/testcase/BVT-tests/test_ssvm.py @@ -23,12 +23,10 @@ class Services: self.services = { "ssvm": { "id": 1, # SSVM ID in particular zone - "zoneid": 1, }, "cpvm": { "id": 2, # CPVM ID in particular zone - "zoneid": 1, - "mgmtserverIP": '192.168.100.154' # For Telnet + "mgmtserverIP": '192.168.100.154' # For Telnet }, "host": { "username": 'root', # Credentials for SSH @@ -44,6 +42,9 @@ class TestSSVMs(cloudstackTestCase): self.apiclient = self.testClient.getApiClient() self.cleanup = [] self.services = Services().services + self.zone = get_zone(self.apiclient) + self.services["ssvm"]["zoneid"] = self.zone.id + self.services["cpvm"]["zoneid"] = self.zone.id return def tearDown(self): @@ -60,10 +61,13 @@ class TestSSVMs(cloudstackTestCase): """ # Validate the following: - # 1. listSystemVM (systemvmtype=secondarystoragevm) should return only ONE SSVM per zone + # 1. listSystemVM (systemvmtype=secondarystoragevm) + # should return only ONE SSVM per zone # 2. The returned SSVM should be in Running state - # 3. listSystemVM for secondarystoragevm should list publicip, privateip and link-localip - # 4. The gateway programmed on the ssvm by listSystemVm should be the same as the gateway returned by listVlanIpRanges + # 3. listSystemVM for secondarystoragevm should list publicip, + # privateip and link-localip + # 4. The gateway programmed on the ssvm by listSystemVm should be + # the same as the gateway returned by listVlanIpRanges # 5. DNS entries must match those given for the zone cmd = listSystemVms.listSystemVmsCmd() @@ -85,7 +89,8 @@ class TestSSVMs(cloudstackTestCase): len(list_zones_response), "Check number of SSVMs with number of zones" ) - #For each secondary storage VM check private IP, public IP, link local IP and DNS + #For each secondary storage VM check private IP, + #public IP, link local IP and DNS for ssvm in list_ssvm_response: self.assertEqual( @@ -146,10 +151,13 @@ class TestSSVMs(cloudstackTestCase): """ # Validate the following: - # 1. listSystemVM (systemvmtype=consoleproxy) should return at least ONE CPVM per zone + # 1. listSystemVM (systemvmtype=consoleproxy) should return + # at least ONE CPVM per zone # 2. The returned ConsoleProxyVM should be in Running state - # 3. listSystemVM for console proxy should list publicip, privateip and link-localip - # 4. The gateway programmed on the console proxy should be the same as the gateway returned by listZones + # 3. listSystemVM for console proxy should list publicip, privateip + # and link-localip + # 4. The gateway programmed on the console proxy should be the same + # as the gateway returned by listZones # 5. DNS entries must match those given for the zone cmd = listSystemVms.listSystemVmsCmd() @@ -230,10 +238,13 @@ class TestSSVMs(cloudstackTestCase): """Test SSVM Internals""" # Validate the following - # 1. The SSVM check script should not return any WARN|ERROR|FAIL messages - # 2. If you are unable to login to the SSVM with the signed key then test is deemed a failure + # 1. The SSVM check script should not return any + # WARN|ERROR|FAIL messages + # 2. If you are unable to login to the SSVM with the signed key + # then test is deemed a failure # 3. There should be only one ""cloud"" process running within the SSVM - # 4. If no process is running/multiple process are running then the test is a failure + # 4. If no process is running/multiple process are running + # then the test is a failure cmd = listHosts.listHostsCmd() cmd.zoneid = self.services["ssvm"]["zoneid"] @@ -249,14 +260,15 @@ class TestSSVMs(cloudstackTestCase): self.debug("Cheking cloud process status") #SSH to the machine ssh = remoteSSHClient.remoteSSHClient( - host.ipaddress, - self.services['host']["publicport"], - self.services['host']["username"], - self.services['host']["password"] - ) + host.ipaddress, + self.services['host']["publicport"], + self.services['host']["username"], + self.services['host']["password"] + ) timeout = 5 - ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % ssvm.linklocalip + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " \ + % ssvm.linklocalip c = ssh_command + "/usr/local/cloud/systemvm/ssvm-check.sh |grep -e ERROR -e WARNING -e FAIL" # Ensure the SSH login is successful @@ -304,9 +316,11 @@ class TestSSVMs(cloudstackTestCase): """Test CPVM Internals""" # Validate the following - # 1. test that telnet access on 8250 is available to the management server for the CPVM + # 1. test that telnet access on 8250 is available to + # the management server for the CPVM # 2. No telnet access, test FAIL - # 3. Service cloud status should report cloud agent status to be running + # 3. Service cloud status should report cloud agent status to be + # running cmd = listHosts.listHostsCmd() cmd.zoneid = self.services["cpvm"]["zoneid"] @@ -320,24 +334,29 @@ class TestSSVMs(cloudstackTestCase): cpvm = self.apiclient.listSystemVms(cmd)[0] with assertNotRaises(Exception): - telnet = telnetlib.Telnet(self.services["cpvm"]["mgmtserverIP"] , '8250') + telnet = telnetlib.Telnet( + self.services["cpvm"]["mgmtserverIP"] , + '8250' + ) self.debug("Cheking cloud process status") #SSH to the machine ssh = remoteSSHClient.remoteSSHClient( - host.ipaddress, - self.services['host']["publicport"], - self.services['host']["username"], - self.services['host']["password"] - ) + host.ipaddress, + self.services['host']["publicport"], + self.services['host']["username"], + self.services['host']["password"] + ) timeout = 5 # Check status of cloud service - ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % cpvm.linklocalip + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " \ + % cpvm.linklocalip c = ssh_command + "service cloud status" while True: res = ssh.execute(c)[0] - # Response: cloud.com service (type=secstorage) is running: process id: 2346 + # Response: cloud.com service (type=secstorage) is + # running: process id: 2346 #Check if double hop is successful if res != "Host key verification failed.": @@ -360,7 +379,9 @@ class TestSSVMs(cloudstackTestCase): # Validate the following # 1. The SSVM should go to stop state - # 2. After a brief delay of say one minute, the SSVM should be restarted once again and return to Running state with previous two test cases still passing + # 2. After a brief delay of say one minute, the SSVM should be + # restarted once again and return to Running state with previous two + # test cases still passing # 3. If either of the two above steps fail the test is a failure cmd = stopSystemVm.stopSystemVmCmd() @@ -391,7 +412,9 @@ class TestSSVMs(cloudstackTestCase): # Validate the following # 1. The CPVM should go to stop state - # 2. After a brief delay of say one minute, the SSVM should be restarted once again and return to Running state with previous two test cases still passing + # 2. After a brief delay of say one minute, the SSVM should be + # restarted once again and return to Running state with previous + # two test cases still passing # 3. If either of the two above steps fail the test is a failure cmd = stopSystemVm.stopSystemVmCmd() @@ -430,7 +453,8 @@ class TestSSVMs(cloudstackTestCase): """ # Validate the following # 1. The SSVM should go to stop and return to Running state - # 2. SSVM's public-ip and private-ip must remain the same before and after reboot + # 2. SSVM's public-ip and private-ip must remain the same + # before and after reboot # 3. The cloud process should still be running within the SSVM cmd = listSystemVms.listSystemVmsCmd() @@ -459,16 +483,16 @@ class TestSSVMs(cloudstackTestCase): ) self.assertEqual( - ssvm_response.publicip, - old_public_ip, - "Check Public IP after reboot with that of before reboot" - ) + ssvm_response.publicip, + old_public_ip, + "Check Public IP after reboot with that of before reboot" + ) self.assertEqual( - ssvm_response.privateip, - old_private_ip, - "Check Private IP after reboot with that of before reboot" - ) + ssvm_response.privateip, + old_private_ip, + "Check Private IP after reboot with that of before reboot" + ) #Call to verify cloud process is running self.test_03_ssvm_internals() return @@ -478,7 +502,8 @@ class TestSSVMs(cloudstackTestCase): """ # Validate the following # 1. The CPVM should go to stop and return to Running state - # 2. CPVM's public-ip and private-ip must remain the same before and after reboot + # 2. CPVM's public-ip and private-ip must remain + # the same before and after reboot # 3. the cloud process should still be running within the CPVM cmd = listSystemVms.listSystemVmsCmd() @@ -508,16 +533,16 @@ class TestSSVMs(cloudstackTestCase): ) self.assertEqual( - cpvm_response.publicip, - old_public_ip, - "Check Public IP after reboot with that of before reboot" - ) + cpvm_response.publicip, + old_public_ip, + "Check Public IP after reboot with that of before reboot" + ) self.assertEqual( - cpvm_response.privateip, - old_private_ip, - "Check Private IP after reboot with that of before reboot" - ) + cpvm_response.privateip, + old_private_ip, + "Check Private IP after reboot with that of before reboot" + ) #Call to verify cloud process is running self.test_04_cpvm_internals() return @@ -528,7 +553,8 @@ class TestSSVMs(cloudstackTestCase): # Validate the following # 1. SSVM should be completely destroyed and a new one will spin up - # 2. listSystemVMs will show a different name for the systemVM from what it was before + # 2. listSystemVMs will show a different name for the + # systemVM from what it was before # 3. new SSVM will have a public/private and link-local-ip # 4. cloud process within SSVM must be up and running @@ -551,7 +577,8 @@ class TestSSVMs(cloudstackTestCase): cmd.systemvmtype = 'secondarystoragevm' ssvm_response = self.apiclient.listSystemVms(cmd)[0] - # Verify Name, Public IP, Private IP and Link local IP for newly created SSVM + # Verify Name, Public IP, Private IP and Link local IP + # for newly created SSVM self.assertNotEqual( ssvm_response.name, old_name, @@ -584,7 +611,8 @@ class TestSSVMs(cloudstackTestCase): # Validate the following # 1. CPVM should be completely destroyed and a new one will spin up - # 2. listSystemVMs will show a different name for the systemVM from what it was before + # 2. listSystemVMs will show a different name for the systemVM from + # what it was before # 3. new CPVM will have a public/private and link-local-ip # 4. cloud process within CPVM must be up and running @@ -607,7 +635,8 @@ class TestSSVMs(cloudstackTestCase): cmd.systemvmtype = 'consoleproxy' cpvm_response = self.apiclient.listSystemVms(cmd)[0] - # Verify Name, Public IP, Private IP and Link local IP for newly created CPVM + # Verify Name, Public IP, Private IP and Link local IP + # for newly created CPVM self.assertNotEqual( cpvm_response.name, old_name, diff --git a/tools/testClient/testcase/BVT-tests/test_templates.py b/tools/testClient/testcase/BVT-tests/test_templates.py index c4708b315ad..3c9a673c03b 100644 --- a/tools/testClient/testcase/BVT-tests/test_templates.py +++ b/tools/testClient/testcase/BVT-tests/test_templates.py @@ -14,71 +14,70 @@ from random import random #Import System modules import time + class Services: """Test Templates Services """ def __init__(self): self.services = { - "service_offering": { - "name": "Tiny Service Offering", - "displaytext": "Tiny service offering", + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + # Random characters are appended for unique + # username + "password": "fr3sca", + }, + "service_offering": { + "name": "Tiny Instance", + "displaytext": "Tiny Instance", "cpunumber": 1, - "cpuspeed": 100, # in MHz - "memory": 64, # In MBs - }, + "cpuspeed": 200, # in MHz + "memory": 256, # In MBs + }, + "disk_offering": { + "displaytext": "Small", + "name": "Small", + "disksize": 1 + }, + "virtual_machine": { + "displayname": "testVM", + "hypervisor": 'XenServer', + "domainid": 1, + "protocol": 'TCP', + "ssh_port": 22, + "username": "root", + "password": "password" + }, + "volume": { + "diskname": "Test Volume", + }, + "template_1": { + "displaytext": "Cent OS Template", + "name": "Cent OS Template", + "ostypeid": 12, + }, + "template_2": { + "displaytext": "Public Template", + "name": "Public template", + "ostypeid": 12, + "isfeatured": True, + "ispublic": True, + "isextractable": True, + "mode": "HTTP_DOWNLOAD", + }, + "templatefilter": 'self', + "destzoneid": 2, # For Copy template (Destination zone) + "isfeatured": True, + "ispublic": True, + "isextractable": False, + "bootable": True, + "passwordenabled": True, + "ostypeid": 12, + } - "virtual_machine": - { - "template": 256, # Template used for VM creation - "zoneid": 1, - "serviceoffering": 1, - "displayname": "testVM", - "hypervisor": 'XenServer', - "account": 'testuser', # Account for which VM should be created - "domainid": 1, - "protocol": 'TCP', - "ssh_port": 22, - "username" : "root", - "password": "password" - }, - "volume": - { - "offerings": 1, - "volumeoffering": 3, - "diskname": "TestVolumeTemplate", - "zoneid": 1, - "diskofferingid": 3, - }, - "template_1": - { - "displaytext": "Test Template Type 1", - "name": "testTemplate", - "ostypeid": 12, - }, - "template_2": - { - "displaytext": "Test Template Type 2", - "name": "testTemplate", - "ostypeid": 12, - "isfeatured": True, - "ispublic": True, - "isextractable": True, - "mode": "HTTP_DOWNLOAD", - "zoneid": 1, - }, - "templatefilter": 'self', - "destzoneid": 2, # For Copy template (Destination zone) - "sourcezoneid": 1, # For Copy template (Source zone) - "isfeatured": True, - "ispublic": True, - "isextractable": False, - "bootable": True, - "passwordenabled": True, - "ostypeid": 15, - "account": 'testuser', # Normal user - "domainid": 1, - } class TestCreateTemplate(cloudstackTestCase): @@ -103,13 +102,42 @@ class TestCreateTemplate(cloudstackTestCase): def setUpClass(cls): cls.services = Services().services cls.api_client = fetch_api_client() - cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"]) + + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["volume"]["diskoffering"] = cls.disk_offering.id + cls.services["volume"]["zoneid"] = cls.zone.id + cls.services["sourcezoneid"] = cls.zone.id + + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True + ) + cls.services["account"] = cls.account.account.name + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) #create virtual machine cls.virtual_machine = VirtualMachine.create( - cls.api_client, - cls.services["virtual_machine"], - serviceofferingid = cls.service_offering.id - ) + cls.api_client, + cls.services["virtual_machine"], + templateid=template.id, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) #Stop virtual machine cmd = stopVirtualMachine.stopVirtualMachineCmd() @@ -123,7 +151,12 @@ class TestCreateTemplate(cloudstackTestCase): cmd.type = 'ROOT' list_volume = cls.api_client.listVolumes(cmd) cls.volume = list_volume[0] - cls._cleanup = [cls.virtual_machine, cls.service_offering] + cls._cleanup = [ + cls.virtual_machine, + cls.service_offering, + cls.account, + cls.disk_offering, + ] return @classmethod @@ -143,12 +176,17 @@ class TestCreateTemplate(cloudstackTestCase): """ # Validate the following: - # 1. database (vm_template table) should be updated with newly created template + # 1. database (vm_template table) should be updated + # with newly created template # 2. UI should show the newly added template # 3. ListTemplates API should show the newly added template #Create template from Virtual machine and Volume ID - template = Template.create(self.apiclient, self.volume, self.services["template_1"]) + template = Template.create( + self.apiclient, + self.volume, + self.services["template_1"] + ) self.cleanup.append(template) cmd = listTemplates.listTemplatesCmd() @@ -156,7 +194,7 @@ class TestCreateTemplate(cloudstackTestCase): cmd.id = template.id list_template_response = self.apiclient.listTemplates(cmd) - #Verify template response to check whether template added successfully or not + #Verify template response to check whether template added successfully template_response = list_template_response[0] self.assertNotEqual( len(list_template_response), @@ -180,46 +218,9 @@ class TestCreateTemplate(cloudstackTestCase): self.services["template_1"]["ostypeid"], "Check osTypeID of newly created template" ) - - #Verify the database entry for template - self.debug( - "select name, display_text, guest_os_id from vm_template where id = %s;" - % template.id - ) - - qresultset = self.dbclient.execute( - "select name, display_text, guest_os_id from vm_template where id = %s;" - % template.id - ) - - self.assertNotEqual( - len(qresultset), - 0, - "Check DB Query result set" - ) - - qresult = qresultset[0] - - name = qresult[0] - self.assertEqual( - name.count(self.services["template_1"]["name"]), - 1, - "Compare template name with database record" - ) - - self.assertEqual( - qresult[1], - self.services["template_1"]["displaytext"], - "Compare template display text with database record" - ) - - self.assertEqual( - qresult[2], - self.services["template_1"]["ostypeid"], - "Compare template osTypeID with database record" - ) return + class TestTemplates(cloudstackTestCase): @classmethod @@ -227,14 +228,49 @@ class TestTemplates(cloudstackTestCase): cls.services = Services().services cls.api_client = fetch_api_client() - cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"]) - #create virtual machines - cls.virtual_machine = VirtualMachine.create( - cls.api_client, - cls.services["virtual_machine"], - serviceofferingid = cls.service_offering.id - ) + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["virtual_machine"]["zoneid"] = cls.zone.id + cls.services["volume"]["diskoffering"] = cls.disk_offering.id + cls.services["volume"]["zoneid"] = cls.zone.id + cls.services["template_2"]["zoneid"] = cls.zone.id + cls.services["sourcezoneid"] = cls.zone.id + + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True + ) + + cls.user = Account.create( + cls.api_client, + cls.services["account"] + ) + + cls.services["account"] = cls.account.account.name + + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + #create virtual machine + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + cls.services["virtual_machine"], + templateid=template.id, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) #Stop virtual machine cmd = stopVirtualMachine.stopVirtualMachineCmd() cmd.id = cls.virtual_machine.id @@ -249,9 +285,24 @@ class TestTemplates(cloudstackTestCase): cls.volume = list_volume[0] #Create templates for Edit, Delete & update permissions testcases - cls.template_1 = Template.create(cls.api_client, cls.volume, cls.services["template_1"]) - cls.template_2 = Template.create(cls.api_client, cls.volume, cls.services["template_2"]) - cls._cleanup = [cls.template_2, cls.virtual_machine, cls.service_offering] + cls.template_1 = Template.create( + cls.api_client, + cls.volume, + cls.services["template_1"] + ) + cls.template_2 = Template.create( + cls.api_client, + cls.volume, + cls.services["template_2"] + ) + cls._cleanup = [ + cls.template_2, + cls.virtual_machine, + cls.service_offering, + cls.disk_offering, + cls.account, + cls.user + ] @classmethod def tearDownClass(cls): @@ -340,51 +391,6 @@ class TestTemplates(cloudstackTestCase): self.services["ostypeid"], "Check OSTypeID of updated template" ) - # Verify database entry for updated template attributes - self.debug( - "select name, display_text, bootable, enable_password, guest_os_id from vm_template where id = %s;" - % self.template_1.id - ) - - qresultset = self.dbclient.execute( - "select name, display_text, bootable, enable_password, guest_os_id from vm_template where id = %s;" - % self.template_1.id - ) - - self.assertNotEqual( - len(qresultset), - 0, - "Check DB Query result set" - ) - - qresult = qresultset[0] - - self.assertEqual( - qresult[0], - new_name, - "Compare template name with database record" - ) - - self.assertEqual( - qresult[1], - new_displayText, - "Compare template bootable field with database record" - ) - self.assertEqual( - qresult[2], - int(self.services["bootable"]), - "Compare template enable_password field with database record" - ) - self.assertEqual( - qresult[3], - int(self.services["passwordenabled"]), - "Compare template display text with database record" - ) - self.assertEqual( - qresult[4], - self.services["ostypeid"], - "Compare template guest OS ID with database record" - ) return def test_03_delete_template(self): @@ -403,25 +409,11 @@ class TestTemplates(cloudstackTestCase): list_template_response = self.apiclient.listTemplates(cmd) # Verify template is deleted properly using ListTemplates - self.assertEqual(list_template_response, None, "Check if template exists in List Templates") - - # Verify database entry is removed for deleted template - self.debug( - "select name, display_text from vm_template where id = %s;" - % self.template_1.id - ) - - qresultset = self.dbclient.execute( - "select name, display_text from vm_template where id = %s;" - % self.template_1.id - ) - self.assertEqual( - len(qresultset), - 1, - "Check DB Query result set" - ) - + list_template_response, + None, + "Check if template exists in List Templates" + ) return def test_04_extract_template(self): @@ -429,13 +421,14 @@ class TestTemplates(cloudstackTestCase): # Validate the following # 1. Admin should able extract and download the templates - # 2. ListTemplates should display all the public templates for all kind of users + # 2. ListTemplates should display all the public templates + # for all kind of users # 3 .ListTemplates should not display the system templates cmd = extractTemplate.extractTemplateCmd() cmd.id = self.template_2.id cmd.mode = self.services["template_2"]["mode"] - cmd.zoneid = self.services["template_2"]["zoneid"] + cmd.zoneid = self.zone.id list_extract_response = self.apiclient.extractTemplate(cmd) # Format URL to ASCII to retrieve response code @@ -469,8 +462,10 @@ class TestTemplates(cloudstackTestCase): """Update & Test for template permissions""" # Validate the following - # 1. listTemplatePermissions returns valid permissions set for template - # 2. permission changes should be reflected in vm_template table in database + # 1. listTemplatePermissions returns valid + # permissions set for template + # 2. permission changes should be reflected in vm_template + # table in database cmd = updateTemplatePermissions.updateTemplatePermissionsCmd() # Update template permissions @@ -483,8 +478,8 @@ class TestTemplates(cloudstackTestCase): cmd = listTemplates.listTemplatesCmd() cmd.id = self.template_2.id - cmd.account = self.services["account"] - cmd.domainid = self.services["domainid"] + cmd.account = self.account.account.name + cmd.domainid = self.account.account.domainid cmd.templatefilter = 'featured' list_template_response = self.apiclient.listTemplates(cmd) @@ -503,44 +498,9 @@ class TestTemplates(cloudstackTestCase): ) self.assertNotEqual( - template_response.templatetype, - 'SYSTEM', - "ListTemplates should not list any system templates" - ) - - # Verify database entries for updated permissions - self.debug( - "select public, featured, extractable from vm_template where id = %s;" - % self.template_2.id - ) - qresultset = self.dbclient.execute( - "select public, featured, extractable from vm_template where id = %s;" - % self.template_2.id - ) - - self.assertNotEqual( - len(qresultset), - 0, - "Check DB Query result set" - ) - - qresult = qresultset[0] - - self.assertEqual( - qresult[0], - int(self.services["ispublic"]), - "Compare public permission with database record" - ) - - self.assertEqual( - qresult[1], - int(self.services["isfeatured"]), - "Compare featured permission with database record" - ) - self.assertEqual( - qresult[2], - int(self.services["isextractable"]), - "Compare extractable permission with database record" + template_response.templatetype, + 'SYSTEM', + "ListTemplates should not list any system templates" ) return @@ -548,7 +508,8 @@ class TestTemplates(cloudstackTestCase): """Test for copy template from one zone to another""" # Validate the following - # 1. copy template should be successful and secondary storage should contain new copied template. + # 1. copy template should be successful and + # secondary storage should contain new copied template. cmd = copyTemplate.copyTemplateCmd() cmd.id = self.template_2.id @@ -588,8 +549,8 @@ class TestTemplates(cloudstackTestCase): cmd = listTemplates.listTemplatesCmd() cmd.templatefilter = 'featured' - cmd.account = self.services["account"] - cmd.domainid = self.services["domainid"] + cmd.account = self.user.account.name + cmd.domainid = self.user.account.domainid list_template_response = self.apiclient.listTemplates(cmd) @@ -600,7 +561,7 @@ class TestTemplates(cloudstackTestCase): ) #Template response should list all 'public' templates for template in list_template_response: - self.assertEqual( + self.assertEqual( template.ispublic, True, "ListTemplates should list only public templates" @@ -615,8 +576,8 @@ class TestTemplates(cloudstackTestCase): cmd = listTemplates.listTemplatesCmd() cmd.templatefilter = 'featured' - cmd.account = self.services["account"] - cmd.domainid = self.services["domainid"] + cmd.account = self.user.account.name + cmd.domainid = self.user.account.domainid list_template_response = self.apiclient.listTemplates(cmd) @@ -627,9 +588,9 @@ class TestTemplates(cloudstackTestCase): ) for template in list_template_response: - self.assertNotEqual( - template.templatetype, - 'SYSTEM', - "ListTemplates should not list any system templates" + self.assertNotEqual( + template.templatetype, + 'SYSTEM', + "ListTemplates should not list any system templates" ) return diff --git a/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py b/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py index b24f6cfcb90..6aadc866862 100644 --- a/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py +++ b/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py @@ -271,14 +271,14 @@ class TestVMLifeCycle(cloudstackTestCase): ) cls.public_ip = PublicIPAddress.create( cls.api_client, - cls.small_virtual_machine.account, - cls.small_virtual_machine.zoneid, - cls.small_virtual_machine.domainid, + cls.virtual_machine.account, + cls.virtual_machine.zoneid, + cls.virtual_machine.domainid, cls.services["small"] ) cls.nat_rule = NATRule.create( cls.api_client, - cls.small_virtual_machine, + cls.virtual_machine, cls.services["small"], ipaddressid=cls.public_ip.ipaddress.id ) @@ -717,11 +717,17 @@ class TestVMLifeCycle(cloudstackTestCase): c = "fdisk -l|grep %s|head -1" % self.services["diskdevice"] res = ssh_client.execute(c) #Disk /dev/xvdd: 4393 MB, 4393723904 bytes - actual_disk_size = res[0].split()[4] + + # Res may contain more than one strings depending on environment + # Split res with space as delimiter to form new list (result) + result = [] + for i in res: + for k in i.split(): + result.append(k) self.assertEqual( - str(iso.size), - actual_disk_size, + str(iso.size) in result, + True, "Check size of the attached ISO" ) diff --git a/tools/testClient/testcase/BVT-tests/test_volumes.py b/tools/testClient/testcase/BVT-tests/test_volumes.py index 936c93b7d2b..e60452c55fe 100644 --- a/tools/testClient/testcase/BVT-tests/test_volumes.py +++ b/tools/testClient/testcase/BVT-tests/test_volumes.py @@ -12,79 +12,104 @@ from base import * import remoteSSHClient #Import System modules import os -import urllib2 +import urllib import time import tempfile + class Services: """Test Volume Services """ def __init__(self): self.services = { + "account": { + "email": "test@test.com", + "firstname": "Test", + "lastname": "User", + "username": "test", + # Random characters are appended for unique + # username + "password": "fr3sca", + }, "service_offering": { - "name": "Tiny Service Offering", - "displaytext": "Tiny service offering", + "name": "Tiny Instance", + "displaytext": "Tiny Instance", "cpunumber": 1, - "cpuspeed": 100, # in MHz - "memory": 64, # In MBs - }, + "cpuspeed": 200, # in MHz + "memory": 256, # In MBs + }, + "disk_offering": { + "displaytext": "Small", + "name": "Small", + "disksize": 1 + }, "volume_offerings": { 0: { - "offerings": 1, - "volumeoffering": 3, "diskname": "TestDiskServ", - "zoneid": 1, - "diskofferingid": 3, - "account": 'testuser', # Account for which volume offering is created - "domainid": 1, - }, - 1: { - "offerings": 1, - "volumeoffering": 4, - "diskname": "TestDiskServ", - "zoneid": 1, - "diskofferingid": 3, - "account": 'testuser', - "domainid": 1, - }, - 2: { - "offerings": 1, - "volumeoffering": 5, - "diskname": "TestDiskServ", - "zoneid": 1, - "diskofferingid": 3, - "account": 'testuser', "domainid": 1, }, }, - "customdiskofferingid": 52, #Custom disk offering should be available - "customdisksize": 2, # GBs - "volumeoffering": 3, - "serviceoffering": 1, - "template": 256, - "zoneid": 1, + "customdisksize": 1, # GBs "username": "root", # Creds for SSH to VM "password": "password", "ssh_port": 22, "diskname": "TestDiskServ", "hypervisor": 'XenServer', - "account": 'testuser', # Account for VM instance "domainid": 1, "privateport": 22, "publicport": 22, "protocol": 'TCP', "diskdevice": "/dev/sda", + "ostypeid": 12, } + class TestCreateVolume(cloudstackTestCase): @classmethod def setUpClass(cls): cls.api_client = fetch_api_client() cls.services = Services().services - cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"]) - cls.virtual_machine = VirtualMachine.create(cls.api_client, cls.services, serviceofferingid = cls.service_offering.id) + + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + cls.custom_disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"], + custom=True + ) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["zoneid"] = cls.zone.id + cls.services["template"] = template.id + cls.services["customdiskofferingid"] = cls.custom_disk_offering.id + + # Create VMs, NAT Rules etc + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True + ) + + cls.services["account"] = cls.account.account.name + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + cls.services, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) cls.public_ip = PublicIPAddress.create( cls.api_client, @@ -93,8 +118,21 @@ class TestCreateVolume(cloudstackTestCase): cls.virtual_machine.domainid, cls.services ) - cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services, ipaddressid = cls.public_ip.ipaddress.id) - cls._cleanup = [cls.nat_rule, cls.virtual_machine, cls.service_offering, cls.public_ip] + cls.nat_rule = NATRule.create( + cls.api_client, + cls.virtual_machine, + cls.services, + ipaddressid=cls.public_ip.ipaddress.id + ) + cls._cleanup = [ + cls.nat_rule, + cls.virtual_machine, + cls.service_offering, + cls.public_ip, + cls.disk_offering, + cls.custom_disk_offering, + cls.account + ] def setUp(self): @@ -107,7 +145,13 @@ class TestCreateVolume(cloudstackTestCase): """ self.volumes = [] for k, v in self.services["volume_offerings"].items(): - volume = Volume.create(self.apiClient, v) + volume = Volume.create( + self.apiClient, + v, + zoneid=self.zone.id, + account=self.account.account.name, + diskofferingid=self.disk_offering.id + ) self.volumes.append(volume) self.cleanup.append(volume) @@ -115,30 +159,49 @@ class TestCreateVolume(cloudstackTestCase): self.volumes.append(volume) self.cleanup.append(volume) - #Attach a volume with different disk offerings and check the memory allocated to each of them + #Attach a volume with different disk offerings + #and check the memory allocated to each of them for volume in self.volumes: cmd = listVolumes.listVolumesCmd() cmd.id = volume.id list_volume_response = self.apiClient.listVolumes(cmd) - self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes") - qresultset = self.dbclient.execute("select id from volumes where id = %s" % volume.id) - self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database") - attached_volume = self.virtual_machine.attach_volume(self.apiClient, volume) + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) + attached_volume = self.virtual_machine.attach_volume( + self.apiClient, + volume + ) ssh = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress) ssh.execute("reboot") #Sleep to ensure the machine is rebooted properly time.sleep(120) - ssh = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress, reconnect = True) + ssh = self.virtual_machine.get_ssh_client( + self.nat_rule.ipaddress, + reconnect=True + ) c = "fdisk -l|grep %s|head -1" % self.services["diskdevice"] res = ssh.execute(c) - #Disk /dev/sda: 21.5 GB, 21474836480 bytes + # Disk /dev/sda doesn't contain a valid partition table + # Disk /dev/sda: 21.5 GB, 21474836480 bytes - actual_disk_size = res[0].split()[4] + # Res may return more than one lines + # Split res with space as delimiter to form new list (result) + result = [] + for i in res: + for k in i.split(): + result.append(k) - self.assertEqual(str(list_volume_response[0].size), actual_disk_size, "Check if promised disk size actually available") + self.assertEqual( + str(list_volume_response[0].size) in result, + True, + "Check if promised disk size actually available" + ) self.virtual_machine.detach_volume(self.apiClient, volume) def tearDown(self): @@ -160,9 +223,39 @@ class TestVolumes(cloudstackTestCase): def setUpClass(cls): cls.api_client = fetch_api_client() cls.services = Services().services + # Get Zone, Domain and templates + cls.zone = get_zone(cls.api_client) + cls.disk_offering = DiskOffering.create( + cls.api_client, + cls.services["disk_offering"] + ) + template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostypeid"] + ) + cls.services["zoneid"] = cls.zone.id + cls.services["template"] = template.id + cls.services["diskofferingid"] = cls.disk_offering.id - cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"]) - cls.virtual_machine = VirtualMachine.create(cls.api_client, cls.services, serviceofferingid = cls.service_offering.id) + # Create VMs, NAT Rules etc + cls.account = Account.create( + cls.api_client, + cls.services["account"], + admin=True + ) + + cls.services["account"] = cls.account.account.name + cls.service_offering = ServiceOffering.create( + cls.api_client, + cls.services["service_offering"] + ) + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + cls.services, + accountid=cls.account.account.name, + serviceofferingid=cls.service_offering.id + ) cls.public_ip = PublicIPAddress.create( cls.api_client, @@ -171,9 +264,24 @@ class TestVolumes(cloudstackTestCase): cls.virtual_machine.domainid, cls.services ) - cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services, ipaddressid = cls.public_ip.ipaddress.id) - cls.volume = Volume.create(cls.api_client, cls.services) - cls._cleanup = [cls.nat_rule, cls.virtual_machine, cls.volume, cls.public_ip, cls.service_offering] + cls.nat_rule = NATRule.create( + cls.api_client, + cls.virtual_machine, + cls.services, + ipaddressid=cls.public_ip.ipaddress.id + ) + cls.volume = Volume.create( + cls.api_client, + cls.services + ) + cls._cleanup = [ + cls.nat_rule, + cls.virtual_machine, + cls.public_ip, + cls.service_offering, + cls.disk_offering, + cls.account + ] @classmethod def tearDownClass(cls): @@ -189,6 +297,11 @@ class TestVolumes(cloudstackTestCase): def test_02_attach_volume(self): """Attach a created Volume to a Running VM """ + # Validate the following + # 1. shows list of volumes + # 2. "Attach Disk" pop-up box will display with list of instances + # 3. disk should be attached to instance successfully + self.virtual_machine.attach_volume(self.apiClient, self.volume) #Sleep to ensure the current state will reflected in other calls @@ -197,29 +310,38 @@ class TestVolumes(cloudstackTestCase): cmd.id = self.volume.id list_volume_response = self.apiClient.listVolumes(cmd) - self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes") + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) volume = list_volume_response[0] - self.assertNotEqual(volume.virtualmachineid, None, "Check if volume state (attached) is reflected") - - qresultset = self.dbclient.execute("select instance_id, device_id from volumes where id = %s" % self.volume.id) - self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database") - - qresult = qresultset[0] - self.assertEqual(qresult[0], self.virtual_machine.id, "Check if volume is assc. with virtual machine in Database") - #self.assertEqual(qresult[1], 0, "Check if device is valid in the database") + self.assertNotEqual( + volume.virtualmachineid, + None, + "Check if volume state (attached) is reflected" + ) #Format the attached volume to a known fs - format_volume_to_ext3(self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress)) + format_volume_to_ext3( + self.virtual_machine.get_ssh_client( + self.nat_rule.ipaddress + )) def test_03_download_attached_volume(self): """Download a Volume attached to a VM """ + # Validate the following + # 1. download volume will fail with proper error message + # "Failed - Invalid state of the volume with ID: + # It should be either detached or the VM should be in stopped state cmd = extractVolume.extractVolumeCmd() cmd.id = self.volume.id cmd.mode = "HTTP_DOWNLOAD" cmd.zoneid = self.services["zoneid"] - #A proper exception should be raised; downloading attach VM is not allowed + # A proper exception should be raised; + # downloading attach VM is not allowed with self.assertRaises(Exception): self.apiClient.deleteVolume(cmd) @@ -227,9 +349,14 @@ class TestVolumes(cloudstackTestCase): """Delete a Volume attached to a VM """ + # Validate the following + # 1. delete volume will fail with proper error message + # "Failed - Invalid state of the volume with ID: + # It should be either detached or the VM should be in stopped state + cmd = deleteVolume.deleteVolumeCmd() cmd.id = self.volume.id - #A proper exception should be raised; deleting attach VM is not allowed + #Proper exception should be raised; deleting attach VM is not allowed with self.assertRaises(Exception): self.apiClient.deleteVolume(cmd) @@ -237,6 +364,11 @@ class TestVolumes(cloudstackTestCase): def test_05_detach_volume(self): """Detach a Volume attached to a VM """ + + # Validate the following + # Data disk should be detached from instance and detached data disk + # details should be updated properly + self.virtual_machine.detach_volume(self.apiClient, self.volume) #Sleep to ensure the current state will reflected in other calls time.sleep(60) @@ -244,21 +376,24 @@ class TestVolumes(cloudstackTestCase): cmd.id = self.volume.id list_volume_response = self.apiClient.listVolumes(cmd) - self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes") + self.assertNotEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + ) volume = list_volume_response[0] - self.assertEqual(volume.virtualmachineid, None, "Check if volume state (detached) is reflected") - - qresultset = self.dbclient.execute("select instance_id, device_id from volumes where id = %s" % self.volume.id) - self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database") - - qresult = qresultset[0] - self.assertEqual(qresult[0], None, "Check if volume is unassc. with virtual machine in Database") - self.assertEqual(qresult[1], None, "Check if no device is valid in the database") - + self.assertEqual( + volume.virtualmachineid, + None, + "Check if volume state (detached) is reflected" + ) + return def test_06_download_detached_volume(self): """Download a Volume unattached to an VM """ + # Validate the following + # 1. able to download the volume when its not attached to instance cmd = extractVolume.extractVolumeCmd() cmd.id = self.volume.id @@ -278,11 +413,20 @@ class TestVolumes(cloudstackTestCase): except Exception as e: print e - self.fail("Extract Volume Failed with invalid URL %s (vol id: %s)" % (extract_vol.url, self.volume.id)) + self.fail( + "Extract Volume Failed with invalid URL %s (vol id: %s)" \ + % (extract_vol.url, self.volume.id) + ) def test_07_delete_detached_volume(self): """Delete a Volume unattached to an VM """ + # Validate the following + # 1. volume should be deleted successfully and listVolume should not + # contain the deleted volume details. + # 2. "Delete Volume" menu item not shown under "Actions" menu. + # (UI should not allow to delete the volume when it is attached + # to instance by hiding the menu Item) cmd = deleteVolume.deleteVolumeCmd() cmd.id = self.volume.id @@ -294,4 +438,8 @@ class TestVolumes(cloudstackTestCase): cmd.type = 'DATADISK' list_volume_response = self.apiClient.listVolumes(cmd) - self.assertEqual(list_volume_response, None, "Check if volume exists in ListVolumes") + self.assertEqual( + list_volume_response, + None, + "Check if volume exists in ListVolumes" + )