diff --git a/tools/testClient/testcase/BVT-tests/base.py b/tools/testClient/testcase/BVT-tests/base.py index a9432d467d0..cb1da086c97 100644 --- a/tools/testClient/testcase/BVT-tests/base.py +++ b/tools/testClient/testcase/BVT-tests/base.py @@ -8,6 +8,50 @@ from utils import is_server_ssh_ready, random_gen from cloudstackAPI import * +#Import System modules +import time + +class Account: + """ Account Life Cycle """ + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, services, admin=False): + cmd = createAccount.createAccountCmd() + #0 - User, 1 - Root Admin + cmd.accounttype = int(admin) + cmd.email = services["email"] + cmd.firstname = services["firstname"] + cmd.lastname = services["lastname"] + cmd.password = services["password"] + cmd.username = services["username"] + + account = apiclient.createAccount(cmd) + #create a network + cmd = createNetwork.createNetworkCmd() + cmd.name = cmd.displaytext = "Virtual Network" + cmd.networkofferingid = services["networkofferingid"] + + cmd.account = account.account.name + cmd.domainid = account.account.domainid + cmd.zoneid = services["zoneid"] + + account.network = apiclient.createNetwork(cmd) + #Allocate the Source NAT IP Address + account.public_ip = PublicIPAddress.create( + apiclient, + accountid = account.account.name, + zoneid = services["zoneid"], + domainid = account.account.domainid + ) + + return Account(account.__dict__) + + def delete(self, apiclient): + cmd = deleteAccount.deleteAccountCmd() + cmd.id = self.account.id + apiclient.deleteAccount(cmd) class VirtualMachine: """Manage virtual machine lifecycle @@ -20,18 +64,29 @@ class VirtualMachine: self.ssh_client = None #extract out the ipaddress self.ipaddress = self.nic[0].ipaddress - + @classmethod - def create(cls, apiclient, services): + def create(cls, apiclient, services, templateid=None, accountid=None, networkids = None): cmd = deployVirtualMachine.deployVirtualMachineCmd() cmd.serviceofferingid = services["serviceoffering"] - cmd.templateid = services["template"] cmd.zoneid = services["zoneid"] + if networkids: + cmd.networkids = networkids + elif "networkids" in services: + cmd.networkids = services["networkids"] + + cmd.hypervisor = services["hypervisor"] + cmd.account = accountid or services["accountid"] + cmd.domainid = services["domainid"] + cmd.templateid = templateid or services["template"] + if "diskoffering" in services: cmd.diskofferingid = services["diskoffering"] return VirtualMachine(apiclient.deployVirtualMachine(cmd).__dict__, services) - def get_ssh_client(self, reconnect=False): + def get_ssh_client(self, ipaddress = None, reconnect=False): + if ipaddress != None: + self.ipaddress = ipaddress if reconnect: self.ssh_client = is_server_ssh_ready( self.ipaddress, @@ -47,6 +102,22 @@ class VirtualMachine: ) return self.ssh_client + def create_nat_rule(self, apiclient, services): + cmd = createPortForwardingRule.createPortForwardingRuleCmd() + cmd.ipaddressid = services["ipaddressid"] + cmd.privateport = services["privateport"] + cmd.publicport = services["publicport"] + cmd.protocol = services["protocol"] + cmd.virtualmachineid = self.id + return apiclient.createPortForwardingRule(cmd) + + + def delete_nat_rule(self, apiclient, nat_rule): + cmd = deletePortForwardingRule.deletePortForwardingRuleCmd() + cmd.id = nat_rule.id + apiclient.deletePortForwardingRule(cmd) + return + def delete(self, apiclient): cmd = destroyVirtualMachine.destroyVirtualMachineCmd() cmd.id = self.id @@ -69,15 +140,17 @@ class Volume: """ def __init__(self, items): self.__dict__.update(items) - + @classmethod def create(cls, apiclient, services): cmd = createVolume.createVolumeCmd() cmd.name = services["diskname"] cmd.diskofferingid = services["volumeoffering"] cmd.zoneid = services["zoneid"] + cmd.account = services["account"] + cmd.domainid= services["domainid"] return Volume(apiclient.createVolume(cmd).__dict__) - + @classmethod def create_custom_disk(cls, apiclient, services): cmd = createVolume.createVolumeCmd() @@ -85,18 +158,22 @@ class Volume: cmd.diskofferingid = services["customdiskofferingid"] cmd.size = services["customdisksize"] cmd.zoneid = services["zoneid"] + cmd.account = services["account"] + cmd.domainid= services["domainid"] return Volume(apiclient.createVolume(cmd).__dict__) - - + + @classmethod def create_from_snapshot(cls, apiclient, snapshot_id, services): - cmd = createVolume.createVolumeCmd() + cmd = createVolume.createVolumeCmd() cmd.name = "-".join([services["diskname"], random_gen()]) cmd.snapshotid = snapshot_id cmd.zoneid = services["zoneid"] cmd.size = services["size"] + cmd.account = services["account"] + cmd.domainid = services["domainid"] return Volume(apiclient.createVolume(cmd).__dict__) - + def delete(self, apiclient): cmd = deleteVolume.deleteVolumeCmd() cmd.id = self.id @@ -119,3 +196,238 @@ class Snapshot: cmd.id = self.id apiclient.deleteSnapshot(cmd) +class Template: + """Manage template life cycle""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, volume, services): + + #Create template from Virtual machine and Volume ID + cmd = createTemplate.createTemplateCmd() + cmd.displaytext = services["displaytext"] + cmd.name = "-".join([services["name"], random_gen()]) + cmd.ostypeid = services["ostypeid"] + cmd.isfeatured = services["isfeatured"] or False + cmd.ispublic = services["ispublic"] or False + cmd.isextractable = services["isextractable"] or False + cmd.volumeid = volume.id + return Template(apiclient.createTemplate(cmd).__dict__) + + @classmethod + def create_from_snapshot(cls, apiclient, snapshot, services): + + #Create template from Virtual machine and Snapshot ID + cmd = createTemplate.createTemplateCmd() + cmd.displaytext = services["displaytext"] + cmd.name = services["name"] + cmd.ostypeid = services["ostypeid"] + cmd.snapshotid = snapshot.id + return Template(apiclient.createTemplate(cmd).__dict__) + + def delete(self, apiclient): + cmd = deleteTemplate.deleteTemplateCmd() + cmd.id = self.id + apiclient.deleteTemplate(cmd) + + +class Iso: + """Manage ISO life cycle""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, services): + + #Create ISO from URL + cmd = registerIso.registerIsoCmd() + cmd.displaytext = services["displaytext"] + cmd.name = services["name"] + cmd.ostypeid = services["ostypeid"] + cmd.url = services["url"] + cmd.zoneid = services["zoneid"] + cmd.isextractable = services["isextractable"] + cmd.isfeatured = services["isfeatured"] + cmd.ispublic = services["ispublic"] + return Iso(apiclient.createTemplate(cmd)[0].__dict__) + + def delete(self, apiclient): + + cmd = deleteIso.deleteIsoCmd() + cmd.id = self.id + apiclient.deleteIso(cmd) + return + + def download(self, apiclient): + #Ensuring ISO is successfully downloaded + while True: + time.sleep(120) + + cmd= listIsos.listIsosCmd() + cmd.id = self.id + response = apiclient.listIsos(cmd)[0] + # Check whether download is in progress (for Ex: 10% Downloaded) + # or ISO is 'Successfully Installed' + if response.status == 'Successfully Installed': + return + elif 'Downloaded' not in response.status.split(): + raise Exception + return + + +class PublicIPAddress(): + """Manage Public IP Addresses""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, accountid, zoneid = 1, domainid = 1): + cmd = associateIpAddress.associateIpAddressCmd() + cmd.account = accountid + cmd.zoneid = zoneid + cmd.domainid = domainid + return PublicIPAddress(apiclient.associateIpAddress(cmd).__dict__) + + def delete(self, apiclient): + cmd = disassociateIpAddress.disassociateIpAddressCmd() + cmd.id = self.ipaddress.id + apiclient.disassociateIpAddress(cmd) + return + +class NATRule: + """Manage NAT rule""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, virtual_machine, services, ipaddressid=None): + + cmd = createPortForwardingRule.createPortForwardingRuleCmd() + cmd.ipaddressid = ipaddressid or services["ipaddressid"] + cmd.privateport = services["privateport"] + cmd.publicport = services["publicport"] + cmd.protocol = services["protocol"] + cmd.virtualmachineid = virtual_machine.id + return NATRule(apiclient.createPortForwardingRule(cmd).__dict__) + + + def delete(self, apiclient): + cmd = deletePortForwardingRule.deletePortForwardingRuleCmd() + cmd.id = self.id + apiclient.deletePortForwardingRule(cmd) + return + +class ServiceOffering: + """Manage service offerings cycle""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, services): + + cmd = createServiceOffering.createServiceOfferingCmd() + cmd.cpunumber = services["cpunumber"] + cmd.cpuspeed = services["cpuspeed"] + cmd.displaytext = services["displaytext"] + cmd.memory = services["memory"] + cmd.name = services["name"] + return ServiceOffering(apiclient.createServiceOffering(cmd).__dict__) + + + def delete(self, apiclient): + + cmd = deleteServiceOffering.deleteServiceOfferingCmd() + cmd.id = self.id + apiclient.deleteServiceOffering(cmd) + return + + +class DiskOffering: + """Manage disk offerings cycle""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, services): + + cmd =createDiskOffering.createDiskOfferingCmd() + cmd.displaytext = services["displaytext"] + cmd.name = services["name"] + cmd.disksize=services["disksize"] + return DiskOffering(apiclient.createDiskOffering(cmd).__dict__) + + + def delete(self, apiclient): + + cmd = deleteDiskOffering.deleteDiskOfferingCmd() + cmd.id = self.id + apiclient.deleteDiskOffering(cmd) + return + +class SnapshotPolicy: + """Manage snapshot policies""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, volumeid, services): + + cmd = createSnapshotPolicy.createSnapshotPolicyCmd() + cmd.intervaltype = services["intervaltype"] + cmd.maxsnaps = services["maxsnaps"] + cmd.schedule = services["schedule"] + cmd.timezone = services["timezone"] + cmd.volumeid = volumeid + return SnapshotPolicy(apiclient.createSnapshotPolicy(cmd).__dict__) + + + def delete(self, apiclient): + + cmd = deleteSnapshotPolicies.deleteSnapshotPoliciesCmd() + cmd.id = self.id + apiclient.deleteSnapshotPolicies(cmd) + return + +class LoadBalancerRule: + """Manage Load Balancer rule""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, services, ipaddressid, accountid=None): + cmd = createLoadBalancerRule.createLoadBalancerRuleCmd() + cmd.publicipid = ipaddressid or services["ipaddressid"] + cmd.account = accountid or services["accountid"] + cmd.name = services["name"] + cmd.algorithm = services["alg"] + cmd.privateport = services["privateport"] + cmd.publicport = services["publicport"] + return LoadBalancerRule(apiclient.createLoadBalancerRule(cmd).__dict__) + + def delete(self, apiclient): + cmd = deleteLoadBalancerRule.deleteLoadBalancerRuleCmd() + cmd.id = self.id + apiclient.deleteLoadBalancerRule(cmd) + return + + def assign(self, apiclient, vms): + cmd = assignToLoadBalancerRule.assignToLoadBalancerRuleCmd() + cmd.id = self.id + cmd.virtualmachineids = [str(vm.id) for vm in vms] + apiclient.assignToLoadBalancerRule(cmd) + return + + def remove(self, apiclient, vms): + cmd = removeFromLoadBalancerRule.removeFromLoadBalancerRuleCmd() + cmd.virtualmachineids = [vm.id for vm in vms] + self.apiclient.removeFromLoadBalancerRule(cmd) + return diff --git a/tools/testClient/testcase/BVT-tests/settings.py b/tools/testClient/testcase/BVT-tests/settings.py index 7ed46fccb55..6e5cbe3cdce 100644 --- a/tools/testClient/testcase/BVT-tests/settings.py +++ b/tools/testClient/testcase/BVT-tests/settings.py @@ -2,104 +2,160 @@ # # Copyright (c) 2011 Citrix. All rights reserved. # -"""Test Information Services +"""Test Information Services """ TEST_VM_LIFE_CYCLE_SERVICES = { - "small" : + "small" : { - "template" : 7, + "template" : 256, "zoneid" : 1, - "serviceoffering" : 1, + "serviceoffering" : 27, "diskoffering" : 3, "displayname" : "testserver", "username" : "root", "password" : "password", "ssh_port" : 22, - + "networkids":205, + "hypervisor":'XenServer', + "account":'admin', + "domainid":1, + "ipaddressid":3, + "privateport":22, + "publicport":22, + "ipaddress":'69.41.185.229', + "protocol":'TCP', }, - - "medium" : + "medium" : { - "template" : 7, + "template" : 256, "zoneid" : 1, - "serviceoffering" : 2, + "serviceoffering" : 27, "displayname" : "testserver", "username" : "root", "password" : "password", "ssh_port" : 22, - + "networkids":205, + "hypervisor":'XenServer', + "account":'admin', + "domainid":1, + "ipaddressid":3, + "privateport":22, + "publicport":22, + "ipaddress":'69.41.185.229', + "protocol":'TCP', }, "service_offerings" : { "small" : { - "id": 1, + "id": 40, "cpunumber" : 1, "cpuspeed" : 500, "memory" : 524288 }, "medium" : { - "id" : 2, + "id" : 39, "cpunumber" : 1, "cpuspeed" : 1000, "memory" : 1048576 - } - } + } + }, + "iso": + { + "displaytext" : "Test ISO type", + "name": "testISO", + "url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso", + "zoneid" : 1, + "isextractable": True, + "isfeatured":True, + "ispublic": True, + "ostypeid":12, + "mode":'HTTP_DOWNLOAD', + "ostypeid":12, + }, + "diskdevice": '/dev/xvdd', + "mount_dir": "/mnt/tmp", + "hostid": 1, } - TEST_SNAPSHOT_SERVICES = { - "server_with_disk" : + "server_with_disk" : { - "template" : 7, + "template" : 256, "zoneid" : 1, - "serviceoffering" : 1, + "serviceoffering" : 27, "diskoffering" : 3, "displayname" : "testserver", "username" : "root", "password" : "password", "ssh_port" : 22, - + "networkids":205, + "hypervisor":'XenServer', + "account":'admin', + "domainid":1, + "ipaddressid":3, + "privateport":22, + "publicport":22, + "ipaddress":'69.41.185.229', + "protocol":'TCP', }, - "server_without_disk" : + "server_without_disk" : { - "template" : 7, + "template" : 256, "zoneid" : 1, - "serviceoffering" : 2, + "serviceoffering" : 27, "displayname" : "testserver", "username" : "root", "password" : "password", "ssh_port" : 22, - + "networkids":205, + "hypervisor":'XenServer', + "account":'admin', + "domainid":1, + "ipaddressid":3, + "privateport":22, + "publicport":22, + "ipaddress":'69.41.185.229', + "protocol":'TCP', }, + "recurring_snapshot" : { "intervaltype" : 'HOURLY', - "maxsnaps" : 8, + "maxsnaps" : 2, "schedule" : 1, "timezone" : 'US/Arizona', }, - "template": + + "templates" : { - "displaytext": "Test template from snapshot", - "name" : "template_from_snapshot", - "ostypeid": 12, - "templatefilter": 'featured', + "displaytext": 'Test template snapshot', + "name" : 'template_from_snapshot_3', + "ostypeid" : 12, + "templatefilter" : 'self', }, "small_instance": { "zoneid": 1, "serviceofferingid": 2, }, - "diskdevice" : "/dev/sda", + "diskdevice" : "/dev/xvda", "offerings" : 1, - "template" : 7, + "template" : 256, "zoneid" : 1, "diskoffering" : 3, "diskname" : "TestDiskServ", "size" : 1, #GBs + "account":'admin', + "domainid":1, + "mount_dir": "/mnt/tmp", + "sub_dir":"test", + "sub_lvl_dir1": "test1", + "sub_lvl_dir2": "test2", + "random_data" : "random.data", + } TEST_VOLUME_SERVICES = { @@ -110,8 +166,9 @@ TEST_VOLUME_SERVICES = { "diskname" : "TestDiskServ", "zoneid" : 1, "diskofferingid": 3, + "account":'admin', + "domainid":1, }, - 1: { "offerings" : 1, "volumeoffering" : 4, @@ -127,66 +184,206 @@ TEST_VOLUME_SERVICES = { "diskofferingid": 3, }, }, - "customdiskofferingid":22, - "customdisksize" : 7, #GBs - "serviceoffering" : 1, - "template" : 7, + "customdiskofferingid":41, + "customdisksize" : 2, #GBs + "serviceoffering" : 27, + "template" : 256, "zoneid" : 1, "username" : "root", "password" : "password", "ssh_port" : 22, "diskname" : "TestDiskServ", + "networkids":205, + "hypervisor":'XenServer', + "account":'admin', + "domainid":1, + "ipaddressid":3, + "privateport":22, + "publicport":22, + "ipaddress":'69.41.185.229', + "protocol":'TCP', + "diskdevice" : "/dev/sda", } TEST_SERVICE_OFFERING = { - "off_1" : + "off_1" : { - "id": 32, - "name":"small_service_offering", - "displaytext" : "Small service offering", + "name":"Service Offering 1", + "displaytext" : "Service Offering 1", "cpunumber":1, "cpuspeed": 200, "memory": 200, - "username" : "root", - "password" : "password", - "ssh_port" : 22, - }, - "off_2" : + "off_2" : { - "id":33, - "name":"medium_service_offering", - "displaytext" : "Medium service offering", + "name":"Service Offering 2", + "displaytext" : "Service Offering 2", "cpunumber":1, "cpuspeed": 200, "memory": 200, - "username" : "root", - "password" : "password", - "ssh_port" : 22, } } TEST_DISK_OFFERING = { - "off_1" : + "off_1" : { - "id": 31, - "name":"small_disk_offering", - "displaytext" : "Small disk offering", - "username" : "root", - "password" : "password", - "ssh_port" : 22, + "name":"Disk offering 1", + "displaytext" : "Disk offering 1", "disksize": 1 }, - "off_2" : + "off_2" : { - "id":32, - "name":"medium_disk_offering", - "displaytext" : "Medium disk offering", - "username" : "root", - "password" : "password", - "ssh_port" : 22, + "name":"Disk offering 2", + "displaytext" : "Disk offering 2", "disksize": 1 } } +TEST_TEMPLATE_SERVICES ={ + "virtual_machine" : + { + "template" : 256, + "zoneid" : 1, + "serviceoffering" : 27, + "displayname" : "testVM", + "username" : "root", + "password" : "password", + "ssh_port" : 22, + "networkids":205, + "hypervisor":'XenServer', + "account":'admin', + "domainid":1, + "ipaddressid":9, + "privateport":22, + "publicport":22, + "ipaddress":'69.41.185.229', + "protocol":'TCP', + }, + "volume": + { + "offerings" : 1, + "volumeoffering" : 3, + "diskname" : "TestVolumeTemplate", + "zoneid" : 1, + "diskofferingid": 3, + }, + "template_1": + { + "displaytext": "Test Template Type 1", + "name": "testTemplate", + "ostypeid":12, + "isfeatured" : False, + "ispublic" : False, + "isextractable":False, + }, + "template_2": + { + "displaytext": "Test Template Type 2", + "name": "testTemplate", + "ostypeid":12, + "isfeatured" : True, + "ispublic" :True, + "isextractable":True, + "mode": "HTTP_DOWNLOAD", + "zoneid":1, + }, + "templatefilter": 'self', + "destzoneid": 2, + "sourcezoneid": 1, + "isfeatured" : True, + "ispublic" : True, + "isextractable":False, + "bootable":True, + "passwordenabled":True, + "ostypeid":15, + "account":'bhavin333', + "domainid":1, + } + +TEST_ISO_SERVICES = { + "iso_1": + { + "displaytext" : "Test ISO type 1", + "name": "testISOType_1", + "url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso", + "zoneid" : 1, + "isextractable": True, + "isfeatured":True, + "ispublic": True, + "ostypeid":12, + }, + "iso_2": + { + "displaytext" : "Test ISO type 2", + "name": "testISOType_2", + "url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso", + "zoneid" : 1, + "isextractable": True, + "isfeatured":True, + "ispublic": True, + "ostypeid":12, + "mode":'HTTP_DOWNLOAD', + "ostypeid":12, + }, + "destzoneid": 2, + "sourcezoneid": 1, + "isfeatured" : True, + "ispublic" : True, + "isextractable":True, + "bootable":True, + "passwordenabled":True, + "ostypeid":15, + "account":'bhavin333', + "domainid":1, + } + +TEST_NETWORK_SERVICES = { + "admin_account" : "admin", + "user_account" : "gaurav_cl4", + "zoneid" : 1, + "domainid" : 1, + "account" : { + "email" : "test@test.com", + "firstname" : "Test", + "lastname" : "User", + "username" : "testuser79", + "password" : "fr3sca", + "zoneid" : 1, + "networkofferingid" : 6, + }, + "server" : + { + "template" : 256, + "zoneid" : 1, + "serviceoffering" : 40, + "diskoffering" : 3, + "displayname" : "testserver", + "username" : "root", + "password" : "password", + "ssh_port" : 22, + "hypervisor":'XenServer', + "account":'admin', + "domainid":1, + "ipaddressid":3, + "privateport":22, + "publicport":22, + "ipaddress":'69.41.185.229', + "protocol":'TCP', + }, + "natrule" : + { + "privateport" : 22, + "publicport" : 22, + "protocol" : "TCP" + }, + "lbrule" : + { + "name" : "SSH", + "alg" : "roundrobin", + "privateport" : 22, + "publicport" : 22, + } + + + } diff --git a/tools/testClient/testcase/BVT-tests/test_disk_offerings.py b/tools/testClient/testcase/BVT-tests/test_disk_offerings.py index 99414a23b92..e50d6e1eaa5 100644 --- a/tools/testClient/testcase/BVT-tests/test_disk_offerings.py +++ b/tools/testClient/testcase/BVT-tests/test_disk_offerings.py @@ -14,80 +14,60 @@ from base import * services = TEST_DISK_OFFERING -class TestDiskOfferings(cloudstackTestCase): - +class TestCreateDiskOffering(cloudstackTestCase): + def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() - - @classmethod - def setUpClass(cls): - cls.api_client = fetch_api_client() - - cmd =createDiskOffering.createDiskOfferingCmd() - cmd.displaytext = services["off_1"]["displaytext"] - cmd.name = services["off_1"]["name"] - cmd.disksize=services["off_1"]["disksize"] - - cls.small_disk_offering = cls.api_client.createDiskOffering(cmd) - - cmd = createDiskOffering.createDiskOfferingCmd() - cmd.displaytext = services["off_2"]["displaytext"] - cmd.name = services["off_2"]["name"] - cmd.disksize=services["off_2"]["disksize"] - - cls.medium_disk_offering = cls.api_client.createDiskOffering(cmd) - return - - @classmethod - def tearDownClass(cls): - cls.api_client = fetch_api_client() - - cmd = deleteDiskOffering.deleteDiskOfferingCmd() - cmd.id = cls.small_disk_offering.id - cls.api_client.deleteDiskOffering(cmd) - - return - + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + def test_01_create_disk_offering(self): """Test to create disk offering""" - + # Validate the following: # 1. createDiskOfferings should return a valid information for newly created offering # 2. The Cloud Database contains the valid information - - cmd = createDiskOffering.createDiskOfferingCmd() - - #Add the required parameters for creating new service offering - cmd.displaytext = services["off_1"]["displaytext"] - cmd.name = services["off_1"]["name"] - cmd.disksize=services["off_1"]["disksize"] - - self.tmp_disk_offering = self.apiclient.createDiskOffering(cmd) - + + disk_offering = DiskOffering.create(self.apiclient, services["off_1"]) + self.cleanup.append(disk_offering) + cmd = listDiskOfferings.listDiskOfferingsCmd() - cmd.name = services["off_1"]["name"] - list_disk_response = self.apiclient.listDiskOfferings(cmd) - + cmd.id = disk_offering.id + list_disk_response = self.apiclient.listDiskOfferings(cmd) + self.assertNotEqual( len(list_disk_response), 0, "Check Disk offering is created" ) + disk_response = list_disk_response[0] + self.assertEqual( - list_disk_response[0].displaytext, + disk_response.displaytext, services["off_1"]["displaytext"], "Check server id in createServiceOffering" ) self.assertEqual( - list_disk_response[0].name, + disk_response.name, services["off_1"]["name"], "Check name in createServiceOffering" ) - + + #Verify the database entries for new disk offering qresultset = self.dbclient.execute( - "select display_text, id from disk_offering where id = %s;" - %list_disk_response[0].id + "select display_text, id from disk_offering where id = %s;" + %disk_offering.id ) self.assertNotEqual( @@ -97,7 +77,7 @@ class TestDiskOfferings(cloudstackTestCase): ) qresult = qresultset[0] - + self.assertEqual( qresult[0], services["off_1"]["displaytext"], @@ -105,51 +85,90 @@ class TestDiskOfferings(cloudstackTestCase): ) self.assertEqual( qresult[1], - list_disk_response[0].id, + disk_offering.id, "Check ID in the database" ) - cmd = deleteDiskOffering.deleteDiskOfferingCmd() - cmd.id = self.tmp_disk_offering.id - self.apiclient.deleteDiskOffering(cmd) return - + + +class TestDiskOfferings(cloudstackTestCase): + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + + def tearDown(self): + + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.disk_offering_1 = DiskOffering.create(cls.api_client, services["off_1"]) + cls.disk_offering_2 = DiskOffering.create(cls.api_client, services["off_2"]) + return + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + cls.disk_offering_1.delete(cls.api_client) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + def test_02_edit_disk_offering(self): """Test to update existing disk offering""" - + # Validate the following: # 1. updateDiskOffering should return a valid information for newly created offering - + + #Generate new name & displaytext from random data + random_displaytext = random_gen() + random_name = random_gen() + cmd = updateDiskOffering.updateDiskOfferingCmd() - cmd.id= self.small_disk_offering.id - cmd.displaytext = services["off_1"]["displaytext"] - cmd.name = services["off_1"]["name"] + cmd.id= self.disk_offering_1.id + cmd.displaytext = random_displaytext + cmd.name = random_name self.apiclient.updateDiskOffering(cmd) - + cmd = listDiskOfferings.listDiskOfferingsCmd() - cmd.id = self.small_disk_offering.id + cmd.id = self.disk_offering_1.id list_disk_response = self.apiclient.listDiskOfferings(cmd) - + self.assertNotEqual( len(list_disk_response), 0, "Check disk offering is updated" ) - + + disk_response = list_disk_response[0] + self.assertEqual( - list_disk_response[0].displaytext, - services["off_1"]["displaytext"], + disk_response.displaytext, + random_displaytext, "Check service displaytext in updateServiceOffering" ) self.assertEqual( - list_disk_response[0].name, - services["off_1"]["name"], + disk_response.name, + random_name, "Check service name in updateServiceOffering" ) - + + #Verify database entries for updated disk offerings qresultset = self.dbclient.execute( - "select display_text, id from disk_offering where id = %s;" - %self.small_disk_offering.id + "select display_text, id from disk_offering where id = %s;" + %self.disk_offering_1.id ) self.assertNotEqual( @@ -159,15 +178,15 @@ class TestDiskOfferings(cloudstackTestCase): ) qresult = qresultset[0] - + self.assertEqual( qresult[0], - services["off_1"]["displaytext"], + random_displaytext, "Compare displaytext with database record" ) self.assertEqual( qresult[1], - self.small_disk_offering.id, + self.disk_offering_1.id, "Check name in the database" ) @@ -175,28 +194,29 @@ class TestDiskOfferings(cloudstackTestCase): def test_03_delete_disk_offering(self): """Test to delete disk offering""" - + # Validate the following: # 1. deleteDiskOffering should return a valid information for newly created offering - + cmd = deleteDiskOffering.deleteDiskOfferingCmd() - cmd.id = self.medium_disk_offering.id - + cmd.id = self.disk_offering_2.id self.apiclient.deleteDiskOffering(cmd) - + cmd = listDiskOfferings.listDiskOfferingsCmd() - cmd.id = self.medium_disk_offering.id - list_disk_response = self.apiclient.listDiskOfferings(cmd) - + cmd.id = self.disk_offering_2.id + list_disk_response = self.apiclient.listDiskOfferings(cmd) + self.assertEqual( - list_disk_response, - None, + list_disk_response, + None, "Check if disk offering exists in listDiskOfferings" ) + + #Verify database entry for deleted disk offering qresultset = self.dbclient.execute( - "select display_text, name from disk_offering where id = %s;" - % str(self.medium_disk_offering.id) + "select display_text, name from disk_offering where id = %s;" + % str(self.disk_offering_2.id) ) self.assertEqual( @@ -207,7 +227,4 @@ class TestDiskOfferings(cloudstackTestCase): return - def tearDown(self): - self.dbclient.close() - return - + diff --git a/tools/testClient/testcase/BVT-tests/test_iso.py b/tools/testClient/testcase/BVT-tests/test_iso.py new file mode 100644 index 00000000000..6504a4878c0 --- /dev/null +++ b/tools/testClient/testcase/BVT-tests/test_iso.py @@ -0,0 +1,436 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2011 Citrix. All rights reserved. +# +""" BVT tests for Templates ISO +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from settings import * +import remoteSSHClient +from utils import * +from base import * +import urllib +from random import random +#Import System modules +import time + +services = TEST_ISO_SERVICES + +class TestCreateIso(cloudstackTestCase): + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + + self.dbclient.close() + #Clean up, terminate the created ISOs + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + + return + + def test_01_create_iso(self): + """Test create public & private ISO + """ + + # Validate the following: + # 1. database (vm_template table) should be updated with newly created ISO + # 2. UI should show the newly added ISO + # 3. listIsos API should show the newly added ISO + + iso = Iso.create(self.apiclient, services["iso_2"]) + iso.download(self.apiclient) + self.cleanup.append(iso) + + cmd = listIsos.listIsosCmd() + cmd.id = iso.id + list_iso_response = self.apiclient.listIsos(cmd) + + iso_response = list_iso_response[0] + + self.assertNotEqual( + len(list_iso_response), + 0, + "Check template available in List ISOs" + ) + + self.assertEqual( + iso_response.displaytext, + services["iso_2"]["displaytext"], + "Check display text of newly created ISO" + ) + self.assertEqual( + iso_response.name, + services["iso_2"]["name"], + "Check name of newly created ISO" + ) + self.assertEqual( + iso_response.zoneid, + services["iso_2"]["zoneid"], + "Check zone ID of newly created ISO" + ) + + #Verify the database entry for ISO + self.debug( + "select name, display_text from vm_template where id = %s and format='ISO';" + %iso.id + ) + qresultset = self.dbclient.execute( + "select name, display_text from vm_template where id = %s and format='ISO';" + %iso.id + ) + + self.assertNotEqual( + len(qresultset), + 0, + "Check DB Query result set" + ) + + qresult = qresultset[0] + + self.assertEqual( + qresult[0], + services["iso_2"]["name"], + "Compare ISO name with database record" + ) + + self.assertEqual( + qresult[1], + services["iso_2"]["displaytext"], + "Compare ISO display text with database record" + ) + return + +class TestISO(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + cls.api_client = fetch_api_client() + cls.iso_1 = Iso.create(cls.api_client, services["iso_1"]) + cls.iso_1.download(cls.api_client) + cls.iso_2 = Iso.create(cls.api_client, services["iso_2"]) + cls.iso_2.download(cls.api_client) + return + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + cls.iso_2.delete(cls.api_client) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created ISOs, VMs + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + + return + + def test_02_edit_iso(self): + """Test Edit ISO + """ + + # Validate the following: + # 1. UI should show the edited values for ISO + # 2. database (vm_template table) should have updated values + + #Generate random values for updating ISO name and Display text + new_displayText = random_gen() + new_name = random_gen() + + cmd = updateIso.updateIsoCmd() + #Assign new values to attributes + cmd.id = self.iso_1.id + cmd.displaytext = new_displayText + cmd.name = new_name + cmd.bootable = services["bootable"] + cmd.passwordenabled = services["passwordenabled"] + cmd.ostypeid = services["ostypeid"] + + self.apiclient.updateIso(cmd) + + #Check whether attributes are updated in ISO using listIsos + cmd = listIsos.listIsosCmd() + cmd.id = self.iso_1.id + list_iso_response = self.apiclient.listIsos(cmd) + + self.assertNotEqual( + len(list_iso_response), + 0, + "Check template available in List ISOs" + ) + + iso_response = list_iso_response[0] + self.assertEqual( + iso_response.displaytext, + new_displayText, + "Check display text of updated ISO" + ) + self.assertEqual( + iso_response.name, + new_name, + "Check name of updated ISO" + ) + self.assertEqual( + iso_response.bootable, + services["bootable"], + "Check if image is bootable of updated ISO" + ) + + self.assertEqual( + iso_response.ostypeid, + services["ostypeid"], + "Check OSTypeID of updated ISO" + ) + + #Verify database entry for updateIso + self.debug( + "select name, display_text, bootable, guest_os_id from vm_template where id = %s and format='ISO';" + %self.iso_1.id + ) + qresultset = self.dbclient.execute( + "select name, display_text, bootable, guest_os_id from vm_template where id = %s and format='ISO';" + %self.iso_1.id + ) + + self.assertNotEqual( + len(qresultset), + 0, + "Check DB Query result set" + ) + + qresult = qresultset[0] + + self.assertEqual( + qresult[0], + new_name, + "Compare ISO name with database record" + ) + + self.assertEqual( + qresult[1], + new_displayText, + "Compare ISO display text with database record" + ) + self.assertEqual( + qresult[2], + int(services["bootable"]), + "Compare template enable_password field with database record" + ) + + self.assertEqual( + qresult[3], + services["ostypeid"], + "Compare template guest OS ID with database record" + ) + return + + def test_03_delete_iso(self): + """Test delete ISO + """ + + # Validate the following: + # 1. UI should not show the deleted ISP + # 2. database (vm_template table) should not contain deleted ISO + + self.iso_1.delete(cls.api_client) + + #ListIsos to verify deleted ISO is properly deleted + cmd = listIsos.listIsosCmd() + cmd.id = self.iso_1.id + list_iso_response = self.apiclient.listIsos(cmd) + + self.assertEqual(list_iso_response, None, "Check if ISO exists in ListIsos") + + #Verify whether database entry is deleted or not + self.debug( + "select name, display_text from vm_template where id = %s and format='ISO';" + %self.iso_1.id + ) + qresultset = self.dbclient.execute( + "select name, display_text from vm_template where id = %s and format='ISO';" + %self.iso_1.id + ) + + self.assertEqual( + len(qresultset), + 1, + "Check DB Query result set" + ) + return + + def test_04_extract_Iso(self): + "Test for extract ISO" + + # Validate the following + # 1. Admin should able extract and download the ISO + # 2. ListIsos should display all the public templates for all kind of users + # 3 .ListIsos should not display the system templates + + cmd = extractIso.extractIsoCmd() + cmd.id = self.iso_2.id + cmd.mode = services["iso_2"]["mode"] + cmd.zoneid = services["iso_2"]["zoneid"] + list_extract_response = self.apiclient.extractIso(cmd) + + #Format URL to ASCII to retrieve response code + formatted_url = urllib.unquote_plus(list_extract_response.url) + url_response = urllib.urlopen(formatted_url) + response_code = url_response.getcode() + + self.assertEqual( + list_extract_response.id, + self.iso_2.id, + "Check ID of the downloaded ISO" + ) + self.assertEqual( + list_extract_response.extractMode, + services["iso_2"]["mode"], + "Check mode of extraction" + ) + self.assertEqual( + list_extract_response.zoneid, + services["iso_2"]["zoneid"], + "Check zone ID of extraction" + ) + self.assertEqual( + response_code, + 200, + "Check for a valid response of download URL" + ) + return + + def test_05_iso_permissions(self): + """Update & Test for ISO permissions""" + + # validate the following + # 1. listIsos returns valid permissions set for ISO + # 2. permission changes should be reflected in vm_template table in database + + cmd = updateIsoPermissions.updateIsoPermissionsCmd() + cmd.id = self.iso_2.id + #Update ISO permissions + cmd.isfeatured = services["isfeatured"] + cmd.ispublic = services["ispublic"] + cmd.isextractable = services["isextractable"] + self.apiclient.updateIsoPermissions(cmd) + + #Verify ListIsos have updated permissions for the ISO for normal user + cmd = listIsos.listIsosCmd() + cmd.id = self.iso_2.id + cmd.account = services["account"] + cmd.domainid = services["domainid"] + list_iso_response = self.apiclient.listIsos(cmd) + + iso_response = list_iso_response[0] + + self.assertEqual( + iso_response.id, + self.iso_2.id, + "Check ISO ID" + ) + self.assertEqual( + iso_response.ispublic, + services["ispublic"], + "Check ispublic permission of ISO" + ) + + self.assertEqual( + iso_response.isfeatured, + services["isfeatured"], + "Check isfeatured permission of ISO" + ) + + #Verify database entry for updated ISO permissions + self.debug( + "select public, featured, extractable from vm_template where id = %s and format='ISO';" + %self.iso_2.id + ) + qresultset = self.dbclient.execute( + "select public, featured, extractable from vm_template where id = %s and format='ISO';" + %self.iso_2.id + ) + + self.assertNotEqual( + len(qresultset), + 0, + "Check DB Query result set" + ) + + qresult = qresultset[0] + + self.assertEqual( + qresult[0], + int(services["ispublic"]), + "Compare ispublic permission with database record" + ) + + self.assertEqual( + qresult[1], + int(services["isfeatured"]), + "Compare isfeatured permission with database record" + ) + self.assertEqual( + qresult[2], + int(services["isextractable"]), + "Compare extractable permission with database record" + ) + return + + def test_06_copy_iso(self): + """Test for copy ISO from one zone to another""" + + #Validate the following + #1. copy ISO should be successful and secondary storage should contain new copied ISO. + + cmd = copyIso.copyIsoCmd() + cmd.id = self.iso_2.id + cmd.destzoneid = services["destzoneid"] + cmd.sourcezoneid = services["sourcezoneid"] + self.apiclient.copyIso(cmd) + + #Verify ISO is copied to another zone using ListIsos + cmd = listIsos.listIsosCmd() + cmd.id = self.iso_2.id + list_iso_response = self.apiclient.listIsos(cmd) + + iso_response = list_iso_response[0] + + self.assertNotEqual( + len(list_iso_response), + 0, + "Check template extracted in List ISO" + ) + self.assertEqual( + iso_response.id, + self.iso_2.id, + "Check ID of the downloaded ISO" + ) + self.assertEqual( + iso_response.zoneid, + services["destzoneid"], + "Check zone ID of the copied ISO" + ) + return diff --git a/tools/testClient/testcase/BVT-tests/test_network.py b/tools/testClient/testcase/BVT-tests/test_network.py new file mode 100644 index 00000000000..4d1e801a4c1 --- /dev/null +++ b/tools/testClient/testcase/BVT-tests/test_network.py @@ -0,0 +1,567 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2011 Citrix. All rights reserved. +# + +""" BVT tests for Virtual Machine Life Cycle +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from settings import * +import remoteSSHClient +from utils import * +from base import * +#Import System modules +import time + + +services = TEST_NETWORK_SERVICES + + +class TestPublicIP(cloudstackTestCase): + """Test Associate/Disassociate Public IP Addresses + """ + def setUp(self): + self.apiclient = self.testClient.getApiClient() + + def test_public_ip_admin_account(self): + """Test Associate/Disassociate IP address for Admin Account + """ + # Validate the following: + # 1. listPubliIpAddresses API returns the list of acquired addresses + # 2. the returned list should contain our acquired IP address + + + ip_address = PublicIPAddress.create( + self.apiclient, + services["admin_account"], + services["zoneid"], + services["domainid"] + ) + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.id = ip_address.ipaddress.id + list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) + + self.assertNotEqual( + len(list_pub_ip_addr_resp), + 0, + "Check if new IP Address is associated" + ) + self.assertEqual( + list_pub_ip_addr_resp[0].id, + ip_address.ipaddress.id, + "Check Correct IP Address is returned in the List Cacls" + ) + ip_address.delete(self.apiclient) + + # Validate the following: + #1. listPublicIpAddresses API should no more return the released address + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.id = ip_address.ipaddress.id + list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) + + self.assertEqual( + list_pub_ip_addr_resp, + None, + "Check if disassociated IP Address is no longer available" + ) + + + + def test_public_ip_user_account(self): + """Test Associate/Disassociate IP address for Non-Admin User Account + """ + + # Validate the following: + # 1. listPubliIpAddresses API returns the list of acquired addresses + # 2. the returned list should contain our acquired IP address + ip_address = PublicIPAddress.create( + self.apiclient, + services["user_account"], + services["zoneid"], + services["domainid"] + ) + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.id = ip_address.ipaddress.id + list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) + + self.assertNotEqual( + len(list_pub_ip_addr_resp), + 0, + "Check if new IP Address is associated" + ) + self.assertEqual( + list_pub_ip_addr_resp[0].id, + ip_address.ipaddress.id, + "Check Correct IP Address is returned in the List Call" + ) + ip_address.delete(self.apiclient) + + + # Validate the following: + #1. listPublicIpAddresses API should no more return the released address + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.id = ip_address.ipaddress.id + list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) + + self.assertEqual( + list_pub_ip_addr_resp, + None, + "Check if disassociated IP Address is no longer available" + ) + + +class TestPortForwarding(cloudstackTestCase): + """Test Port Forwarding Rules for Source and Non-Source IP Addresses + """ + @classmethod + def setUpClass(cls): + + cls.api_client = fetch_api_client() + #Create an account, network, VM and IP addresses + cls.account = Account.create(cls.api_client, services["account"], admin=True) + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + services["server"], + accountid=cls.account.account.name, + networkids=[str(cls.account.network.id)] + ) + + cls._cleanup = [cls.virtual_machine, cls.account] + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + + @classmethod + def tearDownClass(self): + cleanup_resources(cls.api_client, cls._cleanup) + + def tearDown(self): + cleanup_resources(self.cleanup) + + def test_01_port_fwd_on_src_nat(self): + """Port Forwarding Tests for Source NAT IP Addresses + """ + src_nat_ip_addr = self.account.public_ip.ipaddress + + + #Validate the following: + #1. listPortForwarding rules API should return the added PF rule + #2. attempt to do an ssh into the user VM through the sourceNAT + + nat_rule = NATRule.create(self.apiclient, self.virtual_machine, services["natrule"], src_nat_ip_addr.id) + time.sleep(60) + + cmd = listPortForwardingRules.listPortForwardingRulesCmd() + cmd.id = nat_rule.id + list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd) + + self.assertNotEqual( + len(list_nat_rule_response), + 0, + "Check Port Forwarding Rule is created" + ) + self.assertEqual( + list_nat_rule_response[0].id, + nat_rule.id, + "Check Correct Port forwarding Rule is returned" + ) + + + try: + self.virtual_machine.get_ssh_client(src_nat_ip_addr.ipaddress) + except Exception as e: + self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress, e)) + + nat_rule.delete(self.apiclient) + time.sleep(60) + + #Validate the following: + #1. listPortForwardingRules should not return the deleted rule anymore + #2. attempt to do ssh should now fail + + cmd = listPortForwardingRules.listPortForwardingRulesCmd() + cmd.id = nat_rule.id + list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd) + + self.assertEqual( + list_nat_rule_response, + None, + "Check Port Forwarding Rule is deleted" + ) + self.debug("Check if the Public SSH port is inaccessible") + with self.assertRaises(Exception): + remoteSSHClient.remoteSSHClient( + ip.ipaddress, + self.virtual_machine.ssh_port, + self.virtual_machine.username, + self.virtual_machine.password + ) + + def test_02_port_fwd_on_non_src_nat(self): + """Port Forwarding Tests for Non-Source NAT IP Addresses + """ + + ip_address = PublicIPAddress.create(self.apiclient, self.account) + self.clean_up.append(ip_address) + nat_rule = NATRule.create(self.apiclient, self.virtual_machine, services) + time.sleep(60) + + #Validate the following: + #1. listPortForwarding rules API should return the added PF rule + #2. attempt to do an ssh into the user VM through the sourceNAT + + cmd = listPortForwardingRules.listPortForwardingRulesCmd() + cmd.id = nat_rule.id + list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd) + + self.assertNotEqual( + len(list_nat_rule_response), + 0, + "Check Port Forwarding Rule is created" + ) + self.assertEqual( + list_nat_rule_response[0].id, + nat_rule.id, + "Check Correct Port forwarding Rule is returned" + ) + + + try: + self.virtual_machine.get_ssh_client(public_ip = ip_address.ipaddress) + except Exception as e: + self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress, e)) + + nat_rule.delete(apiclient) + time.sleep(60) + + #Validate the following: + #1. listPortForwardingRules should not return the deleted rule anymore + #2. attempt to do ssh should now fail + + cmd = listPortForwardingRules.listPortForwardingRulesCmd() + cmd.id = nat_rule.id + list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd) + + + self.assertEqual( + len(list_nat_rule_response), + None, + "Check Port Forwarding Rule is deleted" + ) + self.debug("Check if the Public SSH port is inaccessible") + with self.assertRaises(Exception): + remoteSSHClient.remoteSSHClient( + ip_address.ipaddress, + self.virtual_machine.ssh_port, + self.virtual_machine.username, + self.virtual_machine.password + ) + + + +class TestLoadBalancingRule(cloudstackTestCase): + """Test Load Balancing Rules for Source and Non-Source IP Addresses + """ + @classmethod + def setUpClass(cls): + + cls.api_client = fetch_api_client() + #Create an account, network, VM and IP addresses + cls.account = Account.create(cls.api_client, services["account"], admin=True) + cls.vm_1 = VirtualMachine.create( + cls.api_client, + services["server"], + accountid=cls.account.account.name, + networkids=[str(cls.account.network.id)] + ) + cls.vm_2 = VirtualMachine.create( + cls.api_client, + services["server"], + accountid=cls.account.account.name, + networkids=[str(cls.account.network.id)] + ) + cls.non_src_nat_ip = PublicIPAddress.create( + cls.api_client, + cls.account.account.name, + services["zoneid"], + services["domainid"] + ) + cls._cleanup = [cls.vm_1, cls.vm_2, cls.non_src_nat_ip, cls.account] + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + + def tearDown(self): + cleanup_resources(self.apiclient, self.cleanup) + + @classmethod + def tearDownClass(cls): + cleanup_resources(cls.api_client, cls._cleanup) + + + def test_01_create_lb_rule_src_nat(self): + """Test Port Forwarding Rules for Source NAT IP Addresses + """ + src_nat_ip_addr = self.account.public_ip.ipaddress + + lb_rule = LoadBalancerRule.create( + self.apiclient, + services["lbrule"], + src_nat_ip_addr.id, + accountid = self.account.account.name + ) + self.cleanup.append(lb_rule) + + lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2]) + # Validate the Following: + #1. listLoadBalancerRules should return the added rule + #2. attempt to ssh twice on the load balanced IP + #3. verify using the hostname of the VM that round robin is indeed happening as expected + cmd = listLoadBalancerRules.listLoadBalancerRulesCmd() + cmd.id = lb_rule.id + lb_rules = self.apiclient.listLoadBalancerRules(cmd) + + self.assertNotEqual( + len(lb_rules), + 0, + "Check Load Balancer Rule in its List" + ) + + self.assertEqual( + lb_rules[0].id, + lb_rule.id, + "Check List Load Balancer Rules returns valid Rule" + ) + + + ssh_1 = remoteSSHClient.remoteSSHClient( + src_nat_ip_addr.ipaddress, + services['lbrule']["publicport"], + self.vm_1.username, + self.vm_1.password + ) + + #If Round Robin Algorithm is chosen, each ssh command should alternate between VMs + hostnames = [ssh_1.execute("hostname")[0]] + time.sleep(20) + ssh_2 = remoteSSHClient.remoteSSHClient( + src_nat_ip_addr.ipaddress, + services['lbrule']["publicport"], + self.vm_1.username, + self.vm_1.password + ) + + + hostnames.append(ssh_2.execute("hostname")[0]) + self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") + self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2") + + def test_02_create_lb_rule_non_nat(self): + """Test Load Balancing Rules for Non-Source IP Addresses + """ + lb_rule = LoadBalancerRule.create( + self.apiclient, + services["lbrule"], + cls.non_src_nat_ip.ipaddress.id, + accountid = self.account.account.name + ) + self.cleanup.append(lb_rule) + + lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2]) + + # Validate the Following: + #1. listLoadBalancerRules should return the added rule + #2. attempt to ssh twice on the load balanced IP + #3. verify using the hostname of the VM that round robin is indeed happening as expected + + cmd = listLoadBalancerRules.listLoadBalancerRulesCmd() + cmd.id = lb_rule.id + lb_rules = self.apiclient.listLoadBalancerRules(cmd) + + self.assertNotEqual( + len(lb_rules), + 0, + "Check Load Balancer Rule in its List" + ) + + self.assertEqual( + lb_rules[0].id, + lb_rule.id, + "Check List Load Balancer Rules returns valid Rule" + ) + + + ssh_1 = remoteSSHClient.remoteSSHClient( + cls.non_src_nat_ip.ipaddress.ipaddress, + services['lbrule']["publicport"], + self.vm_1.username, + self.vm_1.password + ) + + #If Round Robin Algorithm is chosen, each ssh command should alternate between VMs + hostnames = [ssh_1.execute("hostname")[0]] + time.sleep(20) + ssh_2 = remoteSSHClient.remoteSSHClient( + cls.non_src_nat_ip.ipaddress.ipaddress, + services['lbrule']["publicport"], + self.vm_1.username, + self.vm_1.password + ) + + + hostnames.append(ssh_2.execute("hostname")[0]) + self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") + self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2") + + + +class TestRebootRouter(cloudstackTestCase): + """Test Load Balancing Rules work post Router Reboot + """ + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + #Create an account, network, VM and IP addresses + self.account = Account.create(self.apiclient, services["account"], admin=True) + self.vm_1 = VirtualMachine.create( + self.apiclient, + services["server"], + accountid=self.account.account.name, + networkids=[str(self.account.network.id)] + ) + src_nat_ip_addr = self.account.public_ip.ipaddress + lb_rule = LoadBalancerRule.create( + self.apiclient, + services["lbrule"], + src_nat_ip_addr.id, + self.account.account.name + ) + lb_rule.assign(self.apiclient, [self.vm_1]) + #nat_rule = NATRule.create(self.apiclient, self.vm_1, services["natrule"], src_nat_ip_addr.id) + self.cleanup = [self.vm_1, lb_rule, self.account] + + + def test_reboot_router(self): + + #Validate the Following + #1. Post restart PF and LB rules should still function + #2. verify if the ssh into the virtual machine still works through the sourceNAT Ip + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = self.account.account.domainid + routers = self.apiclient.listRouters(cmd) + router = routers[0] + + cmd = rebootRouter.rebootRouterCmd() + cmd.id = router.id + self.apiclient.rebootRouter(cmd) + time.sleep(60) + src_nat_ip_addr = self.account.public_ip.ipaddress + + try: + remoteSSHClient.remoteSSHClient( + src_nat_ip_addr.ipaddress, + services["natrule"]["publicport"], + self.vm_1.username, + self.vm_1.password + ) + except Exception as e: + self.fail("SSH Access failed for %s: %s" %(self.vm_1.ipaddress, e)) + + def tearDown(self): + cleanup_resources(self.cleanup) + +class TestAssignRemoveLB(cloudstackTestCase): + """Assign Load Balancer Rule to two Virtual Machines, Remove One VM + and associate another VM. + """ + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.account = Account.create(self.apiclient, services["account"], admin=True) + + self.vm_1 = VirtualMachine.create( + self.apiclient, + services["server"], + accountid=self.account.account.name, + networkids=[str(self.account.network.id)] + ) + self.vm_2 = VirtualMachine.create( + self.apiclient, + services["server"], + accountid=self.account.account.name, + networkids=[str(self.account.network.id)] + ) + + self.vm_3 = VirtualMachine.create( + self.apiclient, + services["server"], + accountid=self.account.account.name, + networkids=[str(self.account.network.id)] + ) + self.non_src_nat_ip = self.account.public_ip.ipaddress + self.cleanup = [self.vm_1, self.vm_2, self.vm_3] + + def test_assign_and_removal_elb(self): + + #Validate: + #1. Verify list API - listLoadBalancerRules lists all the rules with the relevant ports + #2. listLoadBalancerInstances will list the instances associated with the corresponding rule. + #3. verify ssh attempts should pass as long as there is at least one instance associated with the rule + + lb_rule = LoadBalancerRule.create( + self.apiclient, + services["lbrule"], + self.non_src_nat_ip.id, + self.account.account.name + ) + self.cleanup.append(lb_rule) + lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2]) + + ssh_1 = remoteSSHClient.remoteSSHClient( + self.non_src_nat_ip.ipaddress, + services["natrule"]["publicport"], + self.vm_1.username, + self.vm_1.password + ) + + ssh_2 = remoteSSHClient.remoteSSHClient( + self.non_src_nat_ip.ipaddress, + services["natrule"]["publicport"], + self.vm_2.username, + self.vm_2.password + ) + ssh_3 = remoteSSHClient.remoteSSHClient( + self.non_src_nat_ip.ipaddress, + services["natrule"]["publicport"], + self.vm_3.username, + self.vm_3.password + ) + #If Round Robin Algorithm is chosen, each ssh command should alternate between VMs + res_1 = ssh_1.execute("hostname")[0] + time.sleep(20) + res_2 = ssh_2.execute("hostname")[0] + + self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1") + self.assertIn(self.vm_2.name, res_2, "Check if ssh succeeded for server2") + + lb_rule.remove(self.apiclient, [self.vm_2]) + + res_1 = ssh_1.execute("hostname")[0] + self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1") + + lb_rule.assign(self.apiclient, [self.vm_3]) + + res_1 = ssh_1.execute("hostname")[0] + time.sleep(20) + res_3 = ssh_3.execute("hostname")[0] + + self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1") + self.assertIn(self.vm_3.name, res_3, "Check if ssh succeeded for server3") + + def teardown(self): + cleanup_resources(self.cleanup) diff --git a/tools/testClient/testcase/BVT-tests/test_service_offerings.py b/tools/testClient/testcase/BVT-tests/test_service_offerings.py index 027a37f2c22..661ee272dcc 100644 --- a/tools/testClient/testcase/BVT-tests/test_service_offerings.py +++ b/tools/testClient/testcase/BVT-tests/test_service_offerings.py @@ -9,77 +9,50 @@ from cloudstackTestCase import * from cloudstackAPI import * from settings import * -import remoteSSHClient from utils import * from base import * services = TEST_SERVICE_OFFERING -class TestServiceOfferings(cloudstackTestCase): - +class TestCreateServiceOffering(cloudstackTestCase): + def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() - - @classmethod - def setUpClass(cls): - - cls.api_client = fetch_api_client() - - cmd = createServiceOffering.createServiceOfferingCmd() - cmd.cpunumber = services["off_1"]["cpunumber"] - cmd.cpuspeed = services["off_1"]["cpuspeed"] - cmd.displaytext = services["off_1"]["displaytext"] - cmd.memory = services["off_1"]["memory"] - cmd.name = services["off_1"]["name"] - - cls.small_service_offering = cls.api_client.createServiceOffering(cmd) - - cmd = createServiceOffering.createServiceOfferingCmd() - cmd.cpunumber = services["off_2"]["cpunumber"] - cmd.cpuspeed = services["off_2"]["cpuspeed"] - cmd.displaytext = services["off_2"]["displaytext"] - cmd.memory = services["off_2"]["memory"] - cmd.name = services["off_2"]["name"] - - cls.medium_service_offering = cls.api_client.createServiceOffering(cmd) - return - - @classmethod - def tearDownClass(cls): - - cls.api_client = fetch_api_client() - cmd = deleteServiceOffering.deleteServiceOfferingCmd() - cmd.id = cls.small_service_offering.id - cls.api_client.deleteServiceOffering(cmd) - return - + self.cleanup = [] + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + + return + def test_01_create_service_offering(self): """Test to create service offering""" - + # Validate the following: # 1. createServiceOfferings should return a valid information for newly created offering # 2. The Cloud Database contains the valid information - - cmd = createServiceOffering.createServiceOfferingCmd() - #Add the required parameters for creating new service offering - cmd.cpunumber = services["off_1"]["cpunumber"] - cmd.cpuspeed = services["off_1"]["cpuspeed"] - cmd.displaytext = services["off_1"]["displaytext"] - cmd.memory = services["off_1"]["memory"] - cmd.name = services["off_1"]["name"] - - self.tmp_service_offering = self.apiclient.createServiceOffering(cmd) - + + service_offering = ServiceOffering.create(self.apiclient, services["off_1"]) + self.cleanup.append(service_offering) + cmd = listServiceOfferings.listServiceOfferingsCmd() - cmd.name = services["off_1"]["name"] + cmd.id = service_offering.id list_service_response = self.apiclient.listServiceOfferings(cmd) - + self.assertNotEqual( len(list_service_response), 0, "Check Service offering is created" ) + service_response = list_service_response[0] + self.assertEqual( list_service_response[0].cpunumber, services["off_1"]["cpunumber"], @@ -105,12 +78,12 @@ class TestServiceOfferings(cloudstackTestCase): services["off_1"]["name"], "Check name in createServiceOffering" ) - + #Verify the service offerings with database records qresultset = self.dbclient.execute( - "select cpu, speed, ram_size from service_offering where id = %s;" - % self.tmp_service_offering.id + "select cpu, speed, ram_size from service_offering where id = %s;" + % service_offering.id ) - + self.assertNotEqual( len(qresultset), 0, @@ -118,7 +91,7 @@ class TestServiceOfferings(cloudstackTestCase): ) qresult = qresultset[0] - + self.assertEqual( qresult[0], services["off_1"]["cpunumber"], @@ -134,28 +107,68 @@ class TestServiceOfferings(cloudstackTestCase): services["off_1"]["memory"], "Check number of CPUs allocated to service offering in the database" ) - cmd = deleteServiceOffering.deleteServiceOfferingCmd() - cmd.id = self.tmp_service_offering.id - self.api_client.deleteServiceOffering(cmd) return - + + +class TestServiceOfferings(cloudstackTestCase): + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + + def tearDown(self): + + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + + return + + @classmethod + def setUpClass(cls): + + cls.api_client = fetch_api_client() + cls.service_offering_1 = ServiceOffering.create(cls.api_client, services["off_1"]) + cls.service_offering_2 = ServiceOffering.create(cls.api_client, services["off_2"]) + return + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + cls.service_offering_1.delete(cls.api_client) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + def test_02_edit_service_offering(self): """Test to update existing service offering""" - + # Validate the following: # 1. updateServiceOffering should return a valid information for newly created offering + + #Generate new name & displaytext from random data random_displaytext = random_gen() - random_name = random_gen() + random_name = random_gen() + + cmd = updateServiceOffering.updateServiceOfferingCmd() - cmd.id= self.small_service_offering.id + #Add parameters for API call + cmd.id= self.service_offering_1.id cmd.displaytext = random_displaytext cmd.name = random_name self.apiclient.updateServiceOffering(cmd) - + cmd = listServiceOfferings.listServiceOfferingsCmd() - cmd.id = self.small_service_offering.id - list_service_response = self.apiclient.listServiceOfferings(cmd) - + cmd.id = self.service_offering_1.id + list_service_response = self.apiclient.listServiceOfferings(cmd) + self.assertNotEqual( len(list_service_response), 0, @@ -172,12 +185,13 @@ class TestServiceOfferings(cloudstackTestCase): random_name, "Check server name in updateServiceOffering" ) - + + #Verify updated values in database qresultset = self.dbclient.execute( - "select id, display_text, name from disk_offering where type='Service' and id = %s;" - % self.small_service_offering.id + "select id, display_text, name from disk_offering where type='Service' and id = %s;" + % self.service_offering_1.id ) - + self.assertNotEqual( len(qresultset), 0, @@ -185,58 +199,55 @@ class TestServiceOfferings(cloudstackTestCase): ) qresult = qresultset[0] - + self.assertEqual( qresult[0], - self.small_service_offering.id, + self.service_offering_1.id, "Check service offering ID in the database" ) self.assertEqual( qresult[1], - services["off_1"]["displaytext"], + random_displaytext, "Check service offering ID in the database" ) self.assertEqual( qresult[2], - services["off_1"]["name"], + random_name, "Check service offering ID in the database" ) - + return def test_03_delete_service_offering(self): """Test to delete service offering""" - + # Validate the following: # 1. deleteServiceOffering should return a valid information for newly created offering - + cmd = deleteServiceOffering.deleteServiceOfferingCmd() #Add the required parameters required to call for API - cmd.id = self.medium_service_offering.id + cmd.id = self.service_offering_2.id self.apiclient.deleteServiceOffering(cmd) - + cmd = listServiceOfferings.listServiceOfferingsCmd() - cmd.id = self.medium_service_offering.id - list_service_response = self.apiclient.listServiceOfferings(cmd) + cmd.id = self.service_offering_2.id + list_service_response = self.apiclient.listServiceOfferings(cmd) self.assertEqual( - list_service_response, - None, + list_service_response, + None, "Check if service offering exists in listDiskOfferings" ) - + #Verify database records for deleted service offerings qresultset = self.dbclient.execute( - "select id from service_offering where id = %s;" - % self.medium_service_offering.id - ) + "select id from service_offering where id = %s;" + % self.service_offering_2.id + ) self.assertEqual( len(qresultset), 1, "Check DB Query result set" - ) - return - - def tearDown(self): - self.dbclient.close() + ) return + diff --git a/tools/testClient/testcase/BVT-tests/test_snapshots.py b/tools/testClient/testcase/BVT-tests/test_snapshots.py index 6d69820ffc3..6ad24b5b460 100644 --- a/tools/testClient/testcase/BVT-tests/test_snapshots.py +++ b/tools/testClient/testcase/BVT-tests/test_snapshots.py @@ -10,51 +10,55 @@ from cloudstackAPI import * from settings import * from utils import * from base import * -#Import System modules services = TEST_SNAPSHOT_SERVICES -MOUNT_DIR = "/mnt/tmp" -SUB_DIR = "test" -SUB_LVL_DIR1 = "test1" -SUB_LVL_DIR2 = "test2" -RANDOM_DATA = "random.data" - class TestSnapshots(cloudstackTestCase): @classmethod def setUpClass(cls): cls.api_client = fetch_api_client() - cls.virtual_machine = cls.virtual_machine_with_disk = VirtualMachine.create(cls.api_client, services["server_with_disk"]) - cls.virtual_machine_without_disk = VirtualMachine.create(cls.api_client, services["server_without_disk"]) + cls.virtual_machine = cls.virtual_machine_with_disk =\ + VirtualMachine.create(cls.api_client, services["server_with_disk"]) + cls.virtual_machine_without_disk =\ + VirtualMachine.create(cls.api_client, services["server_without_disk"]) + cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, services["server_with_disk"]) + return @classmethod def tearDownClass(cls): - cls.virtual_machine.delete(cls.api_client) - cls.virtual_machine_without_disk.delete(cls.api_client) + try: + cls.virtual_machine.delete(cls.api_client) + cls.virtual_machine_without_disk.delete(cls.api_client) + cls.nat_rule.delete(cls.api_client) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return - + def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() - self.cleanup = { 'virtual_machine': [], - 'snapshot' : [], - 'volume' : [] - } - - def tearDown(self): - #Clean up, terminate the created instance, volumes and snapshots - cleanup_resources(self.apiclient, self.cleanup) + self.cleanup = [] return + def tearDown(self): + try: + #Clean up, terminate the created instance, volumes and snapshots + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return def test_01_snapshot_root_disk(self): """Test Snapshot Root Disk - """ + """ cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'ROOT' - + volumes = self.apiclient.listVolumes(cmd) snapshot = Snapshot.create(self.apiclient, volumes[0].id) @@ -63,21 +67,38 @@ class TestSnapshots(cloudstackTestCase): list_snapshots = self.apiclient.listSnapshots(cmd) self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call") - + self.assertEqual( list_snapshots[0].id, snapshot.id, "Check resource id in list resources call" ) + self.debug("select backup_snap_id from snapshots where id = %s;" % snapshot.id) + qresultset = self.dbclient.execute("select backup_snap_id from snapshots where id = %s;" % snapshot.id) + self.assertNotEqual( + len(qresultset), + 0, + "Check DB Query result set" + ) + + + qresult = qresultset[0] + self.assertNotEqual( + str(qresult[0]), + 'NULL', + "Check if backup_snap_id is not null" + ) + return + def test_02_snapshot_data_disk(self): """Test Snapshot Data Disk - """ + """ cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'DATADISK' - + volume = self.apiclient.listVolumes(cmd) snapshot = Snapshot.create(self.apiclient, volume[0].id) @@ -86,96 +107,158 @@ class TestSnapshots(cloudstackTestCase): list_snapshots = self.apiclient.listSnapshots(cmd) self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call") - + self.assertEqual( list_snapshots[0].id, snapshot.id, "Check resource id in list resources call" ) + self.debug("select backup_snap_id from snapshots where id = %s;" % snapshot.id) + qresultset = self.dbclient.execute("select backup_snap_id from snapshots where id = %s;" % snapshot.id) + self.assertNotEqual( + len(qresultset), + 0, + "Check DB Query result set" + ) + + + qresult = qresultset[0] + self.assertNotEqual( + str(qresult[0]), + 'NULL', + "Check if backup_snap_id is not null" + ) + return + def test_03_volume_from_snapshot(self): - """Create volumes from snapshots + """Create volumes from snapshots """ - + #1. Login to machine; create temp/test directories on data volume #2. Snapshot the Volume #3. Create another Volume from snapshot #4. Mount/Attach volume to another server - #5. Compare data + #5. Compare data random_data_0 = random_gen(100) random_data_1 = random_gen(100) - - ssh_client = self.virtual_machine.get_ssh_client() + + ssh_client = self.virtual_machine.get_ssh_client(services["server_with_disk"]["ipaddress"]) + + #Format partition using ext3 format_volume_to_ext3(ssh_client, services["diskdevice"]) - cmds = [ "mkdir -p %s" %MOUNT_DIR, - "mount %s1 %s" %(services["diskdevice"], MOUNT_DIR), - "pushd %s" %MOUNT_DIR, - "mkdir -p %s/{%s,%s} " %(SUB_DIR, SUB_LVL_DIR1, SUB_LVL_DIR2), - "echo %s > %s/%s/%s" %(random_data_0, SUB_DIR, SUB_LVL_DIR1, RANDOM_DATA), - "echo %s > %s/%s/%s" %(random_data_1, SUB_DIR, SUB_LVL_DIR1, RANDOM_DATA) - ] + cmds = [ "mkdir -p %s" % services["mount_dir"], + "mount %s1 %s" %(services["diskdevice"], services["mount_dir"]), + "pushd %s" % services["mount_dir"], + "mkdir -p %s/{%s,%s} " %( + services["sub_dir"], + services["sub_lvl_dir1"], + services["sub_lvl_dir2"] + ), + "echo %s > %s/%s/%s" %( + random_data_0, + services["sub_dir"], + services["sub_lvl_dir1"], + services["random_data"] + ), + "echo %s > %s/%s/%s" %( + random_data_1, + services["sub_dir"], + services["sub_lvl_dir2"], + services["random_data"] + ) + ] for c in cmds: self.debug(ssh_client.execute(c)) - + cmd = listVolumes.listVolumesCmd() cmd.hostid = self.virtual_machine.id cmd.type = 'DATADISK' list_volume_response = self.apiclient.listVolumes(cmd) volume = list_volume_response[0] - - snapshot = Snapshot.create(self.apiclient, volume.id) - self.cleanup['snapshot'].append(snapshot) + + #Create snapshot from attached volume + snapshot = Snapshot.create(self.apiclient, volume.id) + self.cleanup.append(snapshot) + #Create volume from snapshot volume = Volume.create_from_snapshot(self.apiclient, snapshot.id, services) - self.cleanup['volume'].append(volume) - + self.cleanup.append(volume) + cmd = listVolumes.listVolumesCmd() cmd.id = volume.id list_volumes = self.apiclient.listVolumes(cmd) - - assertNotEqual( + + self.assertNotEqual( len(list_volumes), None, "Check Volume list Length" ) - - assertEqual ( + + self.assertEqual ( list_volumes[0].id, volume.id, "Check Volume in the List Volumes" ) + #Attaching volume to new VM new_virtual_machine = self.virtual_machine_without_disk - self.cleanup['virtual_machine'].append(new_virtual_machine) + self.cleanup.append(new_virtual_machine) cmd = attachVolume.attachVolumeCmd() - cmd.id = self.volume.id - cmd.virtualmachineid = deploy_new_virtual_machine.id + cmd.id = volume.id + cmd.virtualmachineid = new_virtual_machine.id volume = self.apiclient.attachVolume(cmd) - ssh = virtual_machine.get_ssh_client() + + #Login to VM to verify test directories and files + ssh = new_virtual_machine.get_ssh_client(services["server_without_disk"]["ipaddress"]) cmds = [ - "mkdir %s" %MOUNT_DIR, - "mount %s1 %s" %(services["diskdevice"], MOUNT_DIR) + "mkdir %s" %services["mount_dir"], + "mount %s1 %s" %(services["diskdevice"], services["mount_dir"]) ] for c in cmds: self.debug(ssh.execute(c)) - returned_data_0 = ssh.execute("cat %s/%s/%s/%s" %(MOUNT_DIR, SUB_DIR, SUB_LVL_DIR1, RANDOM_DATA)) - returned_data_1 = ssh.execute("cat %s/%s/%s/%s" %(MOUNT_DIR, SUB_DIR, SUB_LVL_DIR2, RANDOM_DATA)) - self.assertEqual(random_data_0, returned_data_0, "Verify newly attached volume contents with existing one") - self.assertEqual(random_data_1, returned_data_1, "Verify newly attached volume contents with existing one") + returned_data_0 = ssh.execute("cat %s/%s/%s" %( + services["sub_dir"], + services["sub_lvl_dir1"], + services["random_data"] + ) + ) + + returned_data_1 = ssh.execute("cat %s/%s/%s" %( + services["sub_dir"], + services["sub_lvl_dir2"], + services["random_data"] + ) ) + #Verify returned data + self.assertEqual( + random_data_0, + returned_data_0[0], + "Verify newly attached volume contents with existing one" + ) + self.assertEqual( + random_data_1, + returned_data_1[0], + "Verify newly attached volume contents with existing one" + ) + + #detach volume for cleanup + cmd = detachVolume.detachVolumeCmd() + cmd.id = volume.id + self.apiclient.detachVolume(cmd) return - + def test_04_delete_snapshot(self): """Test Delete Snapshot """ - + cmd = listVolumes.listVolumesCmd() cmd.hostid = self.virtual_machine.id cmd.type = 'DATADISK' list_volumes = self.apiclient.listVolumes(cmd) - + cmd = listSnapshots.listSnapshotsCmd() cmd.id = list_volumes[0].id list_snapshots = self.apiclient.listSnapshots(cmd) @@ -187,98 +270,212 @@ class TestSnapshots(cloudstackTestCase): cmd = listSnapshots.listSnapshotsCmd() cmd.id = snapshot.id list_snapshots = self.apiclient.listSnapshots(cmd) - + self.assertEqual(list_snapshots, None, "Check if result exists in list item call") - + return + def test_05_recurring_snapshot_root_disk(self): """Test Recurring Snapshot Root Disk - """ - + """ + #1. Create snapshot policy for root disk + #2. ListSnapshot policy should return newly created policy + #3. Verify only most recent number (maxsnaps) snapshots retailed + cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'ROOT' - + volume = self.apiclient.listVolumes(cmd) - - cmd = createSnapshotPolicy.createSnapshotPolicyCmd() - cmd.intervaltype=services["recurring_snapshot"]["intervaltype"] - cmd.maxsnaps=services["recurring_snapshot"]["maxsnaps"] - cmd.schedule=services["recurring_snapshot"]["schedule"] - cmd.timezone=services["recurring_snapshot"]["timezone"] - cmd.volumeid=volume[0].id - recurring_snapshot = self.apiclient.createSnapshotPolicy(cmd) + + recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, services["recurring_snapshot"]) + self.cleanup.append(recurring_snapshot) + + #ListSnapshotPolicy should return newly created policy cmd = listSnapshotPolicies.listSnapshotPoliciesCmd() cmd.id = recurring_snapshot.id cmd.volumeid=volume[0].id - list_snapshots = self.apiclient.listSnapshotPolicies(cmd) + list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd) - self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call") - + self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call") + + snapshots_policy = list_snapshots_policy[0] self.assertEqual( - list_snapshots[0].id, + snapshots_policy.id, recurring_snapshot.id, "Check recurring snapshot id in list resources call" ) self.assertEqual( - list_snapshots[0].maxsnaps, + snapshots_policy.maxsnaps, services["recurring_snapshot"]["maxsnaps"], "Check interval type in list resources call" ) - - #Sleep for 9 hours to check only 8 snapshots are retained - time.sleep(32400) + + #Sleep for (maxsnaps+1) hours to verify only maxsnaps snapshots are retained + time.sleep(((services["recurring_snapshot"]["maxsnaps"])+1)*3600) + cmd = listSnapshots.listSnapshotsCmd() cmd.volumeid=volume.id cmd.intervaltype = services["recurring_snapshot"]["intervaltype"] cmd.snapshottype = 'RECURRING' - + list_snapshots = self.apiclient.listSnapshots(cmd) - - self.assertEqual(len(list_snapshots),8, "Check maximum number of recurring snapshots retained") - + + self.assertEqual( + len(list_snapshots), + services["recurring_snapshot"]["maxsnaps"], + "Check maximum number of recurring snapshots retained" + ) + return + def test_06_recurring_snapshot_data_disk(self): """Test Recurring Snapshot data Disk - """ - + """ + + #1. Create snapshot policy for data disk + #2. ListSnapshot policy should return newly created policy + #3. Verify only most recent number (maxsnaps) snapshots retailed + cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'DATADISK' - + volume = self.apiclient.listVolumes(cmd) - - cmd = createSnapshotPolicy.createSnapshotPolicyCmd() - cmd.intervaltype=services["recurring_snapshot"]["intervaltype"] - cmd.maxsnaps=services["recurring_snapshot"]["maxsnaps"] - cmd.schedule=services["recurring_snapshot"]["schedule"] - cmd.timezone=services["recurring_snapshot"]["timezone"] - cmd.volumeid=volume[0].id - recurring_snapshot = self.apiclient.createSnapshotPolicy(cmd) - + + recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, services["recurring_snapshot"]) + self.cleanup.append(recurring_snapshot) + + #ListSnapshotPolicy should return newly created policy cmd = listSnapshotPolicies.listSnapshotPoliciesCmd() cmd.id = recurring_snapshot.id cmd.volumeid=volume[0].id - list_snapshots = self.apiclient.listSnapshotPolicies(cmd) + list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd) - self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call") - + self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call") + + snapshots_policy = list_snapshots_policy[0] self.assertEqual( - list_snapshots[0].id, + snapshots_policy.id, recurring_snapshot.id, "Check recurring snapshot id in list resources call" ) self.assertEqual( - list_snapshots[0].maxsnaps, + snapshots_policy.maxsnaps, services["recurring_snapshot"]["maxsnaps"], "Check interval type in list resources call" ) - - #Sleep for 9 hours to check only 8 snapshots are retained - time.sleep(32400) + + #Sleep for (maxsnaps+1) hours to verify only maxsnaps snapshots are retained + time.sleep(((services["recurring_snapshot"]["maxsnaps"])+1)*3600) + cmd = listSnapshots.listSnapshotsCmd() cmd.volumeid=volume.id cmd.intervaltype = services["recurring_snapshot"]["intervaltype"] cmd.snapshottype = 'RECURRING' - + list_snapshots = self.apiclient.listSnapshots(cmd) - - self.assertEqual(len(list_snapshots),8, "Check maximum number of recurring snapshots retained") + + self.assertEqual( + len(list_snapshots), + services["recurring_snapshot"]["maxsnaps"], + "Check maximum number of recurring snapshots retained" + ) + return + + def test_07_template_from_snapshot(self): + """Create Template from snapshot + """ + + #1. Login to machine; create temp/test directories on data volume + #2. Snapshot the Volume + #3. Create Template from snapshot + #4. Deploy Virtual machine using this template + #5. Login to newly created virtual machine + #6. Compare data + + random_data_0 = random_gen(100) + random_data_1 = random_gen(100) + + #Login to virtual machine + ssh_client = self.virtual_machine.get_ssh_client(services["server_with_disk"]["ipaddress"]) + + cmds = [ "mkdir -p %s" % services["mount_dir"], + "mount %s1 %s" %(services["diskdevice"], services["mount_dir"]), + "pushd %s" % services["mount_dir"], + "mkdir -p %s/{%s,%s} " %( + services["sub_dir"], + services["sub_lvl_dir1"], + services["sub_lvl_dir2"] + ), + "echo %s > %s/%s/%s" %( + random_data_0, + services["sub_dir"], + services["sub_lvl_dir1"], + services["random_data"] + ), + "echo %s > %s/%s/%s" %( + random_data_1, + services["sub_dir"], + services["sub_lvl_dir2"], + services["random_data"] + ) + ] + for c in cmds: + ssh_client.execute(c) + + cmd = listVolumes.listVolumesCmd() + cmd.virtualmachineid = self.virtual_machine.id + cmd.type = 'ROOT' + + volume = self.apiclient.listVolumes(cmd)[0] + #Create a snapshot of volume + snapshot = Snapshot.create(self.apiclient, volume.id) + self.cleanup.append(snapshot) + + # Generate template from the snapshot + template = Template.create_from_snapshot(self.apiclient, snapshot, services["templates"]) + + cmd = listTemplates.listTemplatesCmd() + cmd.templatefilter = services["templates"]["templatefilter"] + cmd.id= template.id + list_templates = self.apiclient.listTemplates(cmd) + + self.assertNotEqual(list_templates, None, "Check if result exists in list item call") + + self.assertEqual( + list_templates[0].id, + template.id, + "Check new template id in list resources call" + ) + + # Deploy new virtual machine using template + new_virtual_machine = VirtualMachine.create( + self.apiclient, + services["server_without_disk"], + template.id + ) + self.cleanup.append(new_virtual_machine) + #Login to VM & mount directory + ssh = new_virtual_machine.get_ssh_client(services["server_without_disk"]["ipaddress"]) + cmds = [ + "mkdir %s" %services["mount_dir"], + "mount %s1 %s" %(services["diskdevice"], services["mount_dir"]) + ] + + for c in cmds: + ssh.execute(c) + + returned_data_0 = ssh.execute("cat %s/%s/%s" %( + services["sub_dir"], + services["sub_lvl_dir1"], + services["random_data"] + ) ) + returned_data_1 = ssh.execute("cat %s/%s/%s" %( + services["sub_dir"], + services["sub_lvl_dir2"], + services["random_data"] + ) ) + #Verify returned data + self.assertEqual(random_data_0, returned_data_0[0], "Verify newly attached volume contents with existing one") + self.assertEqual(random_data_1, returned_data_1[0], "Verify newly attached volume contents with existing one") + + return diff --git a/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py b/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py index 126dd5b8dd6..f161b08766d 100644 --- a/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py +++ b/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py @@ -19,24 +19,29 @@ import time services = TEST_VM_LIFE_CYCLE_SERVICES class TestDeployVM(cloudstackTestCase): - + def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + self.virtual_machine = VirtualMachine.create(self.apiclient, services["small"]) + self.cleanup.append(self.virtual_machine) + self.nat_rule = NATRule.create(self.apiclient, self.virtual_machine, services["small"]) + self.cleanup.append(self.nat_rule) + def test_deploy_vm(self): """Test Deploy Virtual Machine """ - - self.virtual_machine = VirtualMachine.create(self.apiclient, services["small"]) - #Validate the following: + + # Validate the following: # 1. Virtual Machine is accessible via SSH # 2. listVirtualMachines returns accurate information # 3. The Cloud Database contains the valid information - + ipaddress = self.virtual_machine.ipaddress self.debug("Verify SSH Access for virtual machine: %s" %self.virtual_machine.id) try: - self.virtual_machine.get_ssh_client() + self.virtual_machine.get_ssh_client(services["small"]["ipaddress"]) except Exception as e: self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress, e)) @@ -45,26 +50,28 @@ class TestDeployVM(cloudstackTestCase): list_vm_response = self.apiclient.listVirtualMachines(cmd) self.debug("Verify listVirtualMachines response for virtual machine: %s" %self.virtual_machine.id) + vm_response = list_vm_response[0] self.assertNotEqual( len(list_vm_response), 0, - "Check VM avaliable in List Virtual Machines" + "Check VM available in List Virtual Machines" ) self.assertEqual( - list_vm_response[0].id, + + vm_response.id, self.virtual_machine.id, "Check virtual machine id in listVirtualMachines" ) self.assertEqual( - list_vm_response[0].displayname, + vm_response.displayname, self.virtual_machine.displayname, "Check virtual machine displayname in listVirtualMachines" ) self.debug("Verify the database entry for virtual machine: %s" %self.virtual_machine.id) - + self.debug("select id, state, private_ip_address from vm_instance where id = %s;" %self.virtual_machine.id) qresultset = self.dbclient.execute("select id, state, private_ip_address from vm_instance where id = %s;" %self.virtual_machine.id) @@ -75,7 +82,7 @@ class TestDeployVM(cloudstackTestCase): ) qresult = qresultset[0] - + self.assertEqual( qresult[0], self.virtual_machine.id, @@ -88,37 +95,55 @@ class TestDeployVM(cloudstackTestCase): ) self.assertEqual( qresult[2], - self.virtual_machine.ipaddress, + ipaddress, "Check IP Address in the database" ) return def tearDown(self): try: - self.virtual_machine.delete(self.apiclient) + cleanup_resources(self.apiclient, self.cleanup) except Exception as e: self.debug("Warning! Exception in tearDown: %s" %e) - class TestVMLifeCycle(cloudstackTestCase): @classmethod def setUpClass(cls): cls.api_client = fetch_api_client() - #create small and large servers + #create small and large virtual machines cls.small_virtual_machine = VirtualMachine.create( - cls.api_client, + cls.api_client, TEST_VM_LIFE_CYCLE_SERVICES["small"] ) cls.medium_virtual_machine = VirtualMachine.create( - cls.api_client, + cls.api_client, TEST_VM_LIFE_CYCLE_SERVICES["medium"] ) + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + TEST_VM_LIFE_CYCLE_SERVICES["small"] + ) + cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, services["small"]) + + @classmethod + def tearDownClass(cls): + cls.api_client = fetch_api_client() + cls.nat_rule.delete(cls.api_client) + return + def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + + def tearDown(self): + self.dbclient.close() + #Clean up, terminate the created ISOs + cleanup_resources(self.apiclient, self.cleanup) + return def test_01_stop_vm(self): """Test Stop Virtual Machine @@ -154,7 +179,7 @@ class TestVMLifeCycle(cloudstackTestCase): ) self.debug("Verify the database entry for virtual machine: %s" %self.small_virtual_machine.id) - + self.debug("select state from vm_instance where id = %s;" %self.small_virtual_machine.id) qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_virtual_machine.id) @@ -166,7 +191,7 @@ class TestVMLifeCycle(cloudstackTestCase): qresult = qresultset[0] - + self.assertEqual( qresult[0], "Stopped", @@ -200,7 +225,7 @@ class TestVMLifeCycle(cloudstackTestCase): self.debug("Verify SSH Access for virtual machine: %s" %self.small_virtual_machine.id) try: - self.small_virtual_machine.get_ssh_client() + self.small_virtual_machine.get_ssh_client(services["small"]["ipaddress"]) except Exception as e: self.fail("SSH Access failed for %s: %s" %(self.small_virtual_machine.ipaddress, e)) @@ -215,7 +240,7 @@ class TestVMLifeCycle(cloudstackTestCase): qresult = qresultset[0] - + self.assertEqual( qresult[0], 'Running', @@ -243,7 +268,7 @@ class TestVMLifeCycle(cloudstackTestCase): self.debug("Verify SSH Access for virtual machine: %s" %self.small_virtual_machine.id) try: - self.small_virtual_machine.get_ssh_client() + self.small_virtual_machine.get_ssh_client(services["small"]["ipaddress"]) except Exception as e: self.fail("SSH Access failed for %s: %s" %(self.small_virtual_machine.ipaddress, e)) @@ -262,9 +287,9 @@ class TestVMLifeCycle(cloudstackTestCase): cmd = stopVirtualMachine.stopVirtualMachineCmd() cmd.id = self.small_virtual_machine.id self.apiclient.stopVirtualMachine(cmd) - + #Sleep for 60 seconds to ensure the machine has stopped - time.sleep(60) + time.sleep(60) cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd() cmd.id = self.small_virtual_machine.id cmd.serviceofferingid = services["service_offerings"]["medium"]["id"] @@ -276,13 +301,13 @@ class TestVMLifeCycle(cloudstackTestCase): self.apiclient.startVirtualMachine(cmd) try: - ssh = self.small_virtual_machine.get_ssh_client() + ssh = self.small_virtual_machine.get_ssh_client(services["small"]["ipaddress"]) except Exception as e: self.fail("SSH Access failed for %s: %s" %(self.small_virtual_machine.ipaddress, e)) - + cpuinfo = ssh.execute("cat /proc/cpuinfo") - + cpu_cnt = len([i for i in cpuinfo if "processor" in i]) #'cpu MHz\t\t: 2660.499' cpu_speed = [i for i in cpuinfo if "cpu MHz" in i][0].split()[3] @@ -316,7 +341,7 @@ class TestVMLifeCycle(cloudstackTestCase): self.apiclient.stopVirtualMachine(cmd) #Sleep before the changes are reflected - time.sleep(60) + time.sleep(60) cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd() cmd.id = self.medium_virtual_machine.id cmd.serviceofferingid = services["service_offerings"]["small"]["id"] @@ -326,15 +351,15 @@ class TestVMLifeCycle(cloudstackTestCase): cmd = startVirtualMachine.startVirtualMachineCmd() cmd.id = self.medium_virtual_machine.id self.apiclient.startVirtualMachine(cmd) - + try: - ssh = self.medium_virtual_machine.get_ssh_client() + ssh = self.medium_virtual_machine.get_ssh_client(services["medium"]["ipaddress"]) except Exception as e: self.fail("SSH Access failed for %s: %s" %(self.medium_virtual_machine.ipaddress, e)) cpuinfo = ssh.execute("cat /proc/cpuinfo") - + cpu_cnt = len([i for i in cpuinfo if "processor" in i]) #'cpu MHz\t\t: 2660.499' cpu_speed = [i for i in cpuinfo if "cpu MHz" in i ][0].split()[3] @@ -394,7 +419,7 @@ class TestVMLifeCycle(cloudstackTestCase): ) self.debug("Verify the database entry for virtual machine: %s" %self.small_virtual_machine.id) - + self.debug("select state from vm_instance where id = %s;" %self.small_virtual_machine.id) qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_virtual_machine.id) self.assertNotEqual( @@ -405,7 +430,7 @@ class TestVMLifeCycle(cloudstackTestCase): qresult = qresultset[0] - + self.assertEqual( qresult[0], 'Destroyed', @@ -459,7 +484,40 @@ class TestVMLifeCycle(cloudstackTestCase): ) return - def test_08_expunge_vm(self): + def test_08_migrate_vm(self): + """Test migrate VM + """ + cmd = migrateVirtualMachine.migrateVirtualMachineCmd() + cmd.hostid = services["hostid"] + cmd.virtualmachineid = self.small_virtual_machine.id + self.apiclient.migrateVirtualMachine(cmd) + + cmd = listVirtualMachines.listVirtualMachinesCmd() + cmd.id = self.small_virtual_machine.id + list_vm_response = self.apiclient.listVirtualMachines(cmd) + + self.assertNotEqual( + list_vm_response, + None, + "Check virtual machine is listVirtualMachines" + ) + + vm_response = list_vm_response[0] + + self.assertEqual( + vm_response.id, + self.small_virtual_machine.id, + "Check virtual machine ID of migrated VM" + ) + + self.assertEqual( + vm_response.hostid, + services["hostid"], + "Check destination hostID of migrated VM" + ) + return + + def test_09_expunge_vm(self): """Test destroy(expunge) Virtual Machine """ cmd = destroyVirtualMachine.destroyVirtualMachineCmd() @@ -506,7 +564,7 @@ class TestVMLifeCycle(cloudstackTestCase): qresultset = self.dbclient.execute("select instance_id from nics where instance_id = %s;" %self.small_virtual_machine.id) qresult = qresultset[0] self.assertEqual( - + len(qresult), 0, "Check virtual_machine entry in NICS table" @@ -517,10 +575,62 @@ class TestVMLifeCycle(cloudstackTestCase): qresultset = self.dbclient.execute("select instance_id from volumes where instance_id = %s;" %self.small_virtual_machine.id) qresult = qresultset[0] self.assertEqual( - + len(qresult), 0, "Check virtual machine entry in VOLUMES table" ) return + def test_10_attachAndDetach_iso(self): + """Test for detach ISO to virtual machine""" + + # Validate the following + # 1. Create ISO + # 2. Attach ISO to VM + # 3. Log in to the VM. + # 4. The device should be available for use + # 5. Detach ISO + #6. Check the device is properly detached by logging into VM + + iso = Iso.create(self.apiclient, services["iso"]) + self.cleanup.append(iso) + iso.download(self.apiclient) + + #Attach ISO to virtual machine + cmd = attachIso.attachIsoCmd() + cmd.id = iso.id + cmd.virtualmachineid = self.virtual_machine.id + self.apiclient.attachIso(cmd) + + ssh_client = self.virtual_machine.get_ssh_client(services["small"]["ipaddress"]) + + cmds = [ "mkdir -p %s" %services["mount_dir"], + "mount -rt iso9660 %s %s" %(services["diskdevice"], services["mount_dir"]), + ] + for c in cmds: + res = ssh_client.execute(c) + + self.assertEqual(res, [], "Check mount is successful or not") + + c = "fdisk -l|grep %s|head -1" % services["diskdevice"] + res = ssh_client.execute(c) + #Disk /dev/xvdd: 4393 MB, 4393723904 bytes + actual_disk_size = res[0].split()[4] + + self.assertEqual(str(iso.size), actual_disk_size, "Check size of the attached ISO") + + #Unmount ISO + command = "umount %s" % services["diskdevice"] + ssh_client.execute(command) + + #Detach from VM + cmd = detachIso.detachIsoCmd() + cmd.virtualmachineid = self.virtual_machine.id + self.apiclient.detachIso(cmd) + + res = ssh_client.execute(c) + result = services["diskdevice"] in res[0].split() + + self.assertEqual(result, False, "Check if ISO is detached from virtual machine") + return diff --git a/tools/testClient/testcase/BVT-tests/test_volumes.py b/tools/testClient/testcase/BVT-tests/test_volumes.py index b2c4c92394e..afc639c59f6 100644 --- a/tools/testClient/testcase/BVT-tests/test_volumes.py +++ b/tools/testClient/testcase/BVT-tests/test_volumes.py @@ -8,15 +8,15 @@ from cloudstackTestCase import * from cloudstackAPI import * from settings import * -from utils import fetch_api_client, format_volume_to_ext3 -from base import VirtualMachine, Volume +from utils import * +from base import * #Import System modules import os import urllib2 import time import tempfile -services = TEST_VOLUME_SERVICES +services = TEST_VOLUME_SERVICES class TestCreateVolume(cloudstackTestCase): @@ -24,61 +24,66 @@ class TestCreateVolume(cloudstackTestCase): def setUpClass(cls): cls.api_client = fetch_api_client() cls.virtual_machine = VirtualMachine.create(cls.api_client, services) + cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, services) def setUp(self): self.apiClient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() + self.cleanup= [] def test_01_create_volume(self): """Test Volume creation for all Disk Offerings (incl. custom) """ self.volumes = [] for k,v in services["volume_offerings"].items(): - self.volumes.append(Volume.create(self.apiClient, v)) - + volume = Volume.create(self.apiClient, v) + self.volumes.append(volume) + self.cleanup.append(volume) + self.volumes.append(Volume.create_custom_disk(self.apiClient, services)) #Attach a volume with different disk offerings and check the memory allocated to each of them for volume in self.volumes: cmd = listVolumes.listVolumesCmd() cmd.id = volume.id list_volume_response = self.apiClient.listVolumes(cmd) - + self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes") qresultset = self.dbclient.execute("select id from volumes where id = %s" %volume.id) self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database") attached_volume = self.virtual_machine.attach_volume(self.apiClient,volume) - ssh = self.virtual_machine.get_ssh_client() - - #ssh.execute("shutdown -r now") + ssh = self.virtual_machine.get_ssh_client(services["ipaddress"]) + + ssh.execute("shutdown -r now") #Sleep to ensure the machine is rebooted properly - time.sleep(120) - - ssh = self.virtual_machine.get_ssh_client(True) - - res = ssh.execute("fdisk -l|grep /dev/sda|head -1") + time.sleep(120) + + ssh = self.virtual_machine.get_ssh_client(services["ipaddress"], True) + c = "fdisk -l|grep %s|head -1" % services["diskdevice"] + res = ssh.execute(c) #Disk /dev/sda: 21.5 GB, 21474836480 bytes - + actual_disk_size = res[0].split()[4] - + self.assertEqual(str(list_volume_response[0].size), actual_disk_size, "Check if promised disk size actually available") self.virtual_machine.detach_volume(self.apiClient,volume) - + def tearDown(self): - for volume in self.volumes: - volume.delete(self.apiClient) + #Clean up, terminate the created templates + cleanup_resources(self.apiClient, self.cleanup) + return @classmethod def tearDownClass(cls): try: + cls.nat_rule.delete(cls.api_client) cls.virtual_machine.delete(cls.api_client) - except Exception as e: raise Exception("Warning: Exception during cleanup : %s" %e) - + class TestVolumes(cloudstackTestCase): - @classmethod + @classmethod def setUpClass(cls): cls.api_client = fetch_api_client() cls.virtual_machine = VirtualMachine.create(cls.api_client, services) @@ -95,7 +100,7 @@ class TestVolumes(cloudstackTestCase): def setUp(self): self.apiClient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() - + def test_02_attach_volume(self): """Attach a created Volume to a Running VM """ @@ -112,19 +117,19 @@ class TestVolumes(cloudstackTestCase): self.assertNotEqual(volume.virtualmachineid, None, "Check if volume state (attached) is reflected") qresultset = self.dbclient.execute("select instance_id, device_id from volumes where id = %s" %self.volume.id) - self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database") - + self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database") + qresult = qresultset[0] - self.assertEqual(qresult[0], self.virtual_machine.id, "Check if volume is assc. with virtual machine in Database") + self.assertEqual(qresult[0], self.virtual_machine.id, "Check if volume is assc. with virtual machine in Database") #self.assertEqual(qresult[1], 0, "Check if device is valid in the database") #Format the attached volume to a known fs - format_volume_to_ext3(self.virtual_machine.get_ssh_client()) + format_volume_to_ext3(self.virtual_machine.get_ssh_client(services["ipaddress"])) def test_03_download_attached_volume(self): """Download a Volume attached to a VM """ - + cmd = extractVolume.extractVolumeCmd() cmd.id = self.volume.id cmd.mode = "HTTP_DOWNLOAD" @@ -136,7 +141,7 @@ class TestVolumes(cloudstackTestCase): def test_04_delete_attached_volume(self): """Delete a Volume attached to a VM """ - + cmd = deleteVolume.deleteVolumeCmd() cmd.id = self.volume.id #A proper exception should be raised; deleting attach VM is not allowed @@ -159,17 +164,17 @@ class TestVolumes(cloudstackTestCase): self.assertEqual(volume.virtualmachineid, None, "Check if volume state (detached) is reflected") qresultset = self.dbclient.execute("select instance_id, device_id from volumes where id = %s" %self.volume.id) - self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database") + self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database") qresult = qresultset[0] - self.assertEqual(qresult[0], None, "Check if volume is unassc. with virtual machine in Database") - self.assertEqual(qresult[1], None, "Check if no device is valid in the database") + self.assertEqual(qresult[0], None, "Check if volume is unassc. with virtual machine in Database") + self.assertEqual(qresult[1], None, "Check if no device is valid in the database") def test_06_download_detached_volume(self): """Download a Volume unattached to an VM """ - + cmd = extractVolume.extractVolumeCmd() cmd.id = self.volume.id cmd.mode = "HTTP_DOWNLOAD" @@ -184,7 +189,7 @@ class TestVolumes(cloudstackTestCase): fd = open(path, 'wb') fd.write(response.read()) fd.close() - + except Exception as e: print e self.fail("Extract Volume Failed with invalid URL %s (vol id: %s)" %(extract_vol.url, self.volume.id)) @@ -193,7 +198,7 @@ class TestVolumes(cloudstackTestCase): """Delete a Volume unattached to an VM """ - cmd = deleteVolume.deleteVolumeCmd() + cmd = deleteVolume.deleteVolumeCmd() cmd.id = self.volume.id self.apiClient.deleteVolume(cmd) diff --git a/tools/testClient/testcase/BVT-tests/utils.py b/tools/testClient/testcase/BVT-tests/utils.py index ceea7beece6..bd048e8a34b 100644 --- a/tools/testClient/testcase/BVT-tests/utils.py +++ b/tools/testClient/testcase/BVT-tests/utils.py @@ -12,7 +12,7 @@ from cloudstackAPI import * import cloudstackConnection #from cloudstackConnection import cloudConnection import configGenerator -import logging +import logging import string import random @@ -53,7 +53,7 @@ def format_volume_to_ext3(ssh_client, device="/dev/sda" ): "mkfs.ext3 %s1" %device, ] for c in cmds: - print ssh_client.execute(c) + ssh_client.execute(c) def fetch_api_client(config_file='datacenterCfg'): """Fetch the Cloudstack API Client""" @@ -64,10 +64,10 @@ def fetch_api_client(config_file='datacenterCfg'): return cloudstackAPIClient.CloudStackAPIClient( cloudstackConnection.cloudConnection( mgt.mgtSvrIp, - mgt.port, - mgt.apiKey, - mgt.securityKey, - asyncTimeout, + mgt.port, + mgt.apiKey, + mgt.securityKey, + asyncTimeout, testClientLogger ) )