diff --git a/tools/testClient/testcase/BVT-tests/base.py b/tools/testClient/testcase/BVT-tests/base.py index cb1da086c97..04b934d91e1 100644 --- a/tools/testClient/testcase/BVT-tests/base.py +++ b/tools/testClient/testcase/BVT-tests/base.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # """ Base class for all Cloudstack resources - Virtual machine, Volume, Snapshot etc @@ -26,25 +26,7 @@ class Account: cmd.lastname = services["lastname"] cmd.password = services["password"] cmd.username = services["username"] - account = apiclient.createAccount(cmd) - #create a network - cmd = createNetwork.createNetworkCmd() - cmd.name = cmd.displaytext = "Virtual Network" - cmd.networkofferingid = services["networkofferingid"] - - cmd.account = account.account.name - cmd.domainid = account.account.domainid - cmd.zoneid = services["zoneid"] - - account.network = apiclient.createNetwork(cmd) - #Allocate the Source NAT IP Address - account.public_ip = PublicIPAddress.create( - apiclient, - accountid = account.account.name, - zoneid = services["zoneid"], - domainid = account.account.domainid - ) return Account(account.__dict__) @@ -70,15 +52,19 @@ class VirtualMachine: cmd = deployVirtualMachine.deployVirtualMachineCmd() cmd.serviceofferingid = services["serviceoffering"] cmd.zoneid = services["zoneid"] + cmd.hypervisor = services["hypervisor"] + cmd.account = accountid or services["account"] + cmd.domainid = services["domainid"] + if networkids: cmd.networkids = networkids elif "networkids" in services: cmd.networkids = services["networkids"] - cmd.hypervisor = services["hypervisor"] - cmd.account = accountid or services["accountid"] - cmd.domainid = services["domainid"] - cmd.templateid = templateid or services["template"] + if templateid: + cmd.templateid = templateid + elif "template" in services: + cmd.templateid = services["template"] if "diskoffering" in services: cmd.diskofferingid = services["diskoffering"] @@ -210,9 +196,22 @@ class Template: cmd.displaytext = services["displaytext"] cmd.name = "-".join([services["name"], random_gen()]) cmd.ostypeid = services["ostypeid"] - cmd.isfeatured = services["isfeatured"] or False - cmd.ispublic = services["ispublic"] or False - cmd.isextractable = services["isextractable"] or False + + if "isfeatured" in services: + cmd.isfeatured = services["isfeatured"] + else: + cmd.isfeatured = False + + if "ispublic" in services: + cmd.ispublic = services["ispublic"] + else: + cmd.ispublic = False + + if "isextractable" in services: + cmd.isextractable = services["isextractable"] + else: + cmd.isextractable = False + cmd.volumeid = volume.id return Template(apiclient.createTemplate(cmd).__dict__) @@ -278,18 +277,18 @@ class Iso: return -class PublicIPAddress(): +class PublicIPAddress: """Manage Public IP Addresses""" def __init__(self, items): self.__dict__.update(items) @classmethod - def create(cls, apiclient, accountid, zoneid = 1, domainid = 1): + def create(cls, apiclient, accountid, zoneid = None, domainid = None): cmd = associateIpAddress.associateIpAddressCmd() cmd.account = accountid - cmd.zoneid = zoneid - cmd.domainid = domainid + cmd.zoneid = zoneid or services["zoneid"] + cmd.domainid = domainid or services["domainid"] return PublicIPAddress(apiclient.associateIpAddress(cmd).__dict__) def delete(self, apiclient): @@ -406,7 +405,7 @@ class LoadBalancerRule: def create(cls, apiclient, services, ipaddressid, accountid=None): cmd = createLoadBalancerRule.createLoadBalancerRuleCmd() cmd.publicipid = ipaddressid or services["ipaddressid"] - cmd.account = accountid or services["accountid"] + cmd.account = accountid or services["account"] cmd.name = services["name"] cmd.algorithm = services["alg"] cmd.privateport = services["privateport"] @@ -431,3 +430,101 @@ class LoadBalancerRule: cmd.virtualmachineids = [vm.id for vm in vms] self.apiclient.removeFromLoadBalancerRule(cmd) return + +class Cluster: + """Manage Cluster life cycle""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, services): + + cmd = addCluster.addClusterCmd() + cmd.clustertype = services["clustertype"] + cmd.hypervisor = services["hypervisor"] + cmd.zoneid = services["zoneid"] + cmd.podid = services["podid"] + + if "username" in services: + cmd.username = services["username"] + if "password" in services: + cmd.password = services["password"] + if "url" in services: + cmd.url = services["url"] + if "clustername" in services: + cmd.clustername = services["clustername"] + + return Cluster(apiclient.addCluster(cmd)[0].__dict__) + + def delete(self, apiclient): + cmd = deleteCluster.deleteClusterCmd() + cmd.id = self.id + apiclient.deleteCluster(cmd) + return + +class Host: + """Manage Host life cycle""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, cluster, services): + + cmd = addHost.addHostCmd() + cmd.hypervisor = services["hypervisor"] + cmd.url = services["url"] + cmd.zoneid = services["zoneid"] + cmd.clusterid = cluster.id + cmd.podid = services["podid"] + + if "clustertype" in services: + cmd.clustertype = services["clustertype"] + if "username" in services: + cmd.username = services["username"] + if "password" in services: + cmd.password = services["password"] + + return Host(apiclient.addHost(cmd).__dict__) + + def delete(self, apiclient): + # Host must be in maintenance mode before deletion + cmd = prepareHostForMaintenance.prepareHostForMaintenanceCmd() + cmd.id = self.id + apiclient.prepareHostForMaintenance(cmd) + time.sleep(60) + + cmd = deleteHost.deleteHostCmd() + cmd.id = self.id + apiclient.deleteHost(cmd) + return + +class StoragePool: + """Manage Storage pools""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, services): + + cmd = createStoragePool.createStoragePoolCmd() + cmd.name = services["name"] + cmd.podid = services["podid"] + cmd.url = services["url"] + cmd.clusterid = services["clusterid"] + cmd.zoneid = services["zoneid"] + + return StoragePool(apiclient.createStoragePool(cmd).__dict__) + + def delete(self, apiclient): + # Storage pool must be in maintenance mode before deletion + cmd = enableStorageMaintenance.enableStorageMaintenanceCmd() + cmd.id = self.id + apiclient.enableStorageMaintenance(cmd) + time.sleep(60) + cmd = deleteStoragePool.deleteStoragePoolCmd() + cmd.id = self.id + apiclient.deleteStoragePool(cmd) + return diff --git a/tools/testClient/testcase/BVT-tests/settings.py b/tools/testClient/testcase/BVT-tests/settings.py index 6e5cbe3cdce..19e2a5c4030 100644 --- a/tools/testClient/testcase/BVT-tests/settings.py +++ b/tools/testClient/testcase/BVT-tests/settings.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # """Test Information Services """ @@ -347,16 +347,16 @@ TEST_NETWORK_SERVICES = { "email" : "test@test.com", "firstname" : "Test", "lastname" : "User", - "username" : "testuser79", + "username" : "testuser1", "password" : "fr3sca", "zoneid" : 1, "networkofferingid" : 6, }, "server" : { - "template" : 256, + "template" : 206, "zoneid" : 1, - "serviceoffering" : 40, + "serviceoffering" : 1, "diskoffering" : 3, "displayname" : "testserver", "username" : "root", @@ -365,10 +365,10 @@ TEST_NETWORK_SERVICES = { "hypervisor":'XenServer', "account":'admin', "domainid":1, - "ipaddressid":3, + "ipaddressid":10, "privateport":22, "publicport":22, - "ipaddress":'69.41.185.229', + "ipaddress":'192.168.100.250', "protocol":'TCP', }, "natrule" : @@ -381,9 +381,177 @@ TEST_NETWORK_SERVICES = { { "name" : "SSH", "alg" : "roundrobin", - "privateport" : 22, - "publicport" : 22, + "privateport" : 80, + "publicport" : 80, } } + +TEST_SSVM_SERVICES = { + "ssvm": { + "id": 1, + "zoneid": 1, + "publicport": 22, + "username":'root', + "password" : 'fr3sca', + }, + "cpvm": { + "id": 2, + "zoneid": 1, + "publicport": 22, + "username":'root', + "password" : 'fr3sca', + "mgmtserverIP": '192.168.100.154' + }, + "host": { + "username":'root', + "password" : 'fr3sca', + "publicport": 22, + }, + } + +TEST_HOSTS_SERVICES = { + "clusters":{ + 0 : { + "clustername": "Xen Cluster", + "clustertype":"CloudManaged", + "hypervisor": "XenServer", + "zoneid": 1, + "podid":1, + }, + 1 : { + # TODO + "clustername": "KVM Cluster", + "clustertype":"CloudManaged", + "hypervisor": "KVM", + "zoneid": 1, + "podid":1, + }, + 2 : { + "hypervisor": 'VMware', + "clustertype": 'ExternalManaged', + "zoneid": 1, + "podid": 1, + "username": 'administrator', + "password": 'fr3sca', + "url":'http://192.168.100.17/CloudStack-Clogeny-Pune/Pune-1', + "clustername": '192.168.100.17/CloudStack-Clogeny-Pune/Pune-1', + }, + }, + "hosts" :{ + "xenserver" : { + "zoneid": 1, + "podid": 1, + "clusterid":16, + "hypervisor":'XenServer', + "clustertype": 'ExternalManaged', + "url": 'http://192.168.100.210', + "username" : "administrator", + "password" : "fr3sca", + }, + "kvm" : { + "zoneid": 1, + "podid": 1, + "clusterid":16, + "hypervisor":'KVM', + "clustertype": 'ExternalManaged', + "url": 'http://192.168.100.203', + "username" : "administrator", + "password" : "fr3sca", + }, + "vmware" : { + "zoneid": 1, + "podid": 1, + "clusterid":16, + "hypervisor":'VMware', + "clustertype": 'ExternalManaged', + "url": 'http://192.168.100.203', + "username" : "administrator", + "password" : "fr3sca", + }, + } + } +TEST_PRIMARY_STORAGE_SERVICES = { + + "nfs":{ + 0 : { + "url": "nfs://192.168.100.131/Primary", + #Format: File_System_Type/Location/Path + "name": "Primary XEN", + "podid" : 1, + "clusterid" : 1, #XEN Cluster + "zoneid" : 1, + }, + 1 : { + "url": "nfs://192.168.100.131/export", + "name": "Primary KVM", + "podid" : 2, + "clusterid" : 1, #KVM Cluster + "zoneid" : 1, + }, + 2 : { + "url": "nfs://192.168.100.131/Primary", + "name": "Primary VMWare", + "podid" : 1, + "clusterid" : 33, #VMWare Cluster + "zoneid" : 1, + }, + }, + "iscsi":{ + 0 : { + "url": "iscsi://192.168.100.21/export", + "name": "Primary XEN", + "podid" : 2, + "clusterid" : 1, #XEN Cluster + "zoneid" : 1, + # TODO : lun no., iqn no. + }, + 1 : { + "url": "iscsi://192.168.100.21/export", + "name": "Primary KVM", + "podid" : 2, + "clusterid" : 1, #KVM Cluster + "zoneid" : 1, + # TODO : lun no., iqn no. + }, + }, + } + +TEST_SEC_STORAGE_SERVICES = { + "storage": { + "zoneid":1, + "url": "nfs://192.168.100.131/SecondaryStorage" + #Format: File_System_Type/Location/Path + } + } + +TEST_ROUTER_SERVICES = { + "virtual_machine" : + { + "template" : 206, + "zoneid" : 1, + "serviceoffering" : 1, + "displayname" : "testserver", + "username" : "root", + "password" : "fr3sca", + "ssh_port" : 22, + "hypervisor":'XenServer', + "domainid":1, + "ipaddressid":10, + "privateport":22, + "publicport":22, + "ipaddress":'192.168.100.250', + "protocol":'TCP', + }, + "account" : { + "email" : "test@test.com", + "firstname" : "Test", + "lastname" : "User", + "username" : "testuser1", + "password" : "fr3sca", + "zoneid" : 1, + "networkofferingid" : 6, + }, + "sleep_time": 300, + } \ No newline at end of file diff --git a/tools/testClient/testcase/BVT-tests/test_disk_offerings.py b/tools/testClient/testcase/BVT-tests/test_disk_offerings.py index e50d6e1eaa5..4e3ab56d735 100644 --- a/tools/testClient/testcase/BVT-tests/test_disk_offerings.py +++ b/tools/testClient/testcase/BVT-tests/test_disk_offerings.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # """ BVT tests for Disk offerings""" @@ -227,4 +227,3 @@ class TestDiskOfferings(cloudstackTestCase): return - diff --git a/tools/testClient/testcase/BVT-tests/test_hosts.py b/tools/testClient/testcase/BVT-tests/test_hosts.py new file mode 100644 index 00000000000..2d48c7b03f1 --- /dev/null +++ b/tools/testClient/testcase/BVT-tests/test_hosts.py @@ -0,0 +1,118 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" BVT tests for Hosts and Clusters +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from settings import * +from utils import * +from base import * + +#Import System modules +import time + +services = TEST_HOSTS_SERVICES + +class TestHosts(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + + def test_01_clusters(self): + """Test Add clusters & hosts - XEN, KVM, VWARE + """ + + # Validate the following: + # 1. Verify hypervisortype returned by API is Xen/KVM/VWare + # 2. Verify that the cluster is in 'Enabled' allocation state + # 3. Verify that the host is added successfully and in Up state with listHosts API response + + #Create clusters with Hypervisor type XEN/KVM/VWare + for k,v in services["clusters"].items(): + cluster = Cluster.create(self.apiclient, v) + + self.assertEqual( + cluster.hypervisortype, + v["hypervisor"], + "Check hypervisor type of created cluster is " + v["hypervisor"] + " or not" + ) + self.assertEqual( + cluster.allocationstate, + 'Enabled', + "Check whether allocation state of cluster is enabled" + ) + + #If host is externally managed host is already added with cluster + cmd = listHosts.listHostsCmd() + cmd.clusterid = cluster.id + response = apiclient.listHosts(cmd) + + if not len(response): + hypervisor_type = str(cluster.hypervisortype.lower()) + host = Host.create( + self.apiclient, + cluster, + services["hosts"][hypervisor_type] + ) + + #Cleanup Host & Cluster + self.cleanup.append(host) + self.cleanup.append(cluster) + + cmd = listHosts.listHostsCmd() + cmd.clusterid = cluster.id + list_hosts_response = self.apiclient.listHosts(cmd) + + self.assertNotEqual( + len(list_hosts_response), + 0, + "Check list Hosts response" + ) + + host_response = list_hosts_response[0] + #Check if host is Up and running + self.assertEqual( + host_response.state, + 'Up', + "Check if state of host is Up or not" + ) + #Verify List Cluster Response has newly added cluster + cmd = listClusters.listClustersCmd() + cmd.id = cluster.id + list_cluster_response = self.apiclient.listClusters(cmd) + + self.assertNotEqual( + len(list_cluster_response), + 0, + "Check list Hosts response" + ) + + cluster_response = list_cluster_response[0] + self.assertEqual( + cluster_response.id, + cluster.id, + "Check cluster ID with list clusters response" + ) + self.assertEqual( + cluster_response.hypervisortype, + cluster.hypervisortype, + "Check hypervisor type with list clusters response is " + v["hypervisor"] + " or not" + ) + return \ No newline at end of file diff --git a/tools/testClient/testcase/BVT-tests/test_iso.py b/tools/testClient/testcase/BVT-tests/test_iso.py index 6504a4878c0..082f193ef55 100644 --- a/tools/testClient/testcase/BVT-tests/test_iso.py +++ b/tools/testClient/testcase/BVT-tests/test_iso.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # """ BVT tests for Templates ISO """ diff --git a/tools/testClient/testcase/BVT-tests/test_network.py b/tools/testClient/testcase/BVT-tests/test_network.py index 4d1e801a4c1..57f637d8121 100644 --- a/tools/testClient/testcase/BVT-tests/test_network.py +++ b/tools/testClient/testcase/BVT-tests/test_network.py @@ -1,9 +1,9 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # -""" BVT tests for Virtual Machine Life Cycle +""" BVT tests for Network Life Cycle """ #Import Local Modules from cloudstackTestCase import * @@ -15,24 +15,20 @@ from base import * #Import System modules import time - services = TEST_NETWORK_SERVICES - class TestPublicIP(cloudstackTestCase): - """Test Associate/Disassociate Public IP Addresses - """ + def setUp(self): self.apiclient = self.testClient.getApiClient() def test_public_ip_admin_account(self): - """Test Associate/Disassociate IP address for Admin Account - """ + """Test for Associate/Disassociate public IP address for admin account""" + # Validate the following: # 1. listPubliIpAddresses API returns the list of acquired addresses # 2. the returned list should contain our acquired IP address - ip_address = PublicIPAddress.create( self.apiclient, services["admin_account"], @@ -41,8 +37,10 @@ class TestPublicIP(cloudstackTestCase): ) cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.id = ip_address.ipaddress.id + list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) + #listPublicIpAddresses should return newly created public IP self.assertNotEqual( len(list_pub_ip_addr_resp), 0, @@ -53,10 +51,12 @@ class TestPublicIP(cloudstackTestCase): ip_address.ipaddress.id, "Check Correct IP Address is returned in the List Cacls" ) + ip_address.delete(self.apiclient) # Validate the following: #1. listPublicIpAddresses API should no more return the released address + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.id = ip_address.ipaddress.id list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) @@ -66,16 +66,16 @@ class TestPublicIP(cloudstackTestCase): None, "Check if disassociated IP Address is no longer available" ) - + return def test_public_ip_user_account(self): - """Test Associate/Disassociate IP address for Non-Admin User Account - """ + """Test for Associate/Disassociate public IP address for user account""" # Validate the following: # 1. listPubliIpAddresses API returns the list of acquired addresses # 2. the returned list should contain our acquired IP address + ip_address = PublicIPAddress.create( self.apiclient, services["user_account"], @@ -84,6 +84,8 @@ class TestPublicIP(cloudstackTestCase): ) cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.id = ip_address.ipaddress.id + + #listPublicIpAddresses should return newly created public IP list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) self.assertNotEqual( @@ -96,11 +98,9 @@ class TestPublicIP(cloudstackTestCase): ip_address.ipaddress.id, "Check Correct IP Address is returned in the List Call" ) + ip_address.delete(self.apiclient) - - # Validate the following: - #1. listPublicIpAddresses API should no more return the released address cmd = listPublicIpAddresses.listPublicIpAddressesCmd() cmd.id = ip_address.ipaddress.id list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) @@ -110,11 +110,11 @@ class TestPublicIP(cloudstackTestCase): None, "Check if disassociated IP Address is no longer available" ) + return class TestPortForwarding(cloudstackTestCase): - """Test Port Forwarding Rules for Source and Non-Source IP Addresses - """ + @classmethod def setUpClass(cls): @@ -125,31 +125,37 @@ class TestPortForwarding(cloudstackTestCase): cls.api_client, services["server"], accountid=cls.account.account.name, - networkids=[str(cls.account.network.id)] ) cls._cleanup = [cls.virtual_machine, cls.account] def setUp(self): self.apiclient = self.testClient.getApiClient() + self.cleanup = [] + return @classmethod - def tearDownClass(self): + def tearDownClass(cls): cleanup_resources(cls.api_client, cls._cleanup) + return def tearDown(self): - cleanup_resources(self.cleanup) + cleanup_resources(self.apiclient, self.cleanup) + return def test_01_port_fwd_on_src_nat(self): - """Port Forwarding Tests for Source NAT IP Addresses - """ - src_nat_ip_addr = self.account.public_ip.ipaddress - + """Test for port forwarding on source NAT""" #Validate the following: #1. listPortForwarding rules API should return the added PF rule #2. attempt to do an ssh into the user VM through the sourceNAT + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.account = self.account.account.name + cmd.domainid = services["server"]["domainid"] + src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] + + #Create NAT rule nat_rule = NATRule.create(self.apiclient, self.virtual_machine, services["natrule"], src_nat_ip_addr.id) time.sleep(60) @@ -167,8 +173,7 @@ class TestPortForwarding(cloudstackTestCase): nat_rule.id, "Check Correct Port forwarding Rule is returned" ) - - + #SSH virtual machine to test port forwarding try: self.virtual_machine.get_ssh_client(src_nat_ip_addr.ipaddress) except Exception as e: @@ -177,10 +182,6 @@ class TestPortForwarding(cloudstackTestCase): nat_rule.delete(self.apiclient) time.sleep(60) - #Validate the following: - #1. listPortForwardingRules should not return the deleted rule anymore - #2. attempt to do ssh should now fail - cmd = listPortForwardingRules.listPortForwardingRulesCmd() cmd.id = nat_rule.id list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd) @@ -190,7 +191,7 @@ class TestPortForwarding(cloudstackTestCase): None, "Check Port Forwarding Rule is deleted" ) - self.debug("Check if the Public SSH port is inaccessible") + # Check if the Public SSH port is inaccessible with self.assertRaises(Exception): remoteSSHClient.remoteSSHClient( ip.ipaddress, @@ -198,19 +199,29 @@ class TestPortForwarding(cloudstackTestCase): self.virtual_machine.username, self.virtual_machine.password ) + return def test_02_port_fwd_on_non_src_nat(self): - """Port Forwarding Tests for Non-Source NAT IP Addresses - """ + """Test for port forwarding on non source NAT""" - ip_address = PublicIPAddress.create(self.apiclient, self.account) + #Validate the following: + #1. listPortForwardingRules should not return the deleted rule anymore + #2. attempt to do ssh should now fail + + ip_address = PublicIPAddress.create(self.apiclient, self.account.account.name) self.clean_up.append(ip_address) - nat_rule = NATRule.create(self.apiclient, self.virtual_machine, services) + #Create NAT rule + nat_rule = NATRule.create( + self.apiclient, + self.virtual_machine, + services["natrule"], + ip_address.ipaddress.id + ) time.sleep(60) #Validate the following: - #1. listPortForwarding rules API should return the added PF rule - #2. attempt to do an ssh into the user VM through the sourceNAT + #1. listPortForwardingRules should not return the deleted rule anymore + #2. attempt to do ssh should now fail cmd = listPortForwardingRules.listPortForwardingRulesCmd() cmd.id = nat_rule.id @@ -227,19 +238,14 @@ class TestPortForwarding(cloudstackTestCase): "Check Correct Port forwarding Rule is returned" ) - try: - self.virtual_machine.get_ssh_client(public_ip = ip_address.ipaddress) + self.virtual_machine.get_ssh_client(public_ip = ip_address.ipaddress.ipaddress) except Exception as e: - self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress, e)) + self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress.ipaddress, e)) nat_rule.delete(apiclient) time.sleep(60) - #Validate the following: - #1. listPortForwardingRules should not return the deleted rule anymore - #2. attempt to do ssh should now fail - cmd = listPortForwardingRules.listPortForwardingRulesCmd() cmd.id = nat_rule.id list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd) @@ -250,20 +256,18 @@ class TestPortForwarding(cloudstackTestCase): None, "Check Port Forwarding Rule is deleted" ) - self.debug("Check if the Public SSH port is inaccessible") + # Check if the Public SSH port is inaccessible with self.assertRaises(Exception): remoteSSHClient.remoteSSHClient( - ip_address.ipaddress, + ip_address.ipaddress.ipaddress, self.virtual_machine.ssh_port, self.virtual_machine.username, self.virtual_machine.password ) - - + return class TestLoadBalancingRule(cloudstackTestCase): - """Test Load Balancing Rules for Source and Non-Source IP Addresses - """ + @classmethod def setUpClass(cls): @@ -274,13 +278,11 @@ class TestLoadBalancingRule(cloudstackTestCase): cls.api_client, services["server"], accountid=cls.account.account.name, - networkids=[str(cls.account.network.id)] ) cls.vm_2 = VirtualMachine.create( cls.api_client, services["server"], accountid=cls.account.account.name, - networkids=[str(cls.account.network.id)] ) cls.non_src_nat_ip = PublicIPAddress.create( cls.api_client, @@ -292,51 +294,79 @@ class TestLoadBalancingRule(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() - self.dbclient = self.testClient.getDbConnection() self.cleanup = [] + return def tearDown(self): cleanup_resources(self.apiclient, self.cleanup) + return @classmethod def tearDownClass(cls): cleanup_resources(cls.api_client, cls._cleanup) - + return def test_01_create_lb_rule_src_nat(self): - """Test Port Forwarding Rules for Source NAT IP Addresses - """ - src_nat_ip_addr = self.account.public_ip.ipaddress + """Test to create Load balancing rule with source NAT""" - lb_rule = LoadBalancerRule.create( - self.apiclient, - services["lbrule"], - src_nat_ip_addr.id, - accountid = self.account.account.name - ) - self.cleanup.append(lb_rule) - - lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2]) # Validate the Following: #1. listLoadBalancerRules should return the added rule #2. attempt to ssh twice on the load balanced IP #3. verify using the hostname of the VM that round robin is indeed happening as expected + + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.account = self.account.account.name + cmd.domainid = services["server"]["domainid"] + src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] + + #Create Load Balancer rule and assign VMs to rule + lb_rule = LoadBalancerRule.create( + self.apiclient, + services["lbrule"], + src_nat_ip_addr.id, + accountid = self.account.account.name + ) + self.cleanup.append(lb_rule) + + lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2]) cmd = listLoadBalancerRules.listLoadBalancerRulesCmd() cmd.id = lb_rule.id lb_rules = self.apiclient.listLoadBalancerRules(cmd) + #verify listLoadBalancerRules lists the added load balancing rule self.assertNotEqual( len(lb_rules), 0, "Check Load Balancer Rule in its List" ) - self.assertEqual( lb_rules[0].id, lb_rule.id, "Check List Load Balancer Rules returns valid Rule" ) + # listLoadBalancerRuleInstances should list all instances associated with that LB rule + cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd() + cmd.id = lb_rule.id + lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd) + + self.assertNotEqual( + len(lb_instance_rules), + 0, + "Check Load Balancer instances Rule in its List" + ) + + self.assertEqual( + lb_instance_rules[0].id, + self.vm_2.id, + "Check List Load Balancer instances Rules returns valid VM ID associated with it" + ) + + self.assertEqual( + lb_instance_rules[1].id, + self.vm_1.id, + "Check List Load Balancer instances Rules returns valid VM ID associated with it" + ) ssh_1 = remoteSSHClient.remoteSSHClient( src_nat_ip_addr.ipaddress, @@ -360,43 +390,74 @@ class TestLoadBalancingRule(cloudstackTestCase): self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2") + #SSH should pass till there is a last VM associated with LB rule + lb_rule.remove(self.apiclient, [self.vm_2]) + hostnames.append(ssh_1.execute("hostname")[0]) + self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") + + lb_rule.remove(self.apiclient, [self.vm_1]) + with self.assertRaises(Exception): + ssh_1.execute("hostname")[0] + return + def test_02_create_lb_rule_non_nat(self): - """Test Load Balancing Rules for Non-Source IP Addresses - """ - lb_rule = LoadBalancerRule.create( - self.apiclient, - services["lbrule"], - cls.non_src_nat_ip.ipaddress.id, - accountid = self.account.account.name - ) - self.cleanup.append(lb_rule) - - lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2]) + """Test to create Load balancing rule with source NAT""" # Validate the Following: #1. listLoadBalancerRules should return the added rule #2. attempt to ssh twice on the load balanced IP #3. verify using the hostname of the VM that round robin is indeed happening as expected + #Create Load Balancer rule and assign VMs to rule + lb_rule = LoadBalancerRule.create( + self.apiclient, + services["lbrule"], + self.non_src_nat_ip.ipaddress.id, + accountid = self.account.account.name + ) + self.cleanup.append(lb_rule) + + lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2]) cmd = listLoadBalancerRules.listLoadBalancerRulesCmd() cmd.id = lb_rule.id lb_rules = self.apiclient.listLoadBalancerRules(cmd) + #verify listLoadBalancerRules lists the added load balancing rule self.assertNotEqual( len(lb_rules), 0, "Check Load Balancer Rule in its List" ) - self.assertEqual( lb_rules[0].id, lb_rule.id, "Check List Load Balancer Rules returns valid Rule" ) + # listLoadBalancerRuleInstances should list all instances associated with that LB rule + cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd() + cmd.id = lb_rule.id + lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd) + self.assertNotEqual( + len(lb_instance_rules), + 0, + "Check Load Balancer instances Rule in its List" + ) + + self.assertEqual( + lb_instance_rules[0].id, + self.vm_2.id, + "Check List Load Balancer instances Rules returns valid VM ID associated with it" + ) + + self.assertEqual( + lb_instance_rules[1].id, + self.vm_1.id, + "Check List Load Balancer instances Rules returns valid VM ID associated with it" + ) ssh_1 = remoteSSHClient.remoteSSHClient( - cls.non_src_nat_ip.ipaddress.ipaddress, + self.non_src_nat_ip.ipaddress.ipaddress, services['lbrule']["publicport"], self.vm_1.username, self.vm_1.password @@ -406,7 +467,7 @@ class TestLoadBalancingRule(cloudstackTestCase): hostnames = [ssh_1.execute("hostname")[0]] time.sleep(20) ssh_2 = remoteSSHClient.remoteSSHClient( - cls.non_src_nat_ip.ipaddress.ipaddress, + self.non_src_nat_ip.ipaddress.ipaddress, services['lbrule']["publicport"], self.vm_1.username, self.vm_1.password @@ -417,11 +478,18 @@ class TestLoadBalancingRule(cloudstackTestCase): self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2") + #SSH should pass till there is a last VM associated with LB rule + lb_rule.remove(self.apiclient, [self.vm_2]) + hostnames.append(ssh_1.execute("hostname")[0]) + self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1") + lb_rule.remove(self.apiclient, [self.vm_1]) + with self.assertRaises(Exception): + ssh_1.execute("hostname")[0] + return class TestRebootRouter(cloudstackTestCase): - """Test Load Balancing Rules work post Router Reboot - """ + def setUp(self): self.apiclient = self.testClient.getApiClient() @@ -431,9 +499,9 @@ class TestRebootRouter(cloudstackTestCase): self.apiclient, services["server"], accountid=self.account.account.name, - networkids=[str(self.account.network.id)] ) src_nat_ip_addr = self.account.public_ip.ipaddress + lb_rule = LoadBalancerRule.create( self.apiclient, services["lbrule"], @@ -442,14 +510,17 @@ class TestRebootRouter(cloudstackTestCase): ) lb_rule.assign(self.apiclient, [self.vm_1]) #nat_rule = NATRule.create(self.apiclient, self.vm_1, services["natrule"], src_nat_ip_addr.id) - self.cleanup = [self.vm_1, lb_rule, self.account] - + self.cleanup = [self.vm_1, lb_rule, account] + return def test_reboot_router(self): + """Test for reboot router""" #Validate the Following #1. Post restart PF and LB rules should still function #2. verify if the ssh into the virtual machine still works through the sourceNAT Ip + + #Retrieve router for the user account cmd = listRouters.listRoutersCmd() cmd.account = self.account.account.name cmd.domainid = self.account.account.domainid @@ -459,9 +530,15 @@ class TestRebootRouter(cloudstackTestCase): cmd = rebootRouter.rebootRouterCmd() cmd.id = router.id self.apiclient.rebootRouter(cmd) + #Sleep to ensure router is rebooted properly time.sleep(60) - src_nat_ip_addr = self.account.public_ip.ipaddress + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.account = self.account.account.name + cmd.domainid = services["server"]["domainid"] + src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] + + #we should be able to SSH after successful reboot try: remoteSSHClient.remoteSSHClient( src_nat_ip_addr.ipaddress, @@ -471,57 +548,62 @@ class TestRebootRouter(cloudstackTestCase): ) except Exception as e: self.fail("SSH Access failed for %s: %s" %(self.vm_1.ipaddress, e)) + return def tearDown(self): - cleanup_resources(self.cleanup) + cleanup_resources(self.apiclient, self.cleanup) + return class TestAssignRemoveLB(cloudstackTestCase): - """Assign Load Balancer Rule to two Virtual Machines, Remove One VM - and associate another VM. - """ def setUp(self): self.apiclient = self.testClient.getApiClient() + #Create VMs, accounts self.account = Account.create(self.apiclient, services["account"], admin=True) self.vm_1 = VirtualMachine.create( - self.apiclient, - services["server"], - accountid=self.account.account.name, - networkids=[str(self.account.network.id)] - ) + self.apiclient, + services["server"], + accountid=self.account.account.name, + ) + self.vm_2 = VirtualMachine.create( - self.apiclient, - services["server"], - accountid=self.account.account.name, - networkids=[str(self.account.network.id)] - ) + self.apiclient, + services["server"], + accountid=self.account.account.name, + ) self.vm_3 = VirtualMachine.create( - self.apiclient, - services["server"], - accountid=self.account.account.name, - networkids=[str(self.account.network.id)] - ) - self.non_src_nat_ip = self.account.public_ip.ipaddress + self.apiclient, + services["server"], + accountid=self.account.account.name, + ) + self.cleanup = [self.vm_1, self.vm_2, self.vm_3] + return def test_assign_and_removal_elb(self): + """Test for assign & removing load balancing rule""" #Validate: #1. Verify list API - listLoadBalancerRules lists all the rules with the relevant ports #2. listLoadBalancerInstances will list the instances associated with the corresponding rule. #3. verify ssh attempts should pass as long as there is at least one instance associated with the rule + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.account = self.account.account.name + cmd.domainid = services["server"]["domainid"] + self.non_src_nat_ip = self.apiclient.listPublicIpAddresses(cmd)[0] + lb_rule = LoadBalancerRule.create( - self.apiclient, - services["lbrule"], - self.non_src_nat_ip.id, - self.account.account.name - ) + self.apiclient, + services["lbrule"], + self.non_src_nat_ip.id, + self.account.account.name + ) self.cleanup.append(lb_rule) lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2]) - + #Create SSH client for each VM ssh_1 = remoteSSHClient.remoteSSHClient( self.non_src_nat_ip.ipaddress, services["natrule"]["publicport"], @@ -549,6 +631,7 @@ class TestAssignRemoveLB(cloudstackTestCase): self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1") self.assertIn(self.vm_2.name, res_2, "Check if ssh succeeded for server2") + #Removing VM and assigning another VM to LB rule lb_rule.remove(self.apiclient, [self.vm_2]) res_1 = ssh_1.execute("hostname")[0] @@ -562,6 +645,161 @@ class TestAssignRemoveLB(cloudstackTestCase): self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1") self.assertIn(self.vm_3.name, res_3, "Check if ssh succeeded for server3") + return def teardown(self): - cleanup_resources(self.cleanup) + cleanup_resources(self.apiclient, self.cleanup) + return + +class TestReleaseIP(cloudstackTestCase): + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + #Create an account, network, VM, Port forwarding rule, LB rules and IP addresses + self.account = Account.create(self.apiclient, services["account"], admin=True) + + self.virtual_machine = VirtualMachine.create( + self.apiclient, + services["server"], + accountid=self.account.account.name, + ) + + self.ip_address = PublicIPAddress.create( + self.apiclient, + self.account.account.name, + services["zoneid"], + self.account.account.domainid + ) + + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.account = self.account.account.name + cmd.domainid = services["server"]["domainid"] + self.ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] + + self.nat_rule = NATRule.create(self.apiclient, self.virtual_machine, services["natrule"], self.ip_addr.id) + self.lb_rule = LoadBalancerRule.create(self.apiclient, services["lbrule"], self.ip_addr.id, accountid = self.account.account.name) + self.cleanup = [self.virtual_machine, self.account] + return + + def teardown(self): + cleanup_resources(self.apiclient,self.cleanup) + + def test_releaseIP(self): + """Test for Associate/Disassociate public IP address""" + + self.ip_address.delete(self.apiclient) + + # ListPublicIpAddresses should not list deleted Public IP address + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.id = self.ip_addr.id + list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd) + + self.assertEqual( + list_pub_ip_addr_resp, + None, + "Check if disassociated IP Address is no longer available" + ) + + # ListPortForwardingRules should not list associated rules with Public IP address + cmd = listPortForwardingRules.listPortForwardingRulesCmd() + cmd.id = self.nat_rule.id + list_nat_rules = self.apiclient.listPortForwardingRules(cmd) + + self.assertEqual( + list_nat_rules, + None, + "Check if Port forwarding rules for disassociated IP Address are no longer available" + ) + + # listLoadBalancerRules should not list associated rules with Public IP address + cmd = listLoadBalancerRules.listLoadBalancerRulesCmd() + cmd.id = self.lb_rule.id + list_lb_rules = self.apiclient.listLoadBalancerRules(cmd) + + self.assertEqual( + list_lb_rules, + None, + "Check if LB rules for disassociated IP Address are no longer available" + ) + + # SSH Attempt though public IP should fail + with self.assertRaises(Exception): + ssh_2 = remoteSSHClient.remoteSSHClient( + self.ip_addr.ipaddress, + services["natrule"]["publicport"], + self.virtual_machine.username, + self.virtual_machine.password + ) + return + + +class TestDeleteAccount(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + #Create an account, network, VM and IP addresses + self.account = Account.create(self.apiclient, services["account"], admin=True) + self.vm_1 = VirtualMachine.create( + self.apiclient, + services["server"], + accountid=self.account.account.name, + ) + + cmd = listPublicIpAddresses.listPublicIpAddressesCmd() + cmd.account = self.account.account.name + cmd.domainid = services["server"]["domainid"] + src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0] + + self.lb_rule = LoadBalancerRule.create( + self.apiclient, + services["lbrule"], + src_nat_ip_addr.id, + self.account.account.name + ) + self.lb_rule.assign(self.apiclient, [self.vm_1]) + self.nat_rule = NATRule.create(self.apiclient, self.vm_1, services["natrule"], src_nat_ip_addr.id) + self.cleanup = [] + return + + def test_delete_account(self): + """Test for delete account""" + + #Validate the Following + # 1. after account.cleanup.interval (global setting) time all the PF/LB rules should be deleted + # 2. verify that list(LoadBalancer/PortForwarding)Rules API does not return any rules for the account + # 3. The domR should have been expunged for this account + + self.account.delete(self.apiclient) + time.sleep(120) + + # ListLoadBalancerRules should not list associated rules with deleted account + cmd = listLoadBalancerRules.listLoadBalancerRulesCmd() + cmd.account = self.account.account.name + cmd.domainid = services["server"]["domainid"] + # Unable to find account testuser1 in domain 1 : Exception + with self.assertRaises(Exception): + self.apiclient.listLoadBalancerRules(cmd) + + # ListPortForwardingRules should not list associated rules with deleted account + cmd = listPortForwardingRules.listPortForwardingRulesCmd() + cmd.account = self.account.account.name + cmd.domainid = services["server"]["domainid"] + + with self.assertRaises(Exception): + self.apiclient.listPortForwardingRules(cmd) + + + #Retrieve router for the user account + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = self.account.account.domainid + with self.assertRaises(Exception): + routers = self.apiclient.listRouters(cmd) + + return + + def tearDown(self): + cleanup_resources(self.apiclient, self.cleanup) + return + diff --git a/tools/testClient/testcase/BVT-tests/test_primary_storage.py b/tools/testClient/testcase/BVT-tests/test_primary_storage.py new file mode 100644 index 00000000000..6dd1bf84ce0 --- /dev/null +++ b/tools/testClient/testcase/BVT-tests/test_primary_storage.py @@ -0,0 +1,138 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" BVT tests for Primary Storage +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from settings import * +import remoteSSHClient +from utils import * +from base import * + +#Import System modules +import time + +services = TEST_PRIMARY_STORAGE_SERVICES + +class TestPrimaryStorageServices(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + + def test_01_primary_storage(self): + """Test primary storage pools - XEN, KVM, VMWare + """ + + # Validate the following: + # 1. verify hypervisortype returned by api is Xen/KVM/VMWare + # 2. verify that the cluster is in 'Enabled' allocation state + # 3. verify that the host is added successfully and in Up state with listHosts api response + + #Create NFS storage pools with on XEN/KVM/VMWare clusters + for k,v in services["nfs"].items(): + + #Host should be present before adding primary storage + cmd = listHosts.listHostsCmd() + cmd.clusterid = v["clusterid"] + list_hosts_response = self.apiclient.listHosts(cmd) + + self.assertNotEqual( + len(list_hosts_response), + 0, + "Check list Hosts response for hypervisor type : " + v["hypervisor"] + ) + + storage = StoragePool.create(self.apiclient, v) + self.cleanup.append(storage) + + self.assertEqual( + storage.state, + 'Up', + "Check state of primary storage is Up or not for hypervisor type : " + v["hypervisor"] + ) + + self.assertEqual( + storage.type, + 'NetworkFilesystem', + "Check type of the storage pool created for hypervisor type : " + v["hypervisor"] + ) + + #Verify List Storage pool Response has newly added storage pool + cmd = listStoragePools.listStoragePoolsCmd() + cmd.id = storage.id + storage_pools_response = self.apiclient.listStoragePools(cmd) + + self.assertNotEqual( + len(storage_pools_response), + 0, + "Check list Hosts response" + ) + + storage_response = storage_pools_response[0] + self.assertEqual( + storage_response.id, + storage.id, + "Check storage pool ID with list storage pools response for hypervisor type : " + v["hypervisor"] + ) + self.assertEqual( + storage.type, + storage_response.type, + "Check type of the storage pool for hypervisor type : " + v["hypervisor"] + ) + # Call cleanup for reusing primary storage + cleanup_resources(self.apiclient, self.cleanup) + self.cleanup = [] + + # Create iSCSI storage pools with on XEN/KVM clusters + for k,v in services["iscsi"].items(): + storage = StoragePool.create(self.apiclient, v) + self.cleanup.append(storage) + + self.assertEqual( + storage.state, + 'Up', + "Check state of primary storage is Up or not for hypervisor type : " + v["hypervisor"] + ) + + #Verify List Storage pool Response has newly added storage pool + cmd = listStoragePools.listStoragePoolsCmd() + cmd.id = storage.id + storage_pools_response = self.apiclient.listStoragePools(cmd) + + self.assertNotEqual( + len(storage_pools_response), + 0, + "Check list Hosts response for hypervisor type : " + v["hypervisor"] + ) + + storage_response = storage_pools_response[0] + self.assertEqual( + storage_response.id, + storage.id, + "Check storage pool ID with list storage pools response for hypervisor type : " + v["hypervisor"] + ) + self.assertEqual( + storage.type, + storage_response.type, + "Check type of the storage pool for hypervisor type : " + v["hypervisor"] + ) + + # Call cleanup for reusing primary storage + cleanup_resources(self.apiclient, self.cleanup) + self.cleanup = [] + return \ No newline at end of file diff --git a/tools/testClient/testcase/BVT-tests/test_routers.py b/tools/testClient/testcase/BVT-tests/test_routers.py new file mode 100644 index 00000000000..36730db7297 --- /dev/null +++ b/tools/testClient/testcase/BVT-tests/test_routers.py @@ -0,0 +1,537 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" BVT tests for routers +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from settings import * +import remoteSSHClient +from utils import * +from base import * + +#Import System modules +import time + +services = TEST_ROUTER_SERVICES + +class TestRouterServices(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + + cls.api_client = fetch_api_client() + + #Create an account, network, VM and IP addresses + cls.account = Account.create(cls.api_client, services["account"]) + cls.vm_1 = VirtualMachine.create( + cls.api_client, + services["virtual_machine"], + accountid=cls.account.account.name, + ) + + cls.cleanup = [cls.vm_1, cls.account] + return + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + #Clean up, terminate the created templates + cleanup_resources(cls.api_client, cls.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + + def setUp(self): + self.apiclient = self.testClient.getApiClient() + return + + def test_01_router_basic(self): + """Test router basic setup + """ + + # Validate the following: + # 1. verify that listRouters returned a 'Running' router + # 2. router will have dns same as that seen in listZones + # 3. router will have a guestIP and a linkLocalIp" + + cmd = listRouters.listRoutersCmd() + list_router_response = self.apiclient.listRouters(cmd) + + self.assertNotEqual( + len(list_router_response), + 0, + "Check list router response" + ) + for i in range(len(list_router_response)): + self.assertEqual( + list_router_response[i].state, + 'Running', + "Check list router response for router state" + ) + + cmd = listZones.listZonesCmd() + cmd.zoneid = list_router_response[i].zoneid + zone = self.apiclient.listZones(cmd)[0] + + self.assertEqual( + list_router_response[i].dns1, + zone.dns1, + "Compare DNS1 of router and zone" + ) + self.assertEqual( + list_router_response[i].dns2, + zone.dns2, + "Compare DNS2 of router and zone" + ) + self.assertEqual( + hasattr(list_router_response[i],'guestipaddress'), + True, + "Check whether router has guest IP field" + ) + + self.assertEqual( + hasattr(list_router_response[i],'linklocalip'), + True, + "Check whether router has link local IP field" + ) + return + + def test_02_router_advanced(self): + """Test router advanced setup + """ + + # Validate the following + # 1. verify that listRouters returned a 'Running' router + # 2. router will have dns and gateway as in listZones, listVlanIpRanges + # 3. router will have guest,public and linklocal IPs + + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + list_router_response = self.apiclient.listRouters(cmd) + + self.assertNotEqual( + len(list_router_response), + 0, + "Check list router response" + ) + for i in range(len(list_router_response)): + self.assertEqual( + list_router_response[i].state, + 'Running', + "Check list router response for router state" + ) + + cmd = listZones.listZonesCmd() + cmd.zoneid = list_router_response[i].zoneid + zone = self.apiclient.listZones(cmd)[0] + + self.assertEqual( + list_router_response[i].dns1, + zone.dns1, + "Compare DNS1 of router and zone" + ) + self.assertEqual( + list_router_response[i].dns2, + zone.dns2, + "Compare DNS2 of router and zone" + ) + self.assertEqual( + hasattr(list_router_response[i],'guestipaddress'), + True, + "Check whether router has guest IP field" + ) + + self.assertEqual( + hasattr(list_router_response[i],'linklocalip'), + True, + "Check whether router has link local IP field" + ) + + #Fetch corresponding ip ranges information from listVlanIpRanges + cmd = listVlanIpRanges.listVlanIpRangesCmd() + cmd.id = list_ssvm_response[i].zoneid + ipranges_response = self.apiclient.listVlanIpRanges(cmd)[0] + + self.assertEqual( + list_router_response[i].gateway, + ipranges_response.gateway, + "Check gateway with that of corresponding IP range" + ) + return + + def test_03_stop_router(self): + """Test stop router + """ + + # Validate the following + # 1. listRouter should report the router for the account as stopped + + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + router = self.apiclient.listRouters(cmd)[0] + + #Stop the router + cmd = stopRouter.stopRouterCmd() + cmd.id = router.id + self.apiclient.stopRouter(cmd) + + #List routers to check state of router + + cmd = listRouters.listRoutersCmd() + cmd.id = router.id + router_response = self.apiclient.listRouters(cmd)[0] + + #List router should have router in stopped state + self.assertEqual( + router_response.state, + 'Stopped', + "Check list router response for router state" + ) + return + + def test_04_start_router(self): + """Test start router + """ + + # Validate the following + # 1. listRouter should report the router for the account as stopped + + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + router = self.apiclient.listRouters(cmd)[0] + + #Start the router + cmd = startRouter.startRouterCmd() + cmd.id = router.id + self.apiclient.startRouter(cmd) + + #List routers to check state of router + + cmd = listRouters.listRoutersCmd() + cmd.id = router.id + router_response = self.apiclient.listRouters(cmd)[0] + + #List router should have router in running state + self.assertEqual( + router_response.state, + 'Running', + "Check list router response for router state" + ) + return + + def test_05_reboot_router(self): + """Test reboot router + """ + + # Validate the following + # 1. listRouter should report the router for the account as stopped + + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + router = self.apiclient.listRouters(cmd)[0] + + public_ip = router.publicip + + #Reboot the router + cmd = rebootRouter.rebootRouterCmd() + cmd.id = router.id + self.apiclient.rebootRouter(cmd) + + #List routers to check state of router + cmd = listRouters.listRoutersCmd() + cmd.id = router.id + router_response = self.apiclient.listRouters(cmd)[0] + + #List router should have router in running state and same public IP + self.assertEqual( + router_response.state, + 'Running', + "Check list router response for router state" + ) + + self.assertEqual( + router_response.publicip, + public_ip, + "Check list router response for router public IP" + ) + return + + def test_05_network_gc(self): + """Test network GC + """ + + # Validate the following + # 1. stop All User VMs in the account + # 2. wait for network.gc.interval time" + # 3. After network.gc.interval, router should be stopped + # 4. ListRouters should return the router in Stopped state + + cmd = listVirtualMachines.listVirtualMachinesCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + list_vms = self.apiclient.listVirtualMachines(cmd) + + self.assertNotEqual( + len(list_vms), + 0, + "Check length of list VM response" + ) + + for i in range(len(list_vms)): + # Stop all virtual machines associated with that account + cmd = stopVirtualMachine.stopVirtualMachineCmd() + cmd.id = list_vms[i].id + self.apiclient.stopVirtualMachine(cmd) + + # Wait for network.gc.interval + cmd = listConfigurations.listConfigurationsCmd() + cmd.name = 'network.gc.interval' + response = self.apiclient.listConfigurations(cmd)[0] + + time.sleep(int(response.value)) + + #Check status of network router + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + router = self.apiclient.listRouters(cmd)[0] + + self.assertEqual( + router.state, + 'Stopped', + "Check state of the router after stopping all VMs associated with that account" + ) + return + + def test_06_router_internal_basic(self): + """Test router internal basic zone + """ + # Validate the following + # 1. Router only does dhcp + # 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP by checking status of dnsmasq process + + # Find router associated with user account + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + router = self.apiclient.listRouters(cmd)[0] + + cmd = listHosts.listHostsCmd() + cmd.zoneid = router.zoneid + cmd.type = 'Routing' + cmd.state = 'Up' + host = self.apiclient.listHosts(cmd)[0] + + #SSH to the machine + ssh = remoteSSHClient.remoteSSHClient( + host.ipaddress, + services['virtual_machine']["publicport"], + services['virtual_machine']["username"], + services['virtual_machine']["password"] + ) + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % router.linklocalip + + # Double hop into router + timeout = 5 + # Ensure the SSH login is successful + c = ssh_command + "service dnsmasq status" + while True: + res = ssh.execute(c)[0] + if res != "Host key verification failed.": + break + elif timeout == 0: + break + time.sleep(5) + timeout = timeout - 1 + self.assertEqual( + res.count("is running"), + 1, + "Check dnsmasq service is running or not" + ) + return + + def test_07_router_internal_adv(self): + """Test router internal advanced zone + """ + # Validate the following + # 1. Router does dhcp, dns, gateway, LB, PF, FW + # 2. verify that dhcp, dns ports are open on UDP + # 3. dnsmasq, haproxy processes should be running + + # Find router associated with user account + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + router = self.apiclient.listRouters(cmd)[0] + + cmd = listHosts.listHostsCmd() + cmd.zoneid = router.zoneid + cmd.type = 'Routing' + cmd.state = 'Up' + host = self.apiclient.listHosts(cmd)[0] + + #SSH to the machine + ssh = remoteSSHClient.remoteSSHClient( + host.ipaddress, + services['virtual_machine']["publicport"], + services['virtual_machine']["username"], + services['virtual_machine']["password"] + ) + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % router.linklocalip + + # Double hop into router + timeout = 5 + # Ensure the SSH login is successful + c = ssh_command + "service dnsmasq status" + while True: + res = ssh.execute(c)[0] + if res != "Host key verification failed.": + break + elif timeout == 0: + break + time.sleep(5) + timeout = timeout - 1 + self.assertEqual( + res.count("running"), + 1, + "Check dnsmasq service is running or not" + ) + + timeout = 5 + # Ensure the SSH login is successful + c = ssh_command + "service haproxy status" + while True: + res = ssh.execute(c)[0] + if res != "Host key verification failed.": + break + elif timeout == 0: + break + time.sleep(5) + timeout = timeout - 1 + self.assertEqual( + res.count("running"), + 1, + "Check haproxy service is running or not" + ) + return + + def test_08_restart_network_cleanup(self): + """Test restart network + """ + + # Validate the following + # 1. When cleanup = true, router is destroyed and a new one created + # 2. New router will have new publicIp and linkLocalIp and all it's services should resume + + # Find router associated with user account + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + router = self.apiclient.listRouters(cmd)[0] + + #Store old values before restart + old_linklocalip = router.linklocalip + + cmd = listNetworks.listNetworksCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + network = self.apiclient.listNetworks(cmd)[0] + + cmd = restartNetwork.restartNetworkCmd() + cmd.id = network.id + cmd.cleanup = True + self.apiclient.restartNetwork(cmd) + + # Get router details after restart + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + router = self.apiclient.listRouters(cmd)[0] + + self.assertNotEqual( + router.linklocalip, + old_linklocalip, + "Check linklocal IP after restart" + ) + return + + def test_08_restart_network_wo_cleanup(self): + """Test restart network without cleanup + """ + + # Validate the following + # 1. When cleanup = false, router is restarted and all services inside the router are restarted + # 2. check 'uptime' to see if the actual restart happened + + cmd = listNetworks.listNetworksCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + network = self.apiclient.listNetworks(cmd)[0] + + cmd = restartNetwork.restartNetworkCmd() + cmd.id = network.id + cmd.cleanup = False + self.apiclient.restartNetwork(cmd) + + # Get router details after restart + cmd = listRouters.listRoutersCmd() + cmd.account = self.account.account.name + cmd.domainid = services["virtual_machine"]["domainid"] + router = self.apiclient.listRouters(cmd)[0] + + cmd = listHosts.listHostsCmd() + cmd.zoneid = router.zoneid + cmd.type = 'Routing' + cmd.state = 'Up' + host = self.apiclient.listHosts(cmd)[0] + + #SSH to the machine + ssh = remoteSSHClient.remoteSSHClient( + host.ipaddress, + services['virtual_machine']["publicport"], + services['virtual_machine']["username"], + services['virtual_machine']["password"] + ) + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s uptime" % router.linklocalip + + # Double hop into router to check router uptime + timeout = 5 + while True: + res = ssh.execute(ssh_command)[0] + if res != "Host key verification failed.": + break + elif timeout == 0: + break + time.sleep(5) + timeout = timeout - 1 + + # res = 12:37:14 up 1 min, 0 users, load average: 0.61, 0.22, 0.08 + # Split result to check the uptime + result = res.split() + self.assertEqual( + str(result[1]), + 'up', + "Check router is running or not" + ) + if str(result[3]) == "min,": + self.assertEqual( + (int(result[2]) < 3), + True, + "Check uptime is less than 3 mins or not" + ) + else: + self.assertEqual( + str(result[3]), + 'sec,', + "Check uptime is in seconds" + ) + return \ No newline at end of file diff --git a/tools/testClient/testcase/BVT-tests/test_secondary_storage.py b/tools/testClient/testcase/BVT-tests/test_secondary_storage.py new file mode 100644 index 00000000000..d130487164a --- /dev/null +++ b/tools/testClient/testcase/BVT-tests/test_secondary_storage.py @@ -0,0 +1,151 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" BVT tests for Secondary Storage +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from settings import * +import remoteSSHClient +from utils import * +from base import * + +#Import System modules +import time + +services = TEST_SEC_STORAGE_SERVICES + +class TestSecStorageServices(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + + def test_01_add_sec_storage(self): + """Test secondary storage + """ + + # Validate the following: + # 1. secondary storage should be added to the zone. + # 2. Verify with listHosts and type secondarystorage + + cmd = addSecondaryStorage.addSecondaryStorageCmd() + cmd.zoneid = services["storage"]["zoneid"] + cmd.url = services["storage"]["url"] + sec_storage = self.apiclient.addSecondaryStorage(cmd) + + self.assertEqual( + sec_storage.zoneid, + services["storage"]["zoneid"], + "Check zoneid where sec storage is added" + ) + + cmd = listHosts.listHostsCmd() + cmd.type = 'SecondaryStorage' + cmd.id = sec_storage.id + list_hosts_response = self.apiclient.listHosts(cmd) + + self.assertNotEqual( + len(list_hosts_response), + 0, + "Check list Hosts response" + ) + + host_response = list_hosts_response[0] + #Check if host is Up and running + self.assertEqual( + host_response.id, + sec_storage.id, + "Check ID of secondary storage" + ) + self.assertEqual( + sec_storage.type, + host_response.type, + "Check type of host from list hosts response" + ) + return + + def test_02_sys_vm_start(self): + """Test system VM start + """ + + # 1. verify listHosts has all 'routing' hosts in UP state + # 2. verify listStoragePools shows all primary storage pools in UP state + # 3. verify that secondary storage was added successfully + + cmd = listHosts.listHostsCmd() + cmd.type = 'Routing' + list_hosts_response = self.apiclient.listHosts(cmd) + # ListHosts has all 'routing' hosts in UP state + self.assertNotEqual( + len(list_hosts_response), + 0, + "Check list host response" + ) + for i in range(len(list_hosts_response)): + + host_response = list_hosts_response[i] + self.assertEqual( + host_response.state, + 'Up', + "Check state of routing hosts is Up or not" + ) + + # ListStoragePools shows all primary storage pools in UP state + cmd = listStoragePools.listStoragePoolsCmd() + cmd.name = 'Primary' + list_storage_response = self.apiclient.listStoragePools(cmd) + + self.assertNotEqual( + len(list_storage_response), + 0, + "Check list storage pools response" + ) + + for i in range(len(list_hosts_response)): + storage_response = list_storage_response[i] + self.assertEqual( + storage_response.state, + 'Up', + "Check state of primary storage pools is Up or not" + ) + + # Secondary storage is added successfully + cmd = listHosts.listHostsCmd() + cmd.type = 'SecondaryStorage' + list_hosts_response = self.apiclient.listHosts(cmd) + + self.assertNotEqual( + len(list_hosts_response), + 0, + "Check list Hosts response" + ) + + host_response = list_hosts_response[0] + #Check if host is Up and running + self.assertEqual( + host_response.state, + 'Up', + "Check state of secondary storage" + ) + return + + def test_03_sys_template_ready(self): + """Test system templates are ready + """ + + # TODO + return \ No newline at end of file diff --git a/tools/testClient/testcase/BVT-tests/test_service_offerings.py b/tools/testClient/testcase/BVT-tests/test_service_offerings.py index 661ee272dcc..0a69946bdb3 100644 --- a/tools/testClient/testcase/BVT-tests/test_service_offerings.py +++ b/tools/testClient/testcase/BVT-tests/test_service_offerings.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # """ BVT tests for Service offerings""" diff --git a/tools/testClient/testcase/BVT-tests/test_snapshots.py b/tools/testClient/testcase/BVT-tests/test_snapshots.py index 6ad24b5b460..06126fe7ca2 100644 --- a/tools/testClient/testcase/BVT-tests/test_snapshots.py +++ b/tools/testClient/testcase/BVT-tests/test_snapshots.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # """ BVT tests for Snapshots """ @@ -31,10 +31,8 @@ class TestSnapshots(cloudstackTestCase): cls.virtual_machine.delete(cls.api_client) cls.virtual_machine_without_disk.delete(cls.api_client) cls.nat_rule.delete(cls.api_client) - except Exception as e: raise Exception("Warning: Exception during cleanup : %s" %e) - return def setUp(self): @@ -47,7 +45,6 @@ class TestSnapshots(cloudstackTestCase): try: #Clean up, terminate the created instance, volumes and snapshots cleanup_resources(self.apiclient, self.cleanup) - except Exception as e: raise Exception("Warning: Exception during cleanup : %s" %e) return @@ -58,7 +55,6 @@ class TestSnapshots(cloudstackTestCase): cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'ROOT' - volumes = self.apiclient.listVolumes(cmd) snapshot = Snapshot.create(self.apiclient, volumes[0].id) @@ -67,7 +63,6 @@ class TestSnapshots(cloudstackTestCase): list_snapshots = self.apiclient.listSnapshots(cmd) self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call") - self.assertEqual( list_snapshots[0].id, snapshot.id, @@ -90,7 +85,6 @@ class TestSnapshots(cloudstackTestCase): ) return - def test_02_snapshot_data_disk(self): """Test Snapshot Data Disk """ @@ -98,7 +92,6 @@ class TestSnapshots(cloudstackTestCase): cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'DATADISK' - volume = self.apiclient.listVolumes(cmd) snapshot = Snapshot.create(self.apiclient, volume[0].id) @@ -107,13 +100,11 @@ class TestSnapshots(cloudstackTestCase): list_snapshots = self.apiclient.listSnapshots(cmd) self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call") - self.assertEqual( list_snapshots[0].id, snapshot.id, "Check resource id in list resources call" ) - self.debug("select backup_snap_id from snapshots where id = %s;" % snapshot.id) qresultset = self.dbclient.execute("select backup_snap_id from snapshots where id = %s;" % snapshot.id) self.assertNotEqual( @@ -134,7 +125,6 @@ class TestSnapshots(cloudstackTestCase): def test_03_volume_from_snapshot(self): """Create volumes from snapshots """ - #1. Login to machine; create temp/test directories on data volume #2. Snapshot the Volume #3. Create another Volume from snapshot @@ -142,9 +132,7 @@ class TestSnapshots(cloudstackTestCase): #5. Compare data random_data_0 = random_gen(100) random_data_1 = random_gen(100) - ssh_client = self.virtual_machine.get_ssh_client(services["server_with_disk"]["ipaddress"]) - #Format partition using ext3 format_volume_to_ext3(ssh_client, services["diskdevice"]) cmds = [ "mkdir -p %s" % services["mount_dir"], @@ -170,37 +158,31 @@ class TestSnapshots(cloudstackTestCase): ] for c in cmds: self.debug(ssh_client.execute(c)) - cmd = listVolumes.listVolumesCmd() cmd.hostid = self.virtual_machine.id cmd.type = 'DATADISK' list_volume_response = self.apiclient.listVolumes(cmd) volume = list_volume_response[0] - #Create snapshot from attached volume snapshot = Snapshot.create(self.apiclient, volume.id) self.cleanup.append(snapshot) #Create volume from snapshot volume = Volume.create_from_snapshot(self.apiclient, snapshot.id, services) self.cleanup.append(volume) - cmd = listVolumes.listVolumesCmd() cmd.id = volume.id list_volumes = self.apiclient.listVolumes(cmd) - self.assertNotEqual( len(list_volumes), None, "Check Volume list Length" ) - self.assertEqual ( list_volumes[0].id, volume.id, "Check Volume in the List Volumes" ) - #Attaching volume to new VM new_virtual_machine = self.virtual_machine_without_disk self.cleanup.append(new_virtual_machine) @@ -209,7 +191,6 @@ class TestSnapshots(cloudstackTestCase): cmd.virtualmachineid = new_virtual_machine.id volume = self.apiclient.attachVolume(cmd) - #Login to VM to verify test directories and files ssh = new_virtual_machine.get_ssh_client(services["server_without_disk"]["ipaddress"]) cmds = [ @@ -221,26 +202,26 @@ class TestSnapshots(cloudstackTestCase): self.debug(ssh.execute(c)) returned_data_0 = ssh.execute("cat %s/%s/%s" %( - services["sub_dir"], - services["sub_lvl_dir1"], + services["sub_dir"], + services["sub_lvl_dir1"], services["random_data"] ) ) returned_data_1 = ssh.execute("cat %s/%s/%s" %( - services["sub_dir"], - services["sub_lvl_dir2"], + services["sub_dir"], + services["sub_lvl_dir2"], services["random_data"] ) ) #Verify returned data self.assertEqual( - random_data_0, - returned_data_0[0], + random_data_0, + returned_data_0[0], "Verify newly attached volume contents with existing one" ) self.assertEqual( - random_data_1, - returned_data_1[0], + random_data_1, + returned_data_1[0], "Verify newly attached volume contents with existing one" ) @@ -253,12 +234,10 @@ class TestSnapshots(cloudstackTestCase): def test_04_delete_snapshot(self): """Test Delete Snapshot """ - cmd = listVolumes.listVolumesCmd() cmd.hostid = self.virtual_machine.id cmd.type = 'DATADISK' list_volumes = self.apiclient.listVolumes(cmd) - cmd = listSnapshots.listSnapshotsCmd() cmd.id = list_volumes[0].id list_snapshots = self.apiclient.listSnapshots(cmd) @@ -270,7 +249,6 @@ class TestSnapshots(cloudstackTestCase): cmd = listSnapshots.listSnapshotsCmd() cmd.id = snapshot.id list_snapshots = self.apiclient.listSnapshots(cmd) - self.assertEqual(list_snapshots, None, "Check if result exists in list item call") return @@ -284,12 +262,9 @@ class TestSnapshots(cloudstackTestCase): cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'ROOT' - volume = self.apiclient.listVolumes(cmd) - recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, services["recurring_snapshot"]) self.cleanup.append(recurring_snapshot) - #ListSnapshotPolicy should return newly created policy cmd = listSnapshotPolicies.listSnapshotPoliciesCmd() cmd.id = recurring_snapshot.id @@ -297,7 +272,6 @@ class TestSnapshots(cloudstackTestCase): list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd) self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call") - snapshots_policy = list_snapshots_policy[0] self.assertEqual( snapshots_policy.id, @@ -309,17 +283,13 @@ class TestSnapshots(cloudstackTestCase): services["recurring_snapshot"]["maxsnaps"], "Check interval type in list resources call" ) - #Sleep for (maxsnaps+1) hours to verify only maxsnaps snapshots are retained time.sleep(((services["recurring_snapshot"]["maxsnaps"])+1)*3600) - cmd = listSnapshots.listSnapshotsCmd() cmd.volumeid=volume.id cmd.intervaltype = services["recurring_snapshot"]["intervaltype"] cmd.snapshottype = 'RECURRING' - list_snapshots = self.apiclient.listSnapshots(cmd) - self.assertEqual( len(list_snapshots), services["recurring_snapshot"]["maxsnaps"], @@ -330,7 +300,6 @@ class TestSnapshots(cloudstackTestCase): def test_06_recurring_snapshot_data_disk(self): """Test Recurring Snapshot data Disk """ - #1. Create snapshot policy for data disk #2. ListSnapshot policy should return newly created policy #3. Verify only most recent number (maxsnaps) snapshots retailed @@ -338,12 +307,9 @@ class TestSnapshots(cloudstackTestCase): cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine_with_disk.id cmd.type = 'DATADISK' - volume = self.apiclient.listVolumes(cmd) - recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, services["recurring_snapshot"]) self.cleanup.append(recurring_snapshot) - #ListSnapshotPolicy should return newly created policy cmd = listSnapshotPolicies.listSnapshotPoliciesCmd() cmd.id = recurring_snapshot.id @@ -351,7 +317,6 @@ class TestSnapshots(cloudstackTestCase): list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd) self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call") - snapshots_policy = list_snapshots_policy[0] self.assertEqual( snapshots_policy.id, @@ -402,26 +367,26 @@ class TestSnapshots(cloudstackTestCase): "mount %s1 %s" %(services["diskdevice"], services["mount_dir"]), "pushd %s" % services["mount_dir"], "mkdir -p %s/{%s,%s} " %( - services["sub_dir"], - services["sub_lvl_dir1"], + services["sub_dir"], + services["sub_lvl_dir1"], services["sub_lvl_dir2"] ), "echo %s > %s/%s/%s" %( - random_data_0, - services["sub_dir"], - services["sub_lvl_dir1"], + random_data_0, + services["sub_dir"], + services["sub_lvl_dir1"], services["random_data"] ), "echo %s > %s/%s/%s" %( - random_data_1, - services["sub_dir"], - services["sub_lvl_dir2"], + random_data_1, + services["sub_dir"], + services["sub_lvl_dir2"], services["random_data"] ) ] + for c in cmds: ssh_client.execute(c) - cmd = listVolumes.listVolumesCmd() cmd.virtualmachineid = self.virtual_machine.id cmd.type = 'ROOT' @@ -449,8 +414,8 @@ class TestSnapshots(cloudstackTestCase): # Deploy new virtual machine using template new_virtual_machine = VirtualMachine.create( - self.apiclient, - services["server_without_disk"], + self.apiclient, + services["server_without_disk"], template.id ) self.cleanup.append(new_virtual_machine) @@ -465,17 +430,16 @@ class TestSnapshots(cloudstackTestCase): ssh.execute(c) returned_data_0 = ssh.execute("cat %s/%s/%s" %( - services["sub_dir"], - services["sub_lvl_dir1"], + services["sub_dir"], + services["sub_lvl_dir1"], services["random_data"] ) ) returned_data_1 = ssh.execute("cat %s/%s/%s" %( - services["sub_dir"], - services["sub_lvl_dir2"], + services["sub_dir"], + services["sub_lvl_dir2"], services["random_data"] ) ) #Verify returned data self.assertEqual(random_data_0, returned_data_0[0], "Verify newly attached volume contents with existing one") self.assertEqual(random_data_1, returned_data_1[0], "Verify newly attached volume contents with existing one") - return diff --git a/tools/testClient/testcase/BVT-tests/test_ssvm.py b/tools/testClient/testcase/BVT-tests/test_ssvm.py new file mode 100644 index 00000000000..9754fb6876a --- /dev/null +++ b/tools/testClient/testcase/BVT-tests/test_ssvm.py @@ -0,0 +1,615 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" BVT tests for SSVM +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from settings import * +import remoteSSHClient +from utils import * +from base import * +import telnetlib + +#Import System modules +import time + +services = TEST_SSVM_SERVICES + +class TestSSVMs(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.cleanup = [] + return + + def tearDown(self): + try: + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + + def test_01_list_sec_storage_vm(self): + """Test List secondary storage VMs + """ + + # Validate the following: + # 1. listSystemVM (systemvmtype=secondarystoragevm) should return only ONE SSVM per zone + # 2. The returned SSVM should be in Running state + # 3. listSystemVM for secondarystoragevm should list publicip, privateip and link-localip + # 4. The gateway programmed on the ssvm by listSystemVm should be the same as the gateway returned by listVlanIpRanges + # 5. DNS entries must match those given for the zone + + cmd = listSystemVms.listSystemVmsCmd() + cmd.systemvmtype = 'secondarystoragevm' + list_ssvm_response = self.apiclient.listSystemVms(cmd) + + #Verify SSVM response + self.assertNotEqual( + len(list_ssvm_response), + 0, + "Check list System VMs response" + ) + + cmd = listZones.listZonesCmd() + list_zones_response = self.apiclient.listZones(cmd) + + self.assertEqual( + len(list_ssvm_response), + len(list_zones_response), + "Check number of SSVMs with number of zones" + ) + #For each secondary storage VM check private IP, public IP, link local IP and DNS + for i in range(len(list_ssvm_response)): + + self.assertEqual( + list_ssvm_response[i].state, + 'Running', + "Check whether state of SSVM is running" + ) + + self.assertEqual( + hasattr(list_ssvm_response[i],'privateip'), + True, + "Check whether SSVM has private IP field" + ) + + self.assertEqual( + hasattr(list_ssvm_response[i],'linklocalip'), + True, + "Check whether SSVM has link local IP field" + ) + + self.assertEqual( + hasattr(list_ssvm_response[i],'publicip'), + True, + "Check whether SSVM has public IP field" + ) + + #Fetch corresponding ip ranges information from listVlanIpRanges + cmd = listVlanIpRanges.listVlanIpRangesCmd() + cmd.id = list_ssvm_response[i].zoneid + ipranges_response = self.apiclient.listVlanIpRanges(cmd)[0] + + self.assertEqual( + list_ssvm_response[i].gateway, + ipranges_response.gateway, + "Check gateway with that of corresponding ip range" + ) + + #Fetch corresponding zone information from listZones + cmd = listZones.listZonesCmd() + cmd.id = list_ssvm_response[i].zoneid + zone_response = self.apiclient.listZones(cmd)[0] + + self.assertEqual( + list_ssvm_response[i].dns1, + zone_response.dns1, + "Check DNS1 with that of corresponding zone" + ) + + self.assertEqual( + list_ssvm_response[i].dns2, + zone_response.dns2, + "Check DNS2 with that of corresponding zone" + ) + return + + def test_02_list_cpvm_vm(self): + """Test List console proxy VMs + """ + + # Validate the following: + # 1. listSystemVM (systemvmtype=consoleproxy) should return at least ONE CPVM per zone + # 2. The returned ConsoleProxyVM should be in Running state + # 3. listSystemVM for console proxy should list publicip, privateip and link-localip + # 4. The gateway programmed on the console proxy should be the same as the gateway returned by listZones + # 5. DNS entries must match those given for the zone + + cmd = listSystemVms.listSystemVmsCmd() + cmd.systemvmtype = 'consoleproxy' + list_cpvm_response = self.apiclient.listSystemVms(cmd) + + #Verify CPVM response + self.assertNotEqual( + len(list_cpvm_response), + 0, + "Check list System VMs response" + ) + + cmd = listZones.listZonesCmd() + list_zones_response = self.apiclient.listZones(cmd) + + self.assertEqual( + len(list_cpvm_response), + len(list_zones_response), + "Check number of CPVMs with number of zones" + ) + #For each CPVM check private IP, public IP, link local IP and DNS + for i in range(len(list_cpvm_response)): + + self.assertEqual( + list_cpvm_response[i].state, + 'Running', + "Check whether state of CPVM is running" + ) + + self.assertEqual( + hasattr(list_cpvm_response[i],'privateip'), + True, + "Check whether CPVM has private IP field" + ) + + self.assertEqual( + hasattr(list_cpvm_response[i],'linklocalip'), + True, + "Check whether CPVM has link local IP field" + ) + + self.assertEqual( + hasattr(list_cpvm_response[i],'publicip'), + True, + "Check whether CPVM has public IP field" + ) + #Fetch corresponding ip ranges information from listVlanIpRanges + cmd = listVlanIpRanges.listVlanIpRangesCmd() + cmd.id = list_ssvm_response[i].zoneid + ipranges_response = self.apiclient.listVlanIpRanges(cmd)[0] + + self.assertEqual( + list_cpvm_response[i].gateway, + ipranges_response.gateway, + "Check gateway with that of corresponding ip range" + ) + + #Fetch corresponding zone information from listZones + cmd = listZones.listZonesCmd() + cmd.id = list_cpvm_response[i].zoneid + zone_response = self.apiclient.listZones(cmd)[0] + + self.assertEqual( + list_cpvm_response[i].dns1, + zone_response.dns1, + "Check DNS1 with that of corresponding zone" + ) + + self.assertEqual( + list_cpvm_response[i].dns2, + zone_response.dns2, + "Check DNS2 with that of corresponding zone" + ) + return + + def test_03_ssvm_internals(self): + """Test SSVM Internals""" + + # Validate the following + # 1. The SSVM check script should not return any WARN|ERROR|FAIL messages + # 2. If you are unable to login to the SSVM with the signed key then test is deemed a failure + # 3. There should be only one ""cloud"" process running within the SSVM + # 4. If no process is running/multiple process are running then the test is a failure + + cmd = listHosts.listHostsCmd() + cmd.zoneid = services["ssvm"]["zoneid"] + cmd.type = 'Routing' + cmd.state = 'Up' + host = self.apiclient.listHosts(cmd)[0] + + cmd = listSystemVms.listSystemVmsCmd() + cmd.systemvmtype = 'secondarystoragevm' + cmd.hostid = host.id + ssvm = self.apiclient.listSystemVms(cmd)[0] + + self.debug("Cheking cloud process status") + #SSH to the machine + ssh = remoteSSHClient.remoteSSHClient( + host.ipaddress, + services['host']["publicport"], + services['host']["username"], + services['host']["password"] + ) + + timeout = 5 + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % ssvm.linklocalip + c = ssh_command + "/usr/local/cloud/systemvm/ssvm-check.sh |grep -e ERROR -e WARNING -e FAIL" + + # Ensure the SSH login is successful + while True: + res = ssh.execute(c)[0] + #Output:Tests Complete. Look for ERROR or WARNING above. + + if res != "Host key verification failed.": + break + elif timeout == 0: + break + time.sleep(5) + timeout = timeout - 1 + + self.assertEqual( + res.count("ERROR"), + 1, + "Check for Errors in tests" + ) + + self.assertEqual( + res.count("WARNING"), + 1, + "Check for warnings in tests" + ) + + self.assertEqual( + res.count("FAIL"), + 1, + "Check for failed tests" + ) + + #Check status of cloud service + c = ssh_command + "service cloud status" + res = ssh.execute(c)[0] + # cloud.com service (type=secstorage) is running: process id: 2346 + self.assertEqual( + res.count("is running"), + 1, + "Check cloud service is running or not" + ) + return + + def test_04_cpvm_internals(self): + """Test CPVM Internals""" + + # Validate the following + # 1. test that telnet access on 8250 is available to the management server for the CPVM + # 2. No telnet access, test FAIL + # 3. Service cloud status should report cloud agent status to be running + + cmd = listHosts.listHostsCmd() + cmd.zoneid = services["cpvm"]["zoneid"] + cmd.type = 'Routing' + cmd.state = 'Up' + host = self.apiclient.listHosts(cmd)[0] + + cmd = listSystemVms.listSystemVmsCmd() + cmd.systemvmtype = 'consoleproxy' + cmd.hostid = host.id + cpvm = self.apiclient.listSystemVms(cmd)[0] + + with assertNotRaises(Exception): + telnet = telnetlib.Telnet( services["cpvm"]["mgmtserverIP"] , '8250') + + self.debug("Cheking cloud process status") + #SSH to the machine + ssh = remoteSSHClient.remoteSSHClient( + host.ipaddress, + services['host']["publicport"], + services['host']["username"], + services['host']["password"] + ) + timeout = 5 + # Check status of cloud service + ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % cpvm.linklocalip + c = ssh_command + "service cloud status" + + while True: + res = ssh.execute(c)[0] + # Response: cloud.com service (type=secstorage) is running: process id: 2346 + + #Check if double hop is successful + if res != "Host key verification failed.": + break + elif timeout == 0: + break + time.sleep(5) + timeout = timeout - 1 + + self.assertEqual( + res.count("is running"), + 1, + "Check cloud service is running or not" + ) + return + + def test_05_stop_ssvm(self): + """Test stop SSVM + """ + + # Validate the following + # 1. The SSVM should go to stop state + # 2. After a brief delay of say one minute, the SSVM should be restarted once again and return to Running state with previous two test cases still passing + # 3. If either of the two above steps fail the test is a failure + + cmd = stopSystemVm.stopSystemVmCmd() + cmd.id = services["ssvm"]["id"] + cmd.systemvmtype = 'secondarystoragevm' + self.apiclient.stopSystemVm(cmd) + + #Sleep to ensure that SSVM is properly restarted + time.sleep(90) + cmd = listSystemVms.listSystemVmsCmd() + cmd.id = services["ssvm"]["id"] + list_ssvm_response = self.apiclient.listSystemVms(cmd) + + ssvm_response = list_ssvm_response[0] + self.assertEqual( + ssvm_response.state, + 'Running', + "Check whether SSVM is running or not" + ) + # Call above tests to ensure SSVM is properly running + self.test_01_list_sec_storage_vm() + self.test_03_ssvm_internals() + return + + def test_06_stop_cpvm(self): + """Test stop CPVM + """ + + # Validate the following + # 1. The CPVM should go to stop state + # 2. After a brief delay of say one minute, the SSVM should be restarted once again and return to Running state with previous two test cases still passing + # 3. If either of the two above steps fail the test is a failure + + cmd = stopSystemVm.stopSystemVmCmd() + cmd.id = services["cpvm"]["id"] + cmd.systemvmtype = 'consoleproxy' + self.apiclient.stopSystemVm(cmd) + + cmd = listSystemVms.listSystemVmsCmd() + cmd.id = services["cpvm"]["id"] + + timeout = 10 + while True : + list_cpvm_response = self.apiclient.listSystemVms(cmd) + if not list_cpvm_response: + break; + else: + if timeout == 0: + break + #Sleep to ensure that SSVM is properly restarted + time.sleep(10) + timeout = timeout - 1 + + cpvm_response = list_cpvm_response[0] + self.assertEqual( + cpvm_response.state, + 'Running', + "Check whether CPVM is running or not" + ) + # Call above tests to ensure CPVM is properly running + self.test_02_list_cpvm_vm() + self.test_04_cpvm_internals() + return + + def test_07_reboot_ssvm(self): + """Test reboot SSVM + """ + # Validate the following + # 1. The SSVM should go to stop and return to Running state + # 2. SSVM's public-ip and private-ip must remain the same before and after reboot + # 3. The cloud process should still be running within the SSVM + + cmd = listSystemVms.listSystemVmsCmd() + cmd.id = services["ssvm"]["id"] + cmd.systemvmtype = 'secondarystoragevm' + ssvm_response = self.apiclient.listSystemVms(cmd)[0] + + #Store the public & private IP values before reboot + old_public_ip = ssvm_response.publicip + old_private_ip = ssvm_response.privateip + + cmd = rebootSystemVm.rebootSystemVmCmd() + cmd.id = services["ssvm"]["id"] + self.apiclient.rebootSystemVm(cmd) + + #Sleep to ensure that SSVM is properly stopped/started + time.sleep(60) + cmd = listSystemVms.listSystemVmsCmd() + cmd.id = services["ssvm"]["id"] + ssvm_response = self.apiclient.listSystemVms(cmd)[0] + + self.assertEqual( + 'Running', + str(ssvm_response.state), + "Check whether CPVM is running or not" + ) + + self.assertEqual( + ssvm_response.publicip, + old_public_ip, + "Check Public IP after reboot with that of before reboot" + ) + + self.assertEqual( + ssvm_response.privateip, + old_private_ip, + "Check Private IP after reboot with that of before reboot" + ) + #Call to verify cloud process is running + self.test_03_ssvm_internals() + return + + def test_08_reboot_cpvm(self): + """Test reboot CPVM + """ + # Validate the following + # 1. The CPVM should go to stop and return to Running state + # 2. CPVM's public-ip and private-ip must remain the same before and after reboot + # 3. the cloud process should still be running within the CPVM + + cmd = listSystemVms.listSystemVmsCmd() + cmd.id = services["cpvm"]["id"] + cmd.systemvmtype = 'consoleproxy' + cpvm_response = self.apiclient.listSystemVms(cmd)[0] + + #Store the public & private IP values before reboot + old_public_ip = cpvm_response.publicip + old_private_ip = cpvm_response.privateip + + cmd = rebootSystemVm.rebootSystemVmCmd() + cmd.id = services["cpvm"]["id"] + self.apiclient.rebootSystemVm(cmd) + + #Sleep to ensure that SSVM is properly stopped/started + time.sleep(60) + + cmd = listSystemVms.listSystemVmsCmd() + cmd.id = services["cpvm"]["id"] + cpvm_response = self.apiclient.listSystemVms(cmd)[0] + + self.assertEqual( + 'Running', + str(cpvm_response.state), + "Check whether CPVM is running or not" + ) + + self.assertEqual( + cpvm_response.publicip, + old_public_ip, + "Check Public IP after reboot with that of before reboot" + ) + + self.assertEqual( + cpvm_response.privateip, + old_private_ip, + "Check Private IP after reboot with that of before reboot" + ) + #Call to verify cloud process is running + self.test_04_cpvm_internals() + return + + def test_09_destroy_ssvm(self): + """Test destroy SSVM + """ + + # Validate the following + # 1. SSVM should be completely destroyed and a new one will spin up + # 2. listSystemVMs will show a different name for the systemVM from what it was before + # 3. new SSVM will have a public/private and link-local-ip + # 4. cloud process within SSVM must be up and running + + cmd = listSystemVms.listSystemVmsCmd() + cmd.zoneid = services["ssvm"]["zoneid"] + cmd.systemvmtype = 'secondarystoragevm' + ssvm_response = self.apiclient.listSystemVms(cmd)[0] + + old_name = ssvm_response.name + + cmd = destroySystemVm.destroySystemVmCmd() + cmd.id = ssvm_response.id + self.apiclient.destroySystemVm(cmd) + + #Sleep to ensure that new SSVM is created + time.sleep(60) + + cmd = listSystemVms.listSystemVmsCmd() + cmd.zoneid = services["ssvm"]["zoneid"] + cmd.systemvmtype = 'secondarystoragevm' + ssvm_response = self.apiclient.listSystemVms(cmd)[0] + + # Verify Name, Public IP, Private IP and Link local IP for newly created SSVM + self.assertNotEqual( + ssvm_response.name, + old_name, + "Check SSVM new name with name of destroyed SSVM" + ) + self.assertEqual( + hasattr(ssvm_response,'privateip'), + True, + "Check whether SSVM has private IP field" + ) + + self.assertEqual( + hasattr(ssvm_response,'linklocalip'), + True, + "Check whether SSVM has link local IP field" + ) + + self.assertEqual( + hasattr(ssvm_response,'publicip'), + True, + "Check whether SSVM has public IP field" + ) + #Call to verify cloud process is running + self.test_03_ssvm_internals() + return + + def test_10_destroy_cpvm(self): + """Test destroy CPVM + """ + + # Validate the following + # 1. CPVM should be completely destroyed and a new one will spin up + # 2. listSystemVMs will show a different name for the systemVM from what it was before + # 3. new CPVM will have a public/private and link-local-ip + # 4. cloud process within CPVM must be up and running + + cmd = listSystemVms.listSystemVmsCmd() + cmd.zoneid = services["cpvm"]["zoneid"] + cmd.systemvmtype = 'consoleproxy' + cpvm_response = self.apiclient.listSystemVms(cmd)[0] + + old_name = cpvm_response.name + + cmd = destroySystemVm.destroySystemVmCmd() + cmd.id = cpvm_response.id + self.apiclient.destroySystemVm(cmd) + + #Sleep to ensure that new CPVM is created + time.sleep(60) + + cmd = listSystemVms.listSystemVmsCmd() + cmd.zoneid = services["cpvm"]["zoneid"] + cmd.systemvmtype = 'consoleproxy' + cpvm_response = self.apiclient.listSystemVms(cmd)[0] + + # Verify Name, Public IP, Private IP and Link local IP for newly created CPVM + self.assertNotEqual( + cpvm_response.name, + old_name, + "Check SSVM new name with name of destroyed CPVM" + ) + self.assertEqual( + hasattr(cpvm_response,'privateip'), + True, + "Check whether CPVM has private IP field" + ) + + self.assertEqual( + hasattr(cpvm_response,'linklocalip'), + True, + "Check whether CPVM has link local IP field" + ) + + self.assertEqual( + hasattr(cpvm_response,'publicip'), + True, + "Check whether CPVM has public IP field" + ) + #Call to verify cloud process is running + self.test_04_cpvm_internals() + return \ No newline at end of file diff --git a/tools/testClient/testcase/BVT-tests/test_templates.py b/tools/testClient/testcase/BVT-tests/test_templates.py new file mode 100644 index 00000000000..d3d9dcf7140 --- /dev/null +++ b/tools/testClient/testcase/BVT-tests/test_templates.py @@ -0,0 +1,567 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2012 Citrix. All rights reserved. +# +""" BVT tests for Templates ISO +""" +#Import Local Modules +from cloudstackTestCase import * +from cloudstackAPI import * +from settings import * +import remoteSSHClient +from utils import * +from base import * +import urllib +from random import random +#Import System modules +import time + + +services = TEST_TEMPLATE_SERVICES + +class TestCreateTemplate(cloudstackTestCase): + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + return + + @classmethod + def setUpClass(cls): + + cls.api_client = fetch_api_client() + #create virtual machine + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + services["virtual_machine"] + ) + + #Stop virtual machine + cmd = stopVirtualMachine.stopVirtualMachineCmd() + cmd.id = cls.virtual_machine.id + cls.api_client.stopVirtualMachine(cmd) + + #Wait before server has be successfully stopped + time.sleep(30) + cmd = listVolumes.listVolumesCmd() + cmd.virtualmachineid = cls.virtual_machine.id + cmd.type = 'ROOT' + list_volume = cls.api_client.listVolumes(cmd) + cls.volume = list_volume[0] + return + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + #Cleanup resources used + cls.virtual_machine.delete(cls.api_client) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + + return + + def test_01_create_template(self): + """Test create public & private template + """ + + # Validate the following: + # 1. database (vm_template table) should be updated with newly created template + # 2. UI should show the newly added template + # 3. ListTemplates API should show the newly added template + + #Create template from Virtual machine and Volume ID + template = Template.create(self.apiclient, self.volume, services["template_1"]) + self.cleanup.append(template) + + cmd = listTemplates.listTemplatesCmd() + cmd.templatefilter = services["templatefilter"] + cmd.id = template.id + list_template_response = self.apiclient.listTemplates(cmd) + + #Verify template response to check whether template added successfully or not + template_response = list_template_response[0] + self.assertNotEqual( + len(list_template_response), + 0, + "Check template avaliable in List Templates" + ) + + self.assertEqual( + template_response.displaytext, + services["template_1"]["displaytext"], + "Check display text of newly created template" + ) + self.assertEqual( + template_response.name, + services["template_1"]["name"], + "Check name of newly created template" + ) + self.assertEqual( + template_response.ostypeid, + services["template_1"]["ostypeid"], + "Check osTypeID of newly created template" + ) + + #Verify the database entry for template + self.debug( + "select name, display_text, guest_os_id from vm_template where id = %s;" + %template.id + ) + + qresultset = self.dbclient.execute( + "select name, display_text, guest_os_id from vm_template where id = %s;" + %template.id + ) + + self.assertNotEqual( + len(qresultset), + 0, + "Check DB Query result set" + ) + + qresult = qresultset[0] + + self.assertEqual( + qresult[0], + services["template_1"]["name"], + "Compare template name with database record" + ) + + self.assertEqual( + qresult[1], + services["template_1"]["displaytext"], + "Compare template display text with database record" + ) + + self.assertEqual( + qresult[2], + services["template_1"]["ostypeid"], + "Compare template osTypeID with database record" + ) + return + +class TestTemplates(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + + cls.api_client = fetch_api_client() + #create virtual machines + cls.virtual_machine = VirtualMachine.create( + cls.api_client, + services["virtual_machine"] + ) + + #Stop virtual machine + cmd = stopVirtualMachine.stopVirtualMachineCmd() + cmd.id = cls.virtual_machine.id + cls.api_client.stopVirtualMachine(cmd) + + #Wait before server has be successfully stopped + time.sleep(30) + cmd = listVolumes.listVolumesCmd() + cmd.virtualmachineid = cls.virtual_machine.id + cmd.type = 'ROOT' + list_volume = cls.api_client.listVolumes(cmd) + cls.volume = list_volume[0] + + #Create templates for Edit, Delete & update permissions testcases + cls.template_1 = Template.create(cls.api_client, cls.volume, services["template_1"]) + cls.template_2 = Template.create(cls.api_client, cls.volume, services["template_2"]) + + @classmethod + def tearDownClass(cls): + try: + cls.api_client = fetch_api_client() + + #Cleanup created resources such as templates and VMs + cls.template_2.delete(cls.api_client) + cls.virtual_machine.delete(cls.api_client) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + + return + + def setUp(self): + + self.apiclient = self.testClient.getApiClient() + self.dbclient = self.testClient.getDbConnection() + self.cleanup = [] + return + + def tearDown(self): + try: + + self.dbclient.close() + #Clean up, terminate the created templates + cleanup_resources(self.apiclient, self.cleanup) + + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" %e) + + return + + def test_02_edit_template(self): + """Test Edit template + """ + + # Validate the following: + # 1. UI should show the edited values for template + # 2. database (vm_template table) should have updated values + + new_displayText = random_gen() + new_name = random_gen() + + cmd = updateTemplate.updateTemplateCmd() + # Update template attributes + cmd.id = self.template_1.id + cmd.displaytext = new_displayText + cmd.name = new_name + cmd.bootable = services["bootable"] + cmd.passwordenabled = services["passwordenabled"] + cmd.ostypeid = services["ostypeid"] + + self.apiclient.updateTemplate(cmd) + + # Verify template response for updated attributes + cmd = listTemplates.listTemplatesCmd() + cmd.templatefilter = services["templatefilter"] + cmd.id = self.template_1.id + + list_template_response = self.apiclient.listTemplates(cmd) + template_response = list_template_response[0] + + self.assertNotEqual( + len(list_template_response), + 0, + "Check template available in List Templates" + ) + + self.assertEqual( + template_response.displaytext, + new_displayText, + "Check display text of updated template" + ) + self.assertEqual( + template_response.name, + new_name, + "Check name of updated template" + ) + self.assertEqual( + str(template_response.passwordenabled).lower(), + str(services["passwordenabled"]).lower(), + "Check passwordenabled field of updated template" + ) + self.assertEqual( + template_response.ostypeid, + services["ostypeid"], + "Check OSTypeID of updated template" + ) + # Verify database entry for updated template attributes + self.debug( + "select name, display_text, bootable, enable_password, guest_os_id from vm_template where id = %s;" + %self.template_1.id + ) + + qresultset = self.dbclient.execute( + "select name, display_text, bootable, enable_password, guest_os_id from vm_template where id = %s;" + %self.template_1.id + ) + + self.assertNotEqual( + len(qresultset), + 0, + "Check DB Query result set" + ) + + qresult = qresultset[0] + + self.assertEqual( + qresult[0], + new_name, + "Compare template name with database record" + ) + + self.assertEqual( + qresult[1], + new_displayText, + "Compare template bootable field with database record" + ) + self.assertEqual( + qresult[2], + int(services["bootable"]), + "Compare template enable_password field with database record" + ) + self.assertEqual( + qresult[3], + int(services["passwordenabled"]), + "Compare template display text with database record" + ) + self.assertEqual( + qresult[4], + services["ostypeid"], + "Compare template guest OS ID with database record" + ) + return + + def test_03_delete_template(self): + """Test delete template + """ + + # Validate the following: + # 1. UI should not show the deleted template + # 2. database (vm_template table) should not contain deleted template + + self.template_1.delete(self.apiclient) + + cmd = listTemplates.listTemplatesCmd() + cmd.templatefilter = services["templatefilter"] + cmd.id = self.template_1.id + list_template_response = self.apiclient.listTemplates(cmd) + + # Verify template is deleted properly using ListTemplates + self.assertEqual(list_template_response, None, "Check if template exists in List Templates") + + # Verify database entry is removed for deleted template + self.debug( + "select name, display_text from vm_template where id = %s;" + %self.template_1.id + ) + + qresultset = self.dbclient.execute( + "select name, display_text from vm_template where id = %s;" + %self.template_1.id + ) + + self.assertEqual( + len(qresultset), + 1, + "Check DB Query result set" + ) + + return + + def test_04_extract_template(self): + "Test for extract template" + + # Validate the following + # 1. Admin should able extract and download the templates + # 2. ListTemplates should display all the public templates for all kind of users + # 3 .ListTemplates should not display the system templates + + cmd = extractTemplate.extractTemplateCmd() + cmd.id = self.template_2.id + cmd.mode = services["template_2"]["mode"] + cmd.zoneid = services["template_2"]["zoneid"] + list_extract_response = self.apiclient.extractTemplate(cmd) + + # Format URL to ASCII to retrieve response code + formatted_url = urllib.unquote_plus(list_extract_response.url) + url_response = urllib.urlopen(formatted_url) + response_code = url_response.getcode() + + self.assertEqual( + list_extract_response.id, + self.template_2.id, + "Check ID of the extracted template" + ) + self.assertEqual( + list_extract_response.extractMode, + services["template_2"]["mode"], + "Check mode of extraction" + ) + self.assertEqual( + list_extract_response.zoneid, + services["template_2"]["zoneid"], + "Check zone ID of extraction" + ) + self.assertEqual( + response_code, + 200, + "Check for a valid response download URL" + ) + return + + def test_05_template_permissions(self): + """Update & Test for template permissions""" + + # Validate the following + # 1. listTemplatePermissions returns valid permissions set for template + # 2. permission changes should be reflected in vm_template table in database + + cmd = updateTemplatePermissions.updateTemplatePermissionsCmd() + # Update template permissions + cmd.id = self.template_2.id + cmd.isfeatured = services["isfeatured"] + cmd.ispublic = services["ispublic"] + cmd.isextractable =services["isextractable"] + + self.apiclient.updateTemplatePermissions(cmd) + + cmd = listTemplates.listTemplatesCmd() + cmd.id = self.template_2.id + cmd.account = services["account"] + cmd.domainid = services["domainid"] + cmd.templatefilter = 'featured' + list_template_response = self.apiclient.listTemplates(cmd) + + # Verify template response for updated permissions for normal user + template_response = list_template_response[0] + + self.assertEqual( + template_response.id, + self.template_2.id, + "Check template ID" + ) + self.assertEqual( + template_response.ispublic, + int(True), + "Check ispublic permission of template" + ) + + self.assertNotEqual( + template_response.templatetype, + 'SYSTEM', + "ListTemplates should not list any system templates" + ) + + # Verify database entries for updated permissions + self.debug( + "select public, featured, extractable from vm_template where id = %s;" + %self.template_2.id + ) + qresultset = self.dbclient.execute( + "select public, featured, extractable from vm_template where id = %s;" + %self.template_2.id + ) + + self.assertNotEqual( + len(qresultset), + 0, + "Check DB Query result set" + ) + + qresult = qresultset[0] + + self.assertEqual( + qresult[0], + int(services["ispublic"]), + "Compare public permission with database record" + ) + + self.assertEqual( + qresult[1], + int(services["isfeatured"]), + "Compare featured permission with database record" + ) + self.assertEqual( + qresult[2], + int(services["isextractable"]), + "Compare extractable permission with database record" + ) + return + + def test_06_copy_template(self): + """Test for copy template from one zone to another""" + + # Validate the following + # 1. copy template should be successful and secondary storage should contain new copied template. + + cmd = copyTemplate.copyTemplateCmd() + cmd.id = self.template_2.id + cmd.destzoneid = services["destzoneid"] + cmd.sourcezoneid = services["sourcezoneid"] + self.apiclient.copyTemplate(cmd) + + # Verify template is copied to another zone using ListTemplates + cmd = listTemplates.listTemplatesCmd() + cmd.id = self.template_2.id + cmd.templatefilter = services["templatefilter"] + list_template_response = self.apiclient.listTemplates(cmd) + + template_response = list_template_response[0] + self.assertNotEqual( + len(list_template_response), + 0, + "Check template extracted in List Templates" + ) + self.assertEqual( + template_response.id, + self.template_2.id, + "Check ID of the downloaded template" + ) + self.assertEqual( + template_response.zoneid, + services["destzoneid"], + "Check zone ID of the copied template" + ) + return + + def test_07_list_public_templates(self): + """Test only public templates are visible to normal user""" + + # Validate the following + # 1. ListTemplates should show only 'public' templates for normal user + + cmd = listTemplates.listTemplatesCmd() + cmd.templatefilter = 'featured' + cmd.account = services["account"] + cmd.domainid = services["domainid"] + + list_template_response = self.apiclient.listTemplates(cmd) + + self.assertNotEqual( + len(list_template_response), + 0, + "Check template available in List Templates" + ) + #Template response should list all 'public' templates + for k in range(len(list_template_response)): + self.assertEqual( + list_template_response[k].ispublic, + True, + "ListTemplates should list only public templates" + ) + return + + def test_08_list_system_templates(self): + """Test System templates are not visible to normal user""" + + # Validate the following + # 1. ListTemplates should not show 'SYSTEM' templates for normal user + + cmd = listTemplates.listTemplatesCmd() + cmd.templatefilter = 'featured' + cmd.account = services["account"] + cmd.domainid = services["domainid"] + + list_template_response = self.apiclient.listTemplates(cmd) + + self.assertNotEqual( + len(list_template_response), + 0, + "Check template available in List Templates" + ) + + for k in range(len(list_template_response)): + self.assertNotEqual( + list_template_response[k].templatetype, + 'SYSTEM', + "ListTemplates should not list any system templates" + ) + return diff --git a/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py b/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py index f161b08766d..a1351ff4b18 100644 --- a/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py +++ b/tools/testClient/testcase/BVT-tests/test_vm_life_cycle.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # """ BVT tests for Virtual Machine Life Cycle @@ -524,19 +524,16 @@ class TestVMLifeCycle(cloudstackTestCase): cmd.id = self.small_virtual_machine.id self.apiclient.destroyVirtualMachine(cmd) + # Wait for expunge.delay + cmd = listConfigurations.listConfigurationsCmd() + cmd.name = 'expunge.delay' + response = self.apiclient.listConfigurations(cmd)[0] + + time.sleep(int(response.value)) + cmd = listVirtualMachines.listVirtualMachinesCmd() cmd.id = self.small_virtual_machine.id - - timeout = 50 - while True : - list_vm_response = self.apiclient.listVirtualMachines(cmd) - if not list_vm_response: - break; - else: - if timeout == 0: - break - time.sleep(100) - timeout = timeout - 1 + list_vm_response = self.apiclient.listVirtualMachines(cmd) self.assertEqual( list_vm_response, @@ -633,4 +630,4 @@ class TestVMLifeCycle(cloudstackTestCase): result = services["diskdevice"] in res[0].split() self.assertEqual(result, False, "Check if ISO is detached from virtual machine") - return + return \ No newline at end of file diff --git a/tools/testClient/testcase/BVT-tests/test_volumes.py b/tools/testClient/testcase/BVT-tests/test_volumes.py index afc639c59f6..db4478cffe5 100644 --- a/tools/testClient/testcase/BVT-tests/test_volumes.py +++ b/tools/testClient/testcase/BVT-tests/test_volumes.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # """ BVT tests for Volumes """ diff --git a/tools/testClient/testcase/BVT-tests/utils.py b/tools/testClient/testcase/BVT-tests/utils.py index bd048e8a34b..e4b20dcfa3b 100644 --- a/tools/testClient/testcase/BVT-tests/utils.py +++ b/tools/testClient/testcase/BVT-tests/utils.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- # -# Copyright (c) 2011 Citrix. All rights reserved. +# Copyright (c) 2012 Citrix. All rights reserved. # """Utilities functions @@ -22,9 +22,8 @@ def random_gen(size=6, chars=string.ascii_uppercase + string.digits): def cleanup_resources(api_client, resources): """Delete resources""" - for k,v in resources.items(): - for obj in v: - obj.delete(api_client) + for obj in resources: + obj.delete(api_client) def is_server_ssh_ready(ipaddress, port, username, password, retries=50): """Return ssh handle else wait till sshd is running"""