Add support for the following tests:

1. All the Pending Networking tests
2. SSVM Tests
3. CPVM Tests
4. Router Tests
5. Hosts Tests
6. Primary Storage Tests
This commit is authored by:
Chirag Jog, 2012-01-14 21:54:11 -08:00
parent 890774db04
commit 57a4a76691
16 changed files with 2823 additions and 235 deletions

View File

@ -1,6 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" Base class for all Cloudstack resources - Virtual machine, Volume, Snapshot etc
@ -26,25 +26,7 @@ class Account:
cmd.lastname = services["lastname"]
cmd.password = services["password"]
cmd.username = services["username"]
account = apiclient.createAccount(cmd)
#create a network
cmd = createNetwork.createNetworkCmd()
cmd.name = cmd.displaytext = "Virtual Network"
cmd.networkofferingid = services["networkofferingid"]
cmd.account = account.account.name
cmd.domainid = account.account.domainid
cmd.zoneid = services["zoneid"]
account.network = apiclient.createNetwork(cmd)
#Allocate the Source NAT IP Address
account.public_ip = PublicIPAddress.create(
apiclient,
accountid = account.account.name,
zoneid = services["zoneid"],
domainid = account.account.domainid
)
return Account(account.__dict__)
@ -70,15 +52,19 @@ class VirtualMachine:
cmd = deployVirtualMachine.deployVirtualMachineCmd()
cmd.serviceofferingid = services["serviceoffering"]
cmd.zoneid = services["zoneid"]
cmd.hypervisor = services["hypervisor"]
cmd.account = accountid or services["account"]
cmd.domainid = services["domainid"]
if networkids:
cmd.networkids = networkids
elif "networkids" in services:
cmd.networkids = services["networkids"]
cmd.hypervisor = services["hypervisor"]
cmd.account = accountid or services["accountid"]
cmd.domainid = services["domainid"]
cmd.templateid = templateid or services["template"]
if templateid:
cmd.templateid = templateid
elif "template" in services:
cmd.templateid = services["template"]
if "diskoffering" in services:
cmd.diskofferingid = services["diskoffering"]
@ -210,9 +196,22 @@ class Template:
cmd.displaytext = services["displaytext"]
cmd.name = "-".join([services["name"], random_gen()])
cmd.ostypeid = services["ostypeid"]
cmd.isfeatured = services["isfeatured"] or False
cmd.ispublic = services["ispublic"] or False
cmd.isextractable = services["isextractable"] or False
if "isfeatured" in services:
cmd.isfeatured = services["isfeatured"]
else:
cmd.isfeatured = False
if "ispublic" in services:
cmd.ispublic = services["ispublic"]
else:
cmd.ispublic = False
if "isextractable" in services:
cmd.isextractable = services["isextractable"]
else:
cmd.isextractable = False
cmd.volumeid = volume.id
return Template(apiclient.createTemplate(cmd).__dict__)
@ -278,18 +277,18 @@ class Iso:
return
class PublicIPAddress():
class PublicIPAddress:
"""Manage Public IP Addresses"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, accountid, zoneid = 1, domainid = 1):
def create(cls, apiclient, accountid, zoneid = None, domainid = None):
cmd = associateIpAddress.associateIpAddressCmd()
cmd.account = accountid
cmd.zoneid = zoneid
cmd.domainid = domainid
cmd.zoneid = zoneid or services["zoneid"]
cmd.domainid = domainid or services["domainid"]
return PublicIPAddress(apiclient.associateIpAddress(cmd).__dict__)
def delete(self, apiclient):
@ -406,7 +405,7 @@ class LoadBalancerRule:
def create(cls, apiclient, services, ipaddressid, accountid=None):
cmd = createLoadBalancerRule.createLoadBalancerRuleCmd()
cmd.publicipid = ipaddressid or services["ipaddressid"]
cmd.account = accountid or services["accountid"]
cmd.account = accountid or services["account"]
cmd.name = services["name"]
cmd.algorithm = services["alg"]
cmd.privateport = services["privateport"]
@ -431,3 +430,101 @@ class LoadBalancerRule:
cmd.virtualmachineids = [vm.id for vm in vms]
self.apiclient.removeFromLoadBalancerRule(cmd)
return
class Cluster:
    """Thin wrapper over the addCluster / deleteCluster CloudStack APIs."""

    def __init__(self, items):
        # Mirror every field of the API response as an attribute.
        self.__dict__.update(items)

    @classmethod
    def create(cls, apiclient, services):
        """Add a cluster described by the *services* dict.

        Required keys: clustertype, hypervisor, zoneid, podid.
        Optional keys (only set when present): username, password,
        url, clustername.
        """
        cmd = addCluster.addClusterCmd()
        cmd.clustertype = services["clustertype"]
        cmd.hypervisor = services["hypervisor"]
        cmd.zoneid = services["zoneid"]
        cmd.podid = services["podid"]
        # Optional parameters — forwarded only when supplied.
        for optional in ("username", "password", "url", "clustername"):
            if optional in services:
                setattr(cmd, optional, services[optional])
        # addCluster returns a list; the new cluster is its first element.
        return cls(apiclient.addCluster(cmd)[0].__dict__)

    def delete(self, apiclient):
        """Delete this cluster."""
        cmd = deleteCluster.deleteClusterCmd()
        cmd.id = self.id
        apiclient.deleteCluster(cmd)
        return
class Host:
    """Thin wrapper over the addHost / deleteHost CloudStack APIs."""

    def __init__(self, items):
        # Mirror every field of the API response as an attribute.
        self.__dict__.update(items)

    @classmethod
    def create(cls, apiclient, cluster, services):
        """Add a host to *cluster* using connection data from *services*.

        Required keys: hypervisor, url, zoneid, podid.
        Optional keys (only set when present): clustertype, username,
        password.
        """
        cmd = addHost.addHostCmd()
        cmd.hypervisor = services["hypervisor"]
        cmd.url = services["url"]
        cmd.zoneid = services["zoneid"]
        cmd.clusterid = cluster.id
        cmd.podid = services["podid"]
        # Optional parameters — forwarded only when supplied.
        for optional in ("clustertype", "username", "password"):
            if optional in services:
                setattr(cmd, optional, services[optional])
        return cls(apiclient.addHost(cmd).__dict__)

    def delete(self, apiclient):
        """Delete this host (requires maintenance mode first)."""
        cmd = prepareHostForMaintenance.prepareHostForMaintenanceCmd()
        cmd.id = self.id
        apiclient.prepareHostForMaintenance(cmd)
        # Give the host time to finish entering maintenance mode.
        time.sleep(60)
        cmd = deleteHost.deleteHostCmd()
        cmd.id = self.id
        apiclient.deleteHost(cmd)
        return
class StoragePool:
    """Thin wrapper over the createStoragePool / deleteStoragePool APIs."""

    def __init__(self, items):
        # Mirror every field of the API response as an attribute.
        self.__dict__.update(items)

    @classmethod
    def create(cls, apiclient, services):
        """Create a primary storage pool from the *services* dict.

        Required keys: name, podid, url, clusterid, zoneid.
        """
        cmd = createStoragePool.createStoragePoolCmd()
        cmd.name = services["name"]
        cmd.podid = services["podid"]
        cmd.url = services["url"]
        cmd.clusterid = services["clusterid"]
        cmd.zoneid = services["zoneid"]
        return cls(apiclient.createStoragePool(cmd).__dict__)

    def delete(self, apiclient):
        """Delete this pool (requires maintenance mode first)."""
        cmd = enableStorageMaintenance.enableStorageMaintenanceCmd()
        cmd.id = self.id
        apiclient.enableStorageMaintenance(cmd)
        # Give the pool time to finish entering maintenance mode.
        time.sleep(60)
        cmd = deleteStoragePool.deleteStoragePoolCmd()
        cmd.id = self.id
        apiclient.deleteStoragePool(cmd)
        return

View File

@ -1,6 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
"""Test Information Services
"""
@ -347,16 +347,16 @@ TEST_NETWORK_SERVICES = {
"email" : "test@test.com",
"firstname" : "Test",
"lastname" : "User",
"username" : "testuser79",
"username" : "testuser1",
"password" : "fr3sca",
"zoneid" : 1,
"networkofferingid" : 6,
},
"server" :
{
"template" : 256,
"template" : 206,
"zoneid" : 1,
"serviceoffering" : 40,
"serviceoffering" : 1,
"diskoffering" : 3,
"displayname" : "testserver",
"username" : "root",
@ -365,10 +365,10 @@ TEST_NETWORK_SERVICES = {
"hypervisor":'XenServer',
"account":'admin',
"domainid":1,
"ipaddressid":3,
"ipaddressid":10,
"privateport":22,
"publicport":22,
"ipaddress":'69.41.185.229',
"ipaddress":'192.168.100.250',
"protocol":'TCP',
},
"natrule" :
@ -381,9 +381,177 @@ TEST_NETWORK_SERVICES = {
{
"name" : "SSH",
"alg" : "roundrobin",
"privateport" : 22,
"publicport" : 22,
"privateport" : 80,
"publicport" : 80,
}
}
# Service data driving the SSVM / CPVM system-VM tests.
TEST_SSVM_SERVICES = {
    # Secondary Storage VM under test and its SSH credentials.
    "ssvm": {
        "id": 1,
        "zoneid": 1,
        "publicport": 22,
        "username": 'root',
        "password": 'fr3sca',
    },
    # Console Proxy VM under test and its SSH credentials.
    "cpvm": {
        "id": 2,
        "zoneid": 1,
        "publicport": 22,
        "username": 'root',
        "password": 'fr3sca',
        # Management-server IP the CPVM should be connected to.
        "mgmtserverIP": '192.168.100.154'
    },
    # SSH credentials for the hypervisor host running the system VMs.
    "host": {
        "username": 'root',
        "password": 'fr3sca',
        "publicport": 22,
    },
}
# Service data driving the hosts-and-clusters tests.
# "clusters" is keyed by an index (iterated in order); "hosts" is keyed
# by lowercase hypervisor type so tests can look up host credentials for
# a given cluster's hypervisor.
TEST_HOSTS_SERVICES = {
    "clusters": {
        0: {
            "clustername": "Xen Cluster",
            "clustertype": "CloudManaged",
            "hypervisor": "XenServer",
            "zoneid": 1,
            "podid": 1,
        },
        1: {
            # TODO
            "clustername": "KVM Cluster",
            "clustertype": "CloudManaged",
            "hypervisor": "KVM",
            "zoneid": 1,
            "podid": 1,
        },
        # Externally managed vCenter cluster — hosts are registered with
        # the cluster itself, so credentials/URL are required here.
        2: {
            "hypervisor": 'VMware',
            "clustertype": 'ExternalManaged',
            "zoneid": 1,
            "podid": 1,
            "username": 'administrator',
            "password": 'fr3sca',
            "url": 'http://192.168.100.17/CloudStack-Clogeny-Pune/Pune-1',
            "clustername": '192.168.100.17/CloudStack-Clogeny-Pune/Pune-1',
        },
    },
    "hosts": {
        "xenserver": {
            "zoneid": 1,
            "podid": 1,
            "clusterid": 16,
            "hypervisor": 'XenServer',
            "clustertype": 'ExternalManaged',
            "url": 'http://192.168.100.210',
            "username": "administrator",
            "password": "fr3sca",
        },
        "kvm": {
            "zoneid": 1,
            "podid": 1,
            "clusterid": 16,
            "hypervisor": 'KVM',
            "clustertype": 'ExternalManaged',
            "url": 'http://192.168.100.203',
            "username": "administrator",
            "password": "fr3sca",
        },
        "vmware": {
            "zoneid": 1,
            "podid": 1,
            "clusterid": 16,
            "hypervisor": 'VMware',
            "clustertype": 'ExternalManaged',
            "url": 'http://192.168.100.203',
            "username": "administrator",
            "password": "fr3sca",
        },
    }
}
# Service data driving the primary-storage tests, grouped by storage
# protocol (NFS vs iSCSI) and keyed by an index per hypervisor cluster.
TEST_PRIMARY_STORAGE_SERVICES = {
    "nfs": {
        0: {
            "url": "nfs://192.168.100.131/Primary",
            # Format: File_System_Type/Location/Path
            "name": "Primary XEN",
            "podid": 1,
            "clusterid": 1,  # XEN Cluster
            "zoneid": 1,
        },
        1: {
            "url": "nfs://192.168.100.131/export",
            "name": "Primary KVM",
            "podid": 2,
            "clusterid": 1,  # KVM Cluster
            "zoneid": 1,
        },
        2: {
            "url": "nfs://192.168.100.131/Primary",
            "name": "Primary VMWare",
            "podid": 1,
            "clusterid": 33,  # VMWare Cluster
            "zoneid": 1,
        },
    },
    "iscsi": {
        0: {
            "url": "iscsi://192.168.100.21/export",
            "name": "Primary XEN",
            "podid": 2,
            "clusterid": 1,  # XEN Cluster
            "zoneid": 1,
            # TODO : lun no., iqn no.
        },
        1: {
            "url": "iscsi://192.168.100.21/export",
            "name": "Primary KVM",
            "podid": 2,
            "clusterid": 1,  # KVM Cluster
            "zoneid": 1,
            # TODO : lun no., iqn no.
        },
    },
}
# Service data driving the secondary-storage tests.
TEST_SEC_STORAGE_SERVICES = {
    "storage": {
        "zoneid": 1,
        "url": "nfs://192.168.100.131/SecondaryStorage"
        # Format: File_System_Type/Location/Path
    }
}
# Service data driving the virtual-router tests: a VM to deploy behind
# the router, the account that owns it, and a settle/poll delay.
TEST_ROUTER_SERVICES = {
    "virtual_machine":
    {
        "template": 206,
        "zoneid": 1,
        "serviceoffering": 1,
        "displayname": "testserver",
        "username": "root",
        "password": "fr3sca",
        "ssh_port": 22,
        "hypervisor": 'XenServer',
        "domainid": 1,
        "ipaddressid": 10,
        "privateport": 22,
        "publicport": 22,
        "ipaddress": '192.168.100.250',
        "protocol": 'TCP',
    },
    "account": {
        "email": "test@test.com",
        "firstname": "Test",
        "lastname": "User",
        "username": "testuser1",
        "password": "fr3sca",
        "zoneid": 1,
        "networkofferingid": 6,
    },
    # Seconds to wait for router operations to complete.
    "sleep_time": 300,
}

View File

@ -1,6 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Disk offerings"""
@ -227,4 +227,3 @@ class TestDiskOfferings(cloudstackTestCase):
return

View File

@ -0,0 +1,118 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Hosts and Clusters
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
from utils import *
from base import *
#Import System modules
import time
# Test data for this module (declared in settings.py).
services = TEST_HOSTS_SERVICES
class TestHosts(cloudstackTestCase):
    """BVT tests for adding clusters and hosts (XenServer, KVM, VMware)."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        # Resources created by the test; destroyed in tearDown.
        self.cleanup = []
        return

    def tearDown(self):
        try:
            self.dbclient.close()
            # Clean up: remove the hosts/clusters created by the test.
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    def test_01_clusters(self):
        """Test Add clusters & hosts - XenServer, KVM, VMware
        """
        # Validate the following:
        # 1. Verify hypervisortype returned by API is XenServer/KVM/VMware
        # 2. Verify that the cluster is in 'Enabled' allocation state
        # 3. Verify that the host is added successfully and in Up state
        #    with listHosts API response

        # Create clusters with hypervisor type XenServer/KVM/VMware
        for k, v in services["clusters"].items():
            cluster = Cluster.create(self.apiclient, v)
            self.assertEqual(
                cluster.hypervisortype,
                v["hypervisor"],
                "Check hypervisor type of created cluster is " + v["hypervisor"] + " or not"
            )
            self.assertEqual(
                cluster.allocationstate,
                'Enabled',
                "Check whether allocation state of cluster is enabled"
            )

            # If the cluster is externally managed, its hosts were already
            # registered together with the cluster.
            cmd = listHosts.listHostsCmd()
            cmd.clusterid = cluster.id
            # BUG FIX: original used the undefined module-level name
            # `apiclient`; the client lives on the test instance.
            response = self.apiclient.listHosts(cmd)
            if not response:
                hypervisor_type = str(cluster.hypervisortype.lower())
                host = Host.create(
                    self.apiclient,
                    cluster,
                    services["hosts"][hypervisor_type]
                )
                # BUG FIX: only schedule the host for cleanup when one was
                # actually created (originally `host` could be undefined).
                # Host is listed before the cluster so it is removed first.
                self.cleanup.append(host)
            self.cleanup.append(cluster)

            # The cluster must now report at least one host, and it must
            # be in the 'Up' state.
            cmd = listHosts.listHostsCmd()
            cmd.clusterid = cluster.id
            list_hosts_response = self.apiclient.listHosts(cmd)
            self.assertNotEqual(
                len(list_hosts_response),
                0,
                "Check list Hosts response"
            )
            host_response = list_hosts_response[0]
            # Check if host is Up and running
            self.assertEqual(
                host_response.state,
                'Up',
                "Check if state of host is Up or not"
            )

            # Verify listClusters returns the newly added cluster.
            cmd = listClusters.listClustersCmd()
            cmd.id = cluster.id
            list_cluster_response = self.apiclient.listClusters(cmd)
            self.assertNotEqual(
                len(list_cluster_response),
                0,
                # BUG FIX: message said "Hosts" for a clusters check.
                "Check list Clusters response"
            )
            cluster_response = list_cluster_response[0]
            self.assertEqual(
                cluster_response.id,
                cluster.id,
                "Check cluster ID with list clusters response"
            )
            self.assertEqual(
                cluster_response.hypervisortype,
                cluster.hypervisortype,
                "Check hypervisor type with list clusters response is " + v["hypervisor"] + " or not"
            )
        return

View File

@ -1,6 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Templates ISO
"""

View File

@ -1,9 +1,9 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Virtual Machine Life Cycle
""" BVT tests for Network Life Cycle
"""
#Import Local Modules
from cloudstackTestCase import *
@ -15,24 +15,20 @@ from base import *
#Import System modules
import time
services = TEST_NETWORK_SERVICES
class TestPublicIP(cloudstackTestCase):
"""Test Associate/Disassociate Public IP Addresses
"""
def setUp(self):
self.apiclient = self.testClient.getApiClient()
def test_public_ip_admin_account(self):
"""Test Associate/Disassociate IP address for Admin Account
"""
"""Test for Associate/Disassociate public IP address for admin account"""
# Validate the following:
# 1. listPubliIpAddresses API returns the list of acquired addresses
# 2. the returned list should contain our acquired IP address
ip_address = PublicIPAddress.create(
self.apiclient,
services["admin_account"],
@ -41,8 +37,10 @@ class TestPublicIP(cloudstackTestCase):
)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = ip_address.ipaddress.id
list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
#listPublicIpAddresses should return newly created public IP
self.assertNotEqual(
len(list_pub_ip_addr_resp),
0,
@ -53,10 +51,12 @@ class TestPublicIP(cloudstackTestCase):
ip_address.ipaddress.id,
"Check Correct IP Address is returned in the List Cacls"
)
ip_address.delete(self.apiclient)
# Validate the following:
#1. listPublicIpAddresses API should no more return the released address
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = ip_address.ipaddress.id
list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
@ -66,16 +66,16 @@ class TestPublicIP(cloudstackTestCase):
None,
"Check if disassociated IP Address is no longer available"
)
return
def test_public_ip_user_account(self):
"""Test Associate/Disassociate IP address for Non-Admin User Account
"""
"""Test for Associate/Disassociate public IP address for user account"""
# Validate the following:
# 1. listPubliIpAddresses API returns the list of acquired addresses
# 2. the returned list should contain our acquired IP address
ip_address = PublicIPAddress.create(
self.apiclient,
services["user_account"],
@ -84,6 +84,8 @@ class TestPublicIP(cloudstackTestCase):
)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = ip_address.ipaddress.id
#listPublicIpAddresses should return newly created public IP
list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
self.assertNotEqual(
@ -96,11 +98,9 @@ class TestPublicIP(cloudstackTestCase):
ip_address.ipaddress.id,
"Check Correct IP Address is returned in the List Call"
)
ip_address.delete(self.apiclient)
# Validate the following:
#1. listPublicIpAddresses API should no more return the released address
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = ip_address.ipaddress.id
list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
@ -110,11 +110,11 @@ class TestPublicIP(cloudstackTestCase):
None,
"Check if disassociated IP Address is no longer available"
)
return
class TestPortForwarding(cloudstackTestCase):
"""Test Port Forwarding Rules for Source and Non-Source IP Addresses
"""
@classmethod
def setUpClass(cls):
@ -125,31 +125,37 @@ class TestPortForwarding(cloudstackTestCase):
cls.api_client,
services["server"],
accountid=cls.account.account.name,
networkids=[str(cls.account.network.id)]
)
cls._cleanup = [cls.virtual_machine, cls.account]
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
return
@classmethod
def tearDownClass(self):
def tearDownClass(cls):
cleanup_resources(cls.api_client, cls._cleanup)
return
def tearDown(self):
cleanup_resources(self.cleanup)
cleanup_resources(self.apiclient, self.cleanup)
return
def test_01_port_fwd_on_src_nat(self):
"""Port Forwarding Tests for Source NAT IP Addresses
"""
src_nat_ip_addr = self.account.public_ip.ipaddress
"""Test for port forwarding on source NAT"""
#Validate the following:
#1. listPortForwarding rules API should return the added PF rule
#2. attempt to do an ssh into the user VM through the sourceNAT
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = services["server"]["domainid"]
src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
#Create NAT rule
nat_rule = NATRule.create(self.apiclient, self.virtual_machine, services["natrule"], src_nat_ip_addr.id)
time.sleep(60)
@ -167,8 +173,7 @@ class TestPortForwarding(cloudstackTestCase):
nat_rule.id,
"Check Correct Port forwarding Rule is returned"
)
#SSH virtual machine to test port forwarding
try:
self.virtual_machine.get_ssh_client(src_nat_ip_addr.ipaddress)
except Exception as e:
@ -177,10 +182,6 @@ class TestPortForwarding(cloudstackTestCase):
nat_rule.delete(self.apiclient)
time.sleep(60)
#Validate the following:
#1. listPortForwardingRules should not return the deleted rule anymore
#2. attempt to do ssh should now fail
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
cmd.id = nat_rule.id
list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)
@ -190,7 +191,7 @@ class TestPortForwarding(cloudstackTestCase):
None,
"Check Port Forwarding Rule is deleted"
)
self.debug("Check if the Public SSH port is inaccessible")
# Check if the Public SSH port is inaccessible
with self.assertRaises(Exception):
remoteSSHClient.remoteSSHClient(
ip.ipaddress,
@ -198,19 +199,29 @@ class TestPortForwarding(cloudstackTestCase):
self.virtual_machine.username,
self.virtual_machine.password
)
return
def test_02_port_fwd_on_non_src_nat(self):
"""Port Forwarding Tests for Non-Source NAT IP Addresses
"""
"""Test for port forwarding on non source NAT"""
ip_address = PublicIPAddress.create(self.apiclient, self.account)
#Validate the following:
#1. listPortForwardingRules should not return the deleted rule anymore
#2. attempt to do ssh should now fail
ip_address = PublicIPAddress.create(self.apiclient, self.account.account.name)
self.clean_up.append(ip_address)
nat_rule = NATRule.create(self.apiclient, self.virtual_machine, services)
#Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
self.virtual_machine,
services["natrule"],
ip_address.ipaddress.id
)
time.sleep(60)
#Validate the following:
#1. listPortForwarding rules API should return the added PF rule
#2. attempt to do an ssh into the user VM through the sourceNAT
#1. listPortForwardingRules should not return the deleted rule anymore
#2. attempt to do ssh should now fail
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
cmd.id = nat_rule.id
@ -227,19 +238,14 @@ class TestPortForwarding(cloudstackTestCase):
"Check Correct Port forwarding Rule is returned"
)
try:
self.virtual_machine.get_ssh_client(public_ip = ip_address.ipaddress)
self.virtual_machine.get_ssh_client(public_ip = ip_address.ipaddress.ipaddress)
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress, e))
self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress.ipaddress, e))
nat_rule.delete(apiclient)
time.sleep(60)
#Validate the following:
#1. listPortForwardingRules should not return the deleted rule anymore
#2. attempt to do ssh should now fail
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
cmd.id = nat_rule.id
list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)
@ -250,20 +256,18 @@ class TestPortForwarding(cloudstackTestCase):
None,
"Check Port Forwarding Rule is deleted"
)
self.debug("Check if the Public SSH port is inaccessible")
# Check if the Public SSH port is inaccessible
with self.assertRaises(Exception):
remoteSSHClient.remoteSSHClient(
ip_address.ipaddress,
ip_address.ipaddress.ipaddress,
self.virtual_machine.ssh_port,
self.virtual_machine.username,
self.virtual_machine.password
)
return
class TestLoadBalancingRule(cloudstackTestCase):
"""Test Load Balancing Rules for Source and Non-Source IP Addresses
"""
@classmethod
def setUpClass(cls):
@ -274,13 +278,11 @@ class TestLoadBalancingRule(cloudstackTestCase):
cls.api_client,
services["server"],
accountid=cls.account.account.name,
networkids=[str(cls.account.network.id)]
)
cls.vm_2 = VirtualMachine.create(
cls.api_client,
services["server"],
accountid=cls.account.account.name,
networkids=[str(cls.account.network.id)]
)
cls.non_src_nat_ip = PublicIPAddress.create(
cls.api_client,
@ -292,51 +294,79 @@ class TestLoadBalancingRule(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
@classmethod
def tearDownClass(cls):
cleanup_resources(cls.api_client, cls._cleanup)
return
def test_01_create_lb_rule_src_nat(self):
"""Test Port Forwarding Rules for Source NAT IP Addresses
"""
src_nat_ip_addr = self.account.public_ip.ipaddress
"""Test to create Load balancing rule with source NAT"""
lb_rule = LoadBalancerRule.create(
self.apiclient,
services["lbrule"],
src_nat_ip_addr.id,
accountid = self.account.account.name
)
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
# Validate the Following:
#1. listLoadBalancerRules should return the added rule
#2. attempt to ssh twice on the load balanced IP
#3. verify using the hostname of the VM that round robin is indeed happening as expected
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = services["server"]["domainid"]
src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
#Create Load Balancer rule and assign VMs to rule
lb_rule = LoadBalancerRule.create(
self.apiclient,
services["lbrule"],
src_nat_ip_addr.id,
accountid = self.account.account.name
)
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
cmd.id = lb_rule.id
lb_rules = self.apiclient.listLoadBalancerRules(cmd)
#verify listLoadBalancerRules lists the added load balancing rule
self.assertNotEqual(
len(lb_rules),
0,
"Check Load Balancer Rule in its List"
)
self.assertEqual(
lb_rules[0].id,
lb_rule.id,
"Check List Load Balancer Rules returns valid Rule"
)
# listLoadBalancerRuleInstances should list all instances associated with that LB rule
cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd()
cmd.id = lb_rule.id
lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd)
self.assertNotEqual(
len(lb_instance_rules),
0,
"Check Load Balancer instances Rule in its List"
)
self.assertEqual(
lb_instance_rules[0].id,
self.vm_2.id,
"Check List Load Balancer instances Rules returns valid VM ID associated with it"
)
self.assertEqual(
lb_instance_rules[1].id,
self.vm_1.id,
"Check List Load Balancer instances Rules returns valid VM ID associated with it"
)
ssh_1 = remoteSSHClient.remoteSSHClient(
src_nat_ip_addr.ipaddress,
@ -360,43 +390,74 @@ class TestLoadBalancingRule(cloudstackTestCase):
self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1")
self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2")
#SSH should pass till there is a last VM associated with LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
hostnames.append(ssh_1.execute("hostname")[0])
self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1")
lb_rule.remove(self.apiclient, [self.vm_1])
with self.assertRaises(Exception):
ssh_1.execute("hostname")[0]
return
def test_02_create_lb_rule_non_nat(self):
"""Test Load Balancing Rules for Non-Source IP Addresses
"""
lb_rule = LoadBalancerRule.create(
self.apiclient,
services["lbrule"],
cls.non_src_nat_ip.ipaddress.id,
accountid = self.account.account.name
)
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
"""Test to create Load balancing rule with source NAT"""
# Validate the Following:
#1. listLoadBalancerRules should return the added rule
#2. attempt to ssh twice on the load balanced IP
#3. verify using the hostname of the VM that round robin is indeed happening as expected
#Create Load Balancer rule and assign VMs to rule
lb_rule = LoadBalancerRule.create(
self.apiclient,
services["lbrule"],
self.non_src_nat_ip.ipaddress.id,
accountid = self.account.account.name
)
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
cmd.id = lb_rule.id
lb_rules = self.apiclient.listLoadBalancerRules(cmd)
#verify listLoadBalancerRules lists the added load balancing rule
self.assertNotEqual(
len(lb_rules),
0,
"Check Load Balancer Rule in its List"
)
self.assertEqual(
lb_rules[0].id,
lb_rule.id,
"Check List Load Balancer Rules returns valid Rule"
)
# listLoadBalancerRuleInstances should list all instances associated with that LB rule
cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd()
cmd.id = lb_rule.id
lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd)
self.assertNotEqual(
len(lb_instance_rules),
0,
"Check Load Balancer instances Rule in its List"
)
self.assertEqual(
lb_instance_rules[0].id,
self.vm_2.id,
"Check List Load Balancer instances Rules returns valid VM ID associated with it"
)
self.assertEqual(
lb_instance_rules[1].id,
self.vm_1.id,
"Check List Load Balancer instances Rules returns valid VM ID associated with it"
)
ssh_1 = remoteSSHClient.remoteSSHClient(
cls.non_src_nat_ip.ipaddress.ipaddress,
self.non_src_nat_ip.ipaddress.ipaddress,
services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
@ -406,7 +467,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
hostnames = [ssh_1.execute("hostname")[0]]
time.sleep(20)
ssh_2 = remoteSSHClient.remoteSSHClient(
cls.non_src_nat_ip.ipaddress.ipaddress,
self.non_src_nat_ip.ipaddress.ipaddress,
services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
@ -417,11 +478,18 @@ class TestLoadBalancingRule(cloudstackTestCase):
self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1")
self.assertIn(self.vm_2.name, hostnames, "Check if ssh succeeded for server2")
#SSH should pass till there is a last VM associated with LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
hostnames.append(ssh_1.execute("hostname")[0])
self.assertIn(self.vm_1.name, hostnames, "Check if ssh succeeded for server1")
lb_rule.remove(self.apiclient, [self.vm_1])
with self.assertRaises(Exception):
ssh_1.execute("hostname")[0]
return
class TestRebootRouter(cloudstackTestCase):
"""Test Load Balancing Rules work post Router Reboot
"""
def setUp(self):
self.apiclient = self.testClient.getApiClient()
@ -431,9 +499,9 @@ class TestRebootRouter(cloudstackTestCase):
self.apiclient,
services["server"],
accountid=self.account.account.name,
networkids=[str(self.account.network.id)]
)
src_nat_ip_addr = self.account.public_ip.ipaddress
lb_rule = LoadBalancerRule.create(
self.apiclient,
services["lbrule"],
@ -442,14 +510,17 @@ class TestRebootRouter(cloudstackTestCase):
)
lb_rule.assign(self.apiclient, [self.vm_1])
#nat_rule = NATRule.create(self.apiclient, self.vm_1, services["natrule"], src_nat_ip_addr.id)
self.cleanup = [self.vm_1, lb_rule, self.account]
self.cleanup = [self.vm_1, lb_rule, account]
return
def test_reboot_router(self):
"""Test for reboot router"""
#Validate the Following
#1. Post restart PF and LB rules should still function
#2. verify if the ssh into the virtual machine still works through the sourceNAT Ip
#Retrieve router for the user account
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
@ -459,9 +530,15 @@ class TestRebootRouter(cloudstackTestCase):
cmd = rebootRouter.rebootRouterCmd()
cmd.id = router.id
self.apiclient.rebootRouter(cmd)
#Sleep to ensure router is rebooted properly
time.sleep(60)
src_nat_ip_addr = self.account.public_ip.ipaddress
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = services["server"]["domainid"]
src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
#we should be able to SSH after successful reboot
try:
remoteSSHClient.remoteSSHClient(
src_nat_ip_addr.ipaddress,
@ -471,57 +548,62 @@ class TestRebootRouter(cloudstackTestCase):
)
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.vm_1.ipaddress, e))
return
def tearDown(self):
cleanup_resources(self.cleanup)
cleanup_resources(self.apiclient, self.cleanup)
return
class TestAssignRemoveLB(cloudstackTestCase):
"""Assign Load Balancer Rule to two Virtual Machines, Remove One VM
and associate another VM.
"""
def setUp(self):
self.apiclient = self.testClient.getApiClient()
#Create VMs, accounts
self.account = Account.create(self.apiclient, services["account"], admin=True)
self.vm_1 = VirtualMachine.create(
self.apiclient,
services["server"],
accountid=self.account.account.name,
networkids=[str(self.account.network.id)]
)
self.apiclient,
services["server"],
accountid=self.account.account.name,
)
self.vm_2 = VirtualMachine.create(
self.apiclient,
services["server"],
accountid=self.account.account.name,
networkids=[str(self.account.network.id)]
)
self.apiclient,
services["server"],
accountid=self.account.account.name,
)
self.vm_3 = VirtualMachine.create(
self.apiclient,
services["server"],
accountid=self.account.account.name,
networkids=[str(self.account.network.id)]
)
self.non_src_nat_ip = self.account.public_ip.ipaddress
self.apiclient,
services["server"],
accountid=self.account.account.name,
)
self.cleanup = [self.vm_1, self.vm_2, self.vm_3]
return
def test_assign_and_removal_elb(self):
"""Test for assign & removing load balancing rule"""
#Validate:
#1. Verify list API - listLoadBalancerRules lists all the rules with the relevant ports
#2. listLoadBalancerInstances will list the instances associated with the corresponding rule.
#3. verify ssh attempts should pass as long as there is at least one instance associated with the rule
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = services["server"]["domainid"]
self.non_src_nat_ip = self.apiclient.listPublicIpAddresses(cmd)[0]
lb_rule = LoadBalancerRule.create(
self.apiclient,
services["lbrule"],
self.non_src_nat_ip.id,
self.account.account.name
)
self.apiclient,
services["lbrule"],
self.non_src_nat_ip.id,
self.account.account.name
)
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
#Create SSH client for each VM
ssh_1 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress,
services["natrule"]["publicport"],
@ -549,6 +631,7 @@ class TestAssignRemoveLB(cloudstackTestCase):
self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1")
self.assertIn(self.vm_2.name, res_2, "Check if ssh succeeded for server2")
#Removing VM and assigning another VM to LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
res_1 = ssh_1.execute("hostname")[0]
@ -562,6 +645,161 @@ class TestAssignRemoveLB(cloudstackTestCase):
self.assertIn(self.vm_1.name, res_1, "Check if ssh succeeded for server1")
self.assertIn(self.vm_3.name, res_3, "Check if ssh succeeded for server3")
return
def teardown(self):
cleanup_resources(self.cleanup)
cleanup_resources(self.apiclient, self.cleanup)
return
class TestReleaseIP(cloudstackTestCase):
    """Verify that disassociating a public IP also removes its PF/LB rules."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()
        # Create an account, a VM, an extra public IP, and PF + LB rules on it
        self.account = Account.create(self.apiclient, services["account"], admin=True)
        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            services["server"],
            accountid=self.account.account.name,
        )
        # Acquire an additional public IP address for the account
        self.ip_address = PublicIPAddress.create(
            self.apiclient,
            self.account.account.name,
            services["zoneid"],
            self.account.account.domainid
        )
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        # NOTE(review): assumes the first listed IP is the one acquired above;
        # confirm list ordering against the API before relying on it.
        self.ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
        self.nat_rule = NATRule.create(self.apiclient, self.virtual_machine, services["natrule"], self.ip_addr.id)
        self.lb_rule = LoadBalancerRule.create(self.apiclient, services["lbrule"], self.ip_addr.id, accountid=self.account.account.name)
        self.cleanup = [self.virtual_machine, self.account]
        return

    def tearDown(self):
        # Bug fix: this hook was named 'teardown'; unittest only invokes
        # 'tearDown' (capital D), so the VM and account were never cleaned up.
        cleanup_resources(self.apiclient, self.cleanup)

    def test_releaseIP(self):
        """Test for Associate/Disassociate public IP address"""
        self.ip_address.delete(self.apiclient)
        # A disassociated public IP must no longer be listed
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.id = self.ip_addr.id
        list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
        self.assertEqual(
            list_pub_ip_addr_resp,
            None,
            "Check if disassociated IP Address is no longer available"
        )
        # Port forwarding rules on the released IP must be gone as well
        cmd = listPortForwardingRules.listPortForwardingRulesCmd()
        cmd.id = self.nat_rule.id
        list_nat_rules = self.apiclient.listPortForwardingRules(cmd)
        self.assertEqual(
            list_nat_rules,
            None,
            "Check if Port forwarding rules for disassociated IP Address are no longer available"
        )
        # Load balancer rules on the released IP must be gone as well
        cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
        cmd.id = self.lb_rule.id
        list_lb_rules = self.apiclient.listLoadBalancerRules(cmd)
        self.assertEqual(
            list_lb_rules,
            None,
            "Check if LB rules for disassociated IP Address are no longer available"
        )
        # SSH through the released public IP should now fail
        with self.assertRaises(Exception):
            ssh_2 = remoteSSHClient.remoteSSHClient(
                self.ip_addr.ipaddress,
                services["natrule"]["publicport"],
                self.virtual_machine.username,
                self.virtual_machine.password
            )
        return
class TestDeleteAccount(cloudstackTestCase):
    """Deleting an account must also remove its PF/LB rules and its router."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()
        # One admin account with a VM, plus LB and PF rules on its source-NAT IP
        self.account = Account.create(self.apiclient, services["account"], admin=True)
        self.vm_1 = VirtualMachine.create(
            self.apiclient,
            services["server"],
            accountid=self.account.account.name,
        )
        cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        source_nat_ip = self.apiclient.listPublicIpAddresses(cmd)[0]
        self.lb_rule = LoadBalancerRule.create(
            self.apiclient,
            services["lbrule"],
            source_nat_ip.id,
            self.account.account.name
        )
        self.lb_rule.assign(self.apiclient, [self.vm_1])
        self.nat_rule = NATRule.create(self.apiclient, self.vm_1, services["natrule"], source_nat_ip.id)
        # The account is deleted by the test itself, so nothing to clean up here
        self.cleanup = []
        return

    def test_delete_account(self):
        """Test for delete account"""
        # Validate the following
        # 1. after account.cleanup.interval (global setting) time all the PF/LB rules should be deleted
        # 2. verify that list(LoadBalancer/PortForwarding)Rules API does not return any rules for the account
        # 3. The domR should have been expunged for this account
        self.account.delete(self.apiclient)
        # NOTE(review): assumes account.cleanup.interval <= 120s in the test
        # environment -- confirm the global setting before trusting this wait.
        time.sleep(120)

        # Listing rules for a deleted account raises, since the account
        # itself can no longer be resolved (e.g. "Unable to find account ...").
        cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        with self.assertRaises(Exception):
            self.apiclient.listLoadBalancerRules(cmd)

        cmd = listPortForwardingRules.listPortForwardingRulesCmd()
        cmd.account = self.account.account.name
        cmd.domainid = services["server"]["domainid"]
        with self.assertRaises(Exception):
            self.apiclient.listPortForwardingRules(cmd)

        # The account's router should have been expunged as well
        cmd = listRouters.listRoutersCmd()
        cmd.account = self.account.account.name
        cmd.domainid = self.account.account.domainid
        with self.assertRaises(Exception):
            routers = self.apiclient.listRouters(cmd)
        return

    def tearDown(self):
        cleanup_resources(self.apiclient, self.cleanup)
        return

View File

@ -0,0 +1,138 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Primary Storage
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
import remoteSSHClient
from utils import *
from base import *
#Import System modules
import time
services = TEST_PRIMARY_STORAGE_SERVICES
class TestPrimaryStorageServices(cloudstackTestCase):
    """BVT: create and verify NFS and iSCSI primary storage pools."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()
        self.cleanup = []
        return

    def tearDown(self):
        try:
            # Clean up, terminate the created storage pools
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    def _verify_pool_listed(self, storage, hypervisor):
        """Assert listStoragePools returns the newly added pool with a
        matching id and type. Shared by the NFS and iSCSI loops below."""
        cmd = listStoragePools.listStoragePoolsCmd()
        cmd.id = storage.id
        storage_pools_response = self.apiclient.listStoragePools(cmd)
        self.assertNotEqual(
            len(storage_pools_response),
            0,
            "Check list Hosts response for hypervisor type : " + hypervisor
        )
        storage_response = storage_pools_response[0]
        self.assertEqual(
            storage_response.id,
            storage.id,
            "Check storage pool ID with list storage pools response for hypervisor type : " + hypervisor
        )
        self.assertEqual(
            storage.type,
            storage_response.type,
            "Check type of the storage pool for hypervisor type : " + hypervisor
        )

    def test_01_primary_storage(self):
        """Test primary storage pools - XEN, KVM, VMWare
        """
        # Validate the following:
        # 1. verify hypervisortype returned by api is Xen/KVM/VMWare
        # 2. verify the pool comes up in the 'Up' state
        # 3. verify listStoragePools returns the newly added pool

        # Create NFS storage pools on XEN/KVM/VMWare clusters.
        # The dict keys are only labels; iterate the config values directly.
        for v in services["nfs"].values():
            # Host should be present before adding primary storage
            cmd = listHosts.listHostsCmd()
            cmd.clusterid = v["clusterid"]
            list_hosts_response = self.apiclient.listHosts(cmd)
            self.assertNotEqual(
                len(list_hosts_response),
                0,
                "Check list Hosts response for hypervisor type : " + v["hypervisor"]
            )
            storage = StoragePool.create(self.apiclient, v)
            self.cleanup.append(storage)
            self.assertEqual(
                storage.state,
                'Up',
                "Check state of primary storage is Up or not for hypervisor type : " + v["hypervisor"]
            )
            self.assertEqual(
                storage.type,
                'NetworkFilesystem',
                "Check type of the storage pool created for hypervisor type : " + v["hypervisor"]
            )
            self._verify_pool_listed(storage, v["hypervisor"])

        # Call cleanup for reusing primary storage
        cleanup_resources(self.apiclient, self.cleanup)
        self.cleanup = []

        # Create iSCSI storage pools on XEN/KVM clusters
        for v in services["iscsi"].values():
            storage = StoragePool.create(self.apiclient, v)
            self.cleanup.append(storage)
            self.assertEqual(
                storage.state,
                'Up',
                "Check state of primary storage is Up or not for hypervisor type : " + v["hypervisor"]
            )
            self._verify_pool_listed(storage, v["hypervisor"])

        # Call cleanup for reusing primary storage
        cleanup_resources(self.apiclient, self.cleanup)
        self.cleanup = []
        return

View File

@ -0,0 +1,537 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for routers
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
import remoteSSHClient
from utils import *
from base import *
#Import System modules
import time
services = TEST_ROUTER_SERVICES
class TestRouterServices(cloudstackTestCase):
    """BVT tests covering the virtual router lifecycle and its services."""

    @classmethod
    def setUpClass(cls):
        # Shared fixtures: one account plus one VM, so a virtual router
        # gets deployed for the account's network.
        cls.api_client = fetch_api_client()
        #Create an account, network, VM and IP addresses
        cls.account = Account.create(cls.api_client, services["account"])
        cls.vm_1 = VirtualMachine.create(
            cls.api_client,
            services["virtual_machine"],
            accountid=cls.account.account.name,
        )
        cls.cleanup = [cls.vm_1, cls.account]
        return

    @classmethod
    def tearDownClass(cls):
        try:
            cls.api_client = fetch_api_client()
            #Clean up, terminate the created templates
            cleanup_resources(cls.api_client, cls.cleanup)
        except Exception as e:
            # Surface cleanup failures rather than hiding them
            raise Exception("Warning: Exception during cleanup : %s" %e)
        return

    def setUp(self):
        # Fresh API client from the framework for every test
        self.apiclient = self.testClient.getApiClient()
        return
def test_01_router_basic(self):
    """Test router basic setup
    """
    # Validate the following:
    # 1. listRouters returns router(s) in the 'Running' state
    # 2. each router's DNS entries match those of its zone (listZones)
    # 3. each router carries guest IP and link-local IP fields
    cmd = listRouters.listRoutersCmd()
    routers = self.apiclient.listRouters(cmd)
    self.assertNotEqual(
        len(routers),
        0,
        "Check list router response"
    )
    for router in routers:
        self.assertEqual(
            router.state,
            'Running',
            "Check list router response for router state"
        )
        # The router's DNS must agree with the zone it lives in
        cmd = listZones.listZonesCmd()
        cmd.zoneid = router.zoneid
        zone = self.apiclient.listZones(cmd)[0]
        self.assertEqual(
            router.dns1,
            zone.dns1,
            "Compare DNS1 of router and zone"
        )
        self.assertEqual(
            router.dns2,
            zone.dns2,
            "Compare DNS2 of router and zone"
        )
        self.assertEqual(
            hasattr(router, 'guestipaddress'),
            True,
            "Check whether router has guest IP field"
        )
        self.assertEqual(
            hasattr(router, 'linklocalip'),
            True,
            "Check whether router has link local IP field"
        )
    return
def test_02_router_advanced(self):
    """Test router advanced setup
    """
    # Validate the following
    # 1. verify that listRouters returned a 'Running' router
    # 2. router will have dns and gateway as in listZones, listVlanIpRanges
    # 3. router will have guest,public and linklocal IPs
    cmd = listRouters.listRoutersCmd()
    cmd.account = self.account.account.name
    cmd.domainid = services["virtual_machine"]["domainid"]
    list_router_response = self.apiclient.listRouters(cmd)
    self.assertNotEqual(
        len(list_router_response),
        0,
        "Check list router response"
    )
    for i in range(len(list_router_response)):
        self.assertEqual(
            list_router_response[i].state,
            'Running',
            "Check list router response for router state"
        )
        cmd = listZones.listZonesCmd()
        cmd.zoneid = list_router_response[i].zoneid
        zone = self.apiclient.listZones(cmd)[0]
        self.assertEqual(
            list_router_response[i].dns1,
            zone.dns1,
            "Compare DNS1 of router and zone"
        )
        self.assertEqual(
            list_router_response[i].dns2,
            zone.dns2,
            "Compare DNS2 of router and zone"
        )
        self.assertEqual(
            hasattr(list_router_response[i], 'guestipaddress'),
            True,
            "Check whether router has guest IP field"
        )
        self.assertEqual(
            hasattr(list_router_response[i], 'linklocalip'),
            True,
            "Check whether router has link local IP field"
        )
        # Fetch corresponding IP range information from listVlanIpRanges.
        # Bug fix: the original referenced the undefined name
        # 'list_ssvm_response' (copy-paste from the SSVM test, a NameError
        # at runtime) and assigned the zone id to cmd.id; filter the VLAN
        # IP ranges by the router's zoneid instead.
        cmd = listVlanIpRanges.listVlanIpRangesCmd()
        cmd.zoneid = list_router_response[i].zoneid
        ipranges_response = self.apiclient.listVlanIpRanges(cmd)[0]
        self.assertEqual(
            list_router_response[i].gateway,
            ipranges_response.gateway,
            "Check gateway with that of corresponding IP range"
        )
    return
def test_03_stop_router(self):
    """Test stop router
    """
    # Validate the following
    # 1. listRouter should report the router for the account as stopped
    # Locate the account's router
    list_cmd = listRouters.listRoutersCmd()
    list_cmd.account = self.account.account.name
    list_cmd.domainid = services["virtual_machine"]["domainid"]
    router = self.apiclient.listRouters(list_cmd)[0]

    # Issue the stop
    stop_cmd = stopRouter.stopRouterCmd()
    stop_cmd.id = router.id
    self.apiclient.stopRouter(stop_cmd)

    # Re-list by id and confirm the state transition
    list_cmd = listRouters.listRoutersCmd()
    list_cmd.id = router.id
    router_response = self.apiclient.listRouters(list_cmd)[0]
    self.assertEqual(
        router_response.state,
        'Stopped',
        "Check list router response for router state"
    )
    return
def test_04_start_router(self):
    """Test start router
    """
    # Validate the following
    # 1. listRouter should report the router for the account as stopped
    # Locate the account's router
    list_cmd = listRouters.listRoutersCmd()
    list_cmd.account = self.account.account.name
    list_cmd.domainid = services["virtual_machine"]["domainid"]
    router = self.apiclient.listRouters(list_cmd)[0]

    # Issue the start
    start_cmd = startRouter.startRouterCmd()
    start_cmd.id = router.id
    self.apiclient.startRouter(start_cmd)

    # Re-list by id and confirm the router is running again
    list_cmd = listRouters.listRoutersCmd()
    list_cmd.id = router.id
    router_response = self.apiclient.listRouters(list_cmd)[0]
    self.assertEqual(
        router_response.state,
        'Running',
        "Check list router response for router state"
    )
    return
def test_05_reboot_router(self):
    """Test reboot router
    """
    # Validate the following
    # 1. after reboot the router is 'Running' with the same public IP
    list_cmd = listRouters.listRoutersCmd()
    list_cmd.account = self.account.account.name
    list_cmd.domainid = services["virtual_machine"]["domainid"]
    router = self.apiclient.listRouters(list_cmd)[0]
    # Remember the public IP; it must survive the reboot
    public_ip = router.publicip

    reboot_cmd = rebootRouter.rebootRouterCmd()
    reboot_cmd.id = router.id
    self.apiclient.rebootRouter(reboot_cmd)

    # Re-list by id to pick up the post-reboot state
    list_cmd = listRouters.listRoutersCmd()
    list_cmd.id = router.id
    router_response = self.apiclient.listRouters(list_cmd)[0]
    self.assertEqual(
        router_response.state,
        'Running',
        "Check list router response for router state"
    )
    self.assertEqual(
        router_response.publicip,
        public_ip,
        "Check list router response for router public IP"
    )
    return
def test_05_network_gc(self):
    """Test network GC
    """
    # Validate the following
    # 1. stop All User VMs in the account
    # 2. wait for network.gc.interval time"
    # 3. After network.gc.interval, router should be stopped
    # 4. ListRouters should return the router in Stopped state
    cmd = listVirtualMachines.listVirtualMachinesCmd()
    cmd.account = self.account.account.name
    cmd.domainid = services["virtual_machine"]["domainid"]
    vms = self.apiclient.listVirtualMachines(cmd)
    self.assertNotEqual(
        len(vms),
        0,
        "Check length of list VM response"
    )
    # Stop every VM owned by the account
    for vm in vms:
        stop_cmd = stopVirtualMachine.stopVirtualMachineCmd()
        stop_cmd.id = vm.id
        self.apiclient.stopVirtualMachine(stop_cmd)

    # Wait out the configured network GC interval
    cmd = listConfigurations.listConfigurationsCmd()
    cmd.name = 'network.gc.interval'
    response = self.apiclient.listConfigurations(cmd)[0]
    time.sleep(int(response.value))

    # The idle router should now have been garbage-collected to Stopped
    cmd = listRouters.listRoutersCmd()
    cmd.account = self.account.account.name
    cmd.domainid = services["virtual_machine"]["domainid"]
    router = self.apiclient.listRouters(cmd)[0]
    self.assertEqual(
        router.state,
        'Stopped',
        "Check state of the router after stopping all VMs associated with that account"
    )
    return
def test_06_router_internal_basic(self):
    """Test router internal basic zone
    """
    # Validate the following
    # 1. Router only does dhcp
    # 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP by checking status of dnsmasq process
    # Find the account's router and a running host in its zone
    cmd = listRouters.listRoutersCmd()
    cmd.account = self.account.account.name
    cmd.domainid = services["virtual_machine"]["domainid"]
    router = self.apiclient.listRouters(cmd)[0]

    cmd = listHosts.listHostsCmd()
    cmd.zoneid = router.zoneid
    cmd.type = 'Routing'
    cmd.state = 'Up'
    host = self.apiclient.listHosts(cmd)[0]

    # SSH into the host, then hop from there into the router
    ssh = remoteSSHClient.remoteSSHClient(
        host.ipaddress,
        services['virtual_machine']["publicport"],
        services['virtual_machine']["username"],
        services['virtual_machine']["password"]
    )
    ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % router.linklocalip
    c = ssh_command + "service dnsmasq status"
    # Retry the double hop while host-key verification rejects us
    attempts_left = 5
    while True:
        res = ssh.execute(c)[0]
        if res != "Host key verification failed.":
            break
        elif attempts_left == 0:
            break
        time.sleep(5)
        attempts_left = attempts_left - 1
    self.assertEqual(
        res.count("is running"),
        1,
        "Check dnsmasq service is running or not"
    )
    return
def test_07_router_internal_adv(self):
    """Test router internal advanced zone
    """
    # Validate the following
    # 1. Router does dhcp, dns, gateway, LB, PF, FW
    # 2. verify that dhcp, dns ports are open on UDP
    # 3. dnsmasq, haproxy processes should be running
    # Find router associated with user account
    cmd = listRouters.listRoutersCmd()
    cmd.account = self.account.account.name
    cmd.domainid = services["virtual_machine"]["domainid"]
    router = self.apiclient.listRouters(cmd)[0]

    cmd = listHosts.listHostsCmd()
    cmd.zoneid = router.zoneid
    cmd.type = 'Routing'
    cmd.state = 'Up'
    host = self.apiclient.listHosts(cmd)[0]

    # SSH to the host, then double-hop into the router via its link-local IP
    ssh = remoteSSHClient.remoteSSHClient(
        host.ipaddress,
        services['virtual_machine']["publicport"],
        services['virtual_machine']["username"],
        services['virtual_machine']["password"]
    )
    ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % router.linklocalip

    def check_service(name):
        # Run 'service <name> status' on the router, retrying while the
        # double hop is rejected on host-key grounds (dedup of the original
        # copy-pasted dnsmasq/haproxy loops).
        c = ssh_command + "service %s status" % name
        timeout = 5
        while True:
            res = ssh.execute(c)[0]
            if res != "Host key verification failed.":
                break
            elif timeout == 0:
                break
            time.sleep(5)
            timeout = timeout - 1
        self.assertEqual(
            res.count("running"),
            1,
            "Check %s service is running or not" % name
        )

    # Both dnsmasq (DHCP/DNS) and haproxy (LB) must be up on the router
    check_service("dnsmasq")
    check_service("haproxy")
    return
def test_08_restart_network_cleanup(self):
    """Test restart network
    """
    # Validate the following
    # 1. When cleanup = true, router is destroyed and a new one created
    # 2. New router will have new publicIp and linkLocalIp and all it's services should resume
    # Find router associated with user account
    router_cmd = listRouters.listRoutersCmd()
    router_cmd.account = self.account.account.name
    router_cmd.domainid = services["virtual_machine"]["domainid"]
    router = self.apiclient.listRouters(router_cmd)[0]
    # Remember the link-local IP so we can prove the router was replaced
    old_linklocalip = router.linklocalip

    net_cmd = listNetworks.listNetworksCmd()
    net_cmd.account = self.account.account.name
    net_cmd.domainid = services["virtual_machine"]["domainid"]
    network = self.apiclient.listNetworks(net_cmd)[0]

    restart_cmd = restartNetwork.restartNetworkCmd()
    restart_cmd.id = network.id
    restart_cmd.cleanup = True
    self.apiclient.restartNetwork(restart_cmd)

    # The replacement router must carry a different link-local IP
    router_cmd = listRouters.listRoutersCmd()
    router_cmd.account = self.account.account.name
    router_cmd.domainid = services["virtual_machine"]["domainid"]
    router = self.apiclient.listRouters(router_cmd)[0]
    self.assertNotEqual(
        router.linklocalip,
        old_linklocalip,
        "Check linklocal IP after restart"
    )
    return
def test_08_restart_network_wo_cleanup(self):
    """Test restart network without cleanup
    """
    # Validate the following
    # 1. When cleanup = false, router is restarted and all services inside the router are restarted
    # 2. check 'uptime' to see if the actual restart happened
    cmd = listNetworks.listNetworksCmd()
    cmd.account = self.account.account.name
    cmd.domainid = services["virtual_machine"]["domainid"]
    network = self.apiclient.listNetworks(cmd)[0]
    cmd = restartNetwork.restartNetworkCmd()
    cmd.id = network.id
    cmd.cleanup = False
    self.apiclient.restartNetwork(cmd)
    # Get router details after restart
    cmd = listRouters.listRoutersCmd()
    cmd.account = self.account.account.name
    cmd.domainid = services["virtual_machine"]["domainid"]
    router = self.apiclient.listRouters(cmd)[0]
    # A running host in the router's zone serves as the SSH jump box
    cmd = listHosts.listHostsCmd()
    cmd.zoneid = router.zoneid
    cmd.type = 'Routing'
    cmd.state = 'Up'
    host = self.apiclient.listHosts(cmd)[0]
    #SSH to the machine
    ssh = remoteSSHClient.remoteSSHClient(
        host.ipaddress,
        services['virtual_machine']["publicport"],
        services['virtual_machine']["username"],
        services['virtual_machine']["password"]
    )
    ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s uptime" % router.linklocalip
    # Double hop into router to check router uptime
    timeout = 5
    while True:
        res = ssh.execute(ssh_command)[0]
        # Retry while the inner hop is rejected on host-key grounds
        if res != "Host key verification failed.":
            break
        elif timeout == 0:
            break
        time.sleep(5)
        timeout = timeout - 1
    # res = 12:37:14 up 1 min, 0 users, load average: 0.61, 0.22, 0.08
    # Split result to check the uptime
    result = res.split()
    self.assertEqual(
        str(result[1]),
        'up',
        "Check router is running or not"
    )
    # A small uptime (under 3 minutes, or still counted in seconds) is the
    # evidence that the router actually restarted rather than kept running.
    if str(result[3]) == "min,":
        self.assertEqual(
            (int(result[2]) < 3),
            True,
            "Check uptime is less than 3 mins or not"
        )
    else:
        self.assertEqual(
            str(result[3]),
            'sec,',
            "Check uptime is in seconds"
        )
    return

View File

@ -0,0 +1,151 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Secondary Storage
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
import remoteSSHClient
from utils import *
from base import *
#Import System modules
import time
services = TEST_SEC_STORAGE_SERVICES
class TestSecStorageServices(cloudstackTestCase):
    """BVT: add secondary storage and verify system hosts/pools come up."""

    def setUp(self):
        self.apiclient = self.testClient.getApiClient()
        self.cleanup = []
        return

    def tearDown(self):
        try:
            #Clean up, terminate the created templates
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    def test_01_add_sec_storage(self):
        """Test secondary storage
        """
        # Validate the following:
        # 1. secondary storage should be added to the zone.
        # 2. Verify with listHosts and type secondarystorage
        cmd = addSecondaryStorage.addSecondaryStorageCmd()
        cmd.zoneid = services["storage"]["zoneid"]
        cmd.url = services["storage"]["url"]
        sec_storage = self.apiclient.addSecondaryStorage(cmd)
        self.assertEqual(
            sec_storage.zoneid,
            services["storage"]["zoneid"],
            "Check zoneid where sec storage is added"
        )
        # The new secondary storage must show up as a SecondaryStorage host
        cmd = listHosts.listHostsCmd()
        cmd.type = 'SecondaryStorage'
        cmd.id = sec_storage.id
        list_hosts_response = self.apiclient.listHosts(cmd)
        self.assertNotEqual(
            len(list_hosts_response),
            0,
            "Check list Hosts response"
        )
        host_response = list_hosts_response[0]
        self.assertEqual(
            host_response.id,
            sec_storage.id,
            "Check ID of secondary storage"
        )
        self.assertEqual(
            sec_storage.type,
            host_response.type,
            "Check type of host from list hosts response"
        )
        return

    def test_02_sys_vm_start(self):
        """Test system VM start
        """
        # 1. verify listHosts has all 'routing' hosts in UP state
        # 2. verify listStoragePools shows all primary storage pools in UP state
        # 3. verify that secondary storage was added successfully
        cmd = listHosts.listHostsCmd()
        cmd.type = 'Routing'
        list_hosts_response = self.apiclient.listHosts(cmd)
        self.assertNotEqual(
            len(list_hosts_response),
            0,
            "Check list host response"
        )
        for host in list_hosts_response:
            self.assertEqual(
                host.state,
                'Up',
                "Check state of routing hosts is Up or not"
            )
        # ListStoragePools shows all primary storage pools in UP state.
        # NOTE(review): filtering by name 'Primary' assumes the pools are
        # actually named that way -- confirm against the test environment.
        cmd = listStoragePools.listStoragePoolsCmd()
        cmd.name = 'Primary'
        list_storage_response = self.apiclient.listStoragePools(cmd)
        self.assertNotEqual(
            len(list_storage_response),
            0,
            "Check list storage pools response"
        )
        # Bug fix: the original loop ran over len(list_hosts_response) while
        # indexing list_storage_response, risking IndexError (or silently
        # skipping pools) whenever the two counts differ.
        for storage_response in list_storage_response:
            self.assertEqual(
                storage_response.state,
                'Up',
                "Check state of primary storage pools is Up or not"
            )
        # Secondary storage is added successfully
        cmd = listHosts.listHostsCmd()
        cmd.type = 'SecondaryStorage'
        list_hosts_response = self.apiclient.listHosts(cmd)
        self.assertNotEqual(
            len(list_hosts_response),
            0,
            "Check list Hosts response"
        )
        host_response = list_hosts_response[0]
        #Check if host is Up and running
        self.assertEqual(
            host_response.state,
            'Up',
            "Check state of secondary storage"
        )
        return

    def test_03_sys_template_ready(self):
        """Test system templates are ready
        """
        # TODO: verify the SSVM has downloaded/installed the system templates
        return

View File

@ -1,6 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Service offerings"""

View File

@ -1,6 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Snapshots
"""
@ -31,10 +31,8 @@ class TestSnapshots(cloudstackTestCase):
cls.virtual_machine.delete(cls.api_client)
cls.virtual_machine_without_disk.delete(cls.api_client)
cls.nat_rule.delete(cls.api_client)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" %e)
return
def setUp(self):
@ -47,7 +45,6 @@ class TestSnapshots(cloudstackTestCase):
try:
#Clean up, terminate the created instance, volumes and snapshots
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" %e)
return
@ -58,7 +55,6 @@ class TestSnapshots(cloudstackTestCase):
cmd = listVolumes.listVolumesCmd()
cmd.virtualmachineid = self.virtual_machine_with_disk.id
cmd.type = 'ROOT'
volumes = self.apiclient.listVolumes(cmd)
snapshot = Snapshot.create(self.apiclient, volumes[0].id)
@ -67,7 +63,6 @@ class TestSnapshots(cloudstackTestCase):
list_snapshots = self.apiclient.listSnapshots(cmd)
self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call")
self.assertEqual(
list_snapshots[0].id,
snapshot.id,
@ -90,7 +85,6 @@ class TestSnapshots(cloudstackTestCase):
)
return
def test_02_snapshot_data_disk(self):
"""Test Snapshot Data Disk
"""
@ -98,7 +92,6 @@ class TestSnapshots(cloudstackTestCase):
cmd = listVolumes.listVolumesCmd()
cmd.virtualmachineid = self.virtual_machine_with_disk.id
cmd.type = 'DATADISK'
volume = self.apiclient.listVolumes(cmd)
snapshot = Snapshot.create(self.apiclient, volume[0].id)
@ -107,13 +100,11 @@ class TestSnapshots(cloudstackTestCase):
list_snapshots = self.apiclient.listSnapshots(cmd)
self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call")
self.assertEqual(
list_snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.debug("select backup_snap_id from snapshots where id = %s;" % snapshot.id)
qresultset = self.dbclient.execute("select backup_snap_id from snapshots where id = %s;" % snapshot.id)
self.assertNotEqual(
@ -134,7 +125,6 @@ class TestSnapshots(cloudstackTestCase):
def test_03_volume_from_snapshot(self):
"""Create volumes from snapshots
"""
#1. Login to machine; create temp/test directories on data volume
#2. Snapshot the Volume
#3. Create another Volume from snapshot
@ -142,9 +132,7 @@ class TestSnapshots(cloudstackTestCase):
#5. Compare data
random_data_0 = random_gen(100)
random_data_1 = random_gen(100)
ssh_client = self.virtual_machine.get_ssh_client(services["server_with_disk"]["ipaddress"])
#Format partition using ext3
format_volume_to_ext3(ssh_client, services["diskdevice"])
cmds = [ "mkdir -p %s" % services["mount_dir"],
@ -170,37 +158,31 @@ class TestSnapshots(cloudstackTestCase):
]
for c in cmds:
self.debug(ssh_client.execute(c))
cmd = listVolumes.listVolumesCmd()
cmd.hostid = self.virtual_machine.id
cmd.type = 'DATADISK'
list_volume_response = self.apiclient.listVolumes(cmd)
volume = list_volume_response[0]
#Create snapshot from attached volume
snapshot = Snapshot.create(self.apiclient, volume.id)
self.cleanup.append(snapshot)
#Create volume from snapshot
volume = Volume.create_from_snapshot(self.apiclient, snapshot.id, services)
self.cleanup.append(volume)
cmd = listVolumes.listVolumesCmd()
cmd.id = volume.id
list_volumes = self.apiclient.listVolumes(cmd)
self.assertNotEqual(
len(list_volumes),
None,
"Check Volume list Length"
)
self.assertEqual (
list_volumes[0].id,
volume.id,
"Check Volume in the List Volumes"
)
#Attaching volume to new VM
new_virtual_machine = self.virtual_machine_without_disk
self.cleanup.append(new_virtual_machine)
@ -209,7 +191,6 @@ class TestSnapshots(cloudstackTestCase):
cmd.virtualmachineid = new_virtual_machine.id
volume = self.apiclient.attachVolume(cmd)
#Login to VM to verify test directories and files
ssh = new_virtual_machine.get_ssh_client(services["server_without_disk"]["ipaddress"])
cmds = [
@ -221,26 +202,26 @@ class TestSnapshots(cloudstackTestCase):
self.debug(ssh.execute(c))
returned_data_0 = ssh.execute("cat %s/%s/%s" %(
services["sub_dir"],
services["sub_lvl_dir1"],
services["sub_dir"],
services["sub_lvl_dir1"],
services["random_data"]
)
)
returned_data_1 = ssh.execute("cat %s/%s/%s" %(
services["sub_dir"],
services["sub_lvl_dir2"],
services["sub_dir"],
services["sub_lvl_dir2"],
services["random_data"]
) )
#Verify returned data
self.assertEqual(
random_data_0,
returned_data_0[0],
random_data_0,
returned_data_0[0],
"Verify newly attached volume contents with existing one"
)
self.assertEqual(
random_data_1,
returned_data_1[0],
random_data_1,
returned_data_1[0],
"Verify newly attached volume contents with existing one"
)
@ -253,12 +234,10 @@ class TestSnapshots(cloudstackTestCase):
def test_04_delete_snapshot(self):
"""Test Delete Snapshot
"""
cmd = listVolumes.listVolumesCmd()
cmd.hostid = self.virtual_machine.id
cmd.type = 'DATADISK'
list_volumes = self.apiclient.listVolumes(cmd)
cmd = listSnapshots.listSnapshotsCmd()
cmd.id = list_volumes[0].id
list_snapshots = self.apiclient.listSnapshots(cmd)
@ -270,7 +249,6 @@ class TestSnapshots(cloudstackTestCase):
cmd = listSnapshots.listSnapshotsCmd()
cmd.id = snapshot.id
list_snapshots = self.apiclient.listSnapshots(cmd)
self.assertEqual(list_snapshots, None, "Check if result exists in list item call")
return
@ -284,12 +262,9 @@ class TestSnapshots(cloudstackTestCase):
cmd = listVolumes.listVolumesCmd()
cmd.virtualmachineid = self.virtual_machine_with_disk.id
cmd.type = 'ROOT'
volume = self.apiclient.listVolumes(cmd)
recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, services["recurring_snapshot"])
self.cleanup.append(recurring_snapshot)
#ListSnapshotPolicy should return newly created policy
cmd = listSnapshotPolicies.listSnapshotPoliciesCmd()
cmd.id = recurring_snapshot.id
@ -297,7 +272,6 @@ class TestSnapshots(cloudstackTestCase):
list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd)
self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call")
snapshots_policy = list_snapshots_policy[0]
self.assertEqual(
snapshots_policy.id,
@ -309,17 +283,13 @@ class TestSnapshots(cloudstackTestCase):
services["recurring_snapshot"]["maxsnaps"],
"Check interval type in list resources call"
)
#Sleep for (maxsnaps+1) hours to verify only maxsnaps snapshots are retained
time.sleep(((services["recurring_snapshot"]["maxsnaps"])+1)*3600)
cmd = listSnapshots.listSnapshotsCmd()
cmd.volumeid=volume.id
cmd.intervaltype = services["recurring_snapshot"]["intervaltype"]
cmd.snapshottype = 'RECURRING'
list_snapshots = self.apiclient.listSnapshots(cmd)
self.assertEqual(
len(list_snapshots),
services["recurring_snapshot"]["maxsnaps"],
@ -330,7 +300,6 @@ class TestSnapshots(cloudstackTestCase):
def test_06_recurring_snapshot_data_disk(self):
"""Test Recurring Snapshot data Disk
"""
#1. Create snapshot policy for data disk
#2. ListSnapshot policy should return newly created policy
#3. Verify only most recent number (maxsnaps) snapshots retailed
@ -338,12 +307,9 @@ class TestSnapshots(cloudstackTestCase):
cmd = listVolumes.listVolumesCmd()
cmd.virtualmachineid = self.virtual_machine_with_disk.id
cmd.type = 'DATADISK'
volume = self.apiclient.listVolumes(cmd)
recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, services["recurring_snapshot"])
self.cleanup.append(recurring_snapshot)
#ListSnapshotPolicy should return newly created policy
cmd = listSnapshotPolicies.listSnapshotPoliciesCmd()
cmd.id = recurring_snapshot.id
@ -351,7 +317,6 @@ class TestSnapshots(cloudstackTestCase):
list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd)
self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call")
snapshots_policy = list_snapshots_policy[0]
self.assertEqual(
snapshots_policy.id,
@ -402,26 +367,26 @@ class TestSnapshots(cloudstackTestCase):
"mount %s1 %s" %(services["diskdevice"], services["mount_dir"]),
"pushd %s" % services["mount_dir"],
"mkdir -p %s/{%s,%s} " %(
services["sub_dir"],
services["sub_lvl_dir1"],
services["sub_dir"],
services["sub_lvl_dir1"],
services["sub_lvl_dir2"]
),
"echo %s > %s/%s/%s" %(
random_data_0,
services["sub_dir"],
services["sub_lvl_dir1"],
random_data_0,
services["sub_dir"],
services["sub_lvl_dir1"],
services["random_data"]
),
"echo %s > %s/%s/%s" %(
random_data_1,
services["sub_dir"],
services["sub_lvl_dir2"],
random_data_1,
services["sub_dir"],
services["sub_lvl_dir2"],
services["random_data"]
)
]
for c in cmds:
ssh_client.execute(c)
cmd = listVolumes.listVolumesCmd()
cmd.virtualmachineid = self.virtual_machine.id
cmd.type = 'ROOT'
@ -449,8 +414,8 @@ class TestSnapshots(cloudstackTestCase):
# Deploy new virtual machine using template
new_virtual_machine = VirtualMachine.create(
self.apiclient,
services["server_without_disk"],
self.apiclient,
services["server_without_disk"],
template.id
)
self.cleanup.append(new_virtual_machine)
@ -465,17 +430,16 @@ class TestSnapshots(cloudstackTestCase):
ssh.execute(c)
returned_data_0 = ssh.execute("cat %s/%s/%s" %(
services["sub_dir"],
services["sub_lvl_dir1"],
services["sub_dir"],
services["sub_lvl_dir1"],
services["random_data"]
) )
returned_data_1 = ssh.execute("cat %s/%s/%s" %(
services["sub_dir"],
services["sub_lvl_dir2"],
services["sub_dir"],
services["sub_lvl_dir2"],
services["random_data"]
) )
#Verify returned data
self.assertEqual(random_data_0, returned_data_0[0], "Verify newly attached volume contents with existing one")
self.assertEqual(random_data_1, returned_data_1[0], "Verify newly attached volume contents with existing one")
return

View File

@ -0,0 +1,615 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for SSVM
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
import remoteSSHClient
from utils import *
from base import *
import telnetlib
#Import System modules
import time
services = TEST_SSVM_SERVICES
class TestSSVMs(cloudstackTestCase):
def setUp(self):
    """Prepare each test: fresh API client and an empty cleanup queue."""
    self.cleanup = []
    self.apiclient = self.testClient.getApiClient()
    return
def tearDown(self):
    """Release every resource queued in self.cleanup during the test."""
    try:
        # Clean up, terminate the created templates
        cleanup_resources(self.apiclient, self.cleanup)
    except Exception as err:
        raise Exception("Warning: Exception during cleanup : %s" % err)
    return
def test_01_list_sec_storage_vm(self):
    """Test List secondary storage VMs
    """
    # Validate the following:
    # 1. listSystemVM (systemvmtype=secondarystoragevm) should return only ONE SSVM per zone
    # 2. The returned SSVM should be in Running state
    # 3. listSystemVM for secondarystoragevm should list publicip, privateip and link-localip
    # 4. The gateway programmed on the ssvm should match the gateway returned by listVlanIpRanges
    # 5. DNS entries must match those given for the zone
    cmd = listSystemVms.listSystemVmsCmd()
    cmd.systemvmtype = 'secondarystoragevm'
    ssvms = self.apiclient.listSystemVms(cmd)
    # Verify SSVM response
    self.assertNotEqual(
        len(ssvms),
        0,
        "Check list System VMs response"
    )
    zones = self.apiclient.listZones(listZones.listZonesCmd())
    self.assertEqual(
        len(ssvms),
        len(zones),
        "Check number of SSVMs with number of zones"
    )
    # For each secondary storage VM check state, IP fields, gateway and DNS
    for ssvm in ssvms:
        self.assertEqual(
            ssvm.state,
            'Running',
            "Check whether state of SSVM is running"
        )
        for field, msg in (
            ('privateip', "Check whether SSVM has private IP field"),
            ('linklocalip', "Check whether SSVM has link local IP field"),
            ('publicip', "Check whether SSVM has public IP field"),
        ):
            self.assertEqual(hasattr(ssvm, field), True, msg)
        # Fetch corresponding ip ranges information from listVlanIpRanges
        cmd = listVlanIpRanges.listVlanIpRangesCmd()
        cmd.id = ssvm.zoneid
        iprange = self.apiclient.listVlanIpRanges(cmd)[0]
        self.assertEqual(
            ssvm.gateway,
            iprange.gateway,
            "Check gateway with that of corresponding ip range"
        )
        # Fetch corresponding zone information from listZones
        cmd = listZones.listZonesCmd()
        cmd.id = ssvm.zoneid
        zone = self.apiclient.listZones(cmd)[0]
        self.assertEqual(
            ssvm.dns1,
            zone.dns1,
            "Check DNS1 with that of corresponding zone"
        )
        self.assertEqual(
            ssvm.dns2,
            zone.dns2,
            "Check DNS2 with that of corresponding zone"
        )
    return
def test_02_list_cpvm_vm(self):
    """Test List console proxy VMs
    """
    # Validate the following:
    # 1. listSystemVM (systemvmtype=consoleproxy) should return at least ONE CPVM per zone
    # 2. The returned ConsoleProxyVM should be in Running state
    # 3. listSystemVM for console proxy should list publicip, privateip and link-localip
    # 4. The gateway programmed on the console proxy should match the gateway returned by listVlanIpRanges
    # 5. DNS entries must match those given for the zone
    cmd = listSystemVms.listSystemVmsCmd()
    cmd.systemvmtype = 'consoleproxy'
    list_cpvm_response = self.apiclient.listSystemVms(cmd)
    # Verify CPVM response
    self.assertNotEqual(
        len(list_cpvm_response),
        0,
        "Check list System VMs response"
    )
    cmd = listZones.listZonesCmd()
    list_zones_response = self.apiclient.listZones(cmd)
    self.assertEqual(
        len(list_cpvm_response),
        len(list_zones_response),
        "Check number of CPVMs with number of zones"
    )
    # For each CPVM check private IP, public IP, link local IP and DNS
    for cpvm in list_cpvm_response:
        self.assertEqual(
            cpvm.state,
            'Running',
            "Check whether state of CPVM is running"
        )
        self.assertEqual(
            hasattr(cpvm, 'privateip'),
            True,
            "Check whether CPVM has private IP field"
        )
        self.assertEqual(
            hasattr(cpvm, 'linklocalip'),
            True,
            "Check whether CPVM has link local IP field"
        )
        self.assertEqual(
            hasattr(cpvm, 'publicip'),
            True,
            "Check whether CPVM has public IP field"
        )
        # Fetch corresponding ip ranges information from listVlanIpRanges
        cmd = listVlanIpRanges.listVlanIpRangesCmd()
        # BUG FIX: this previously read list_ssvm_response[i].zoneid, a name
        # that does not exist in this test (copy-paste from the SSVM test)
        # and raised NameError; use the CPVM's own zone id.
        cmd.id = cpvm.zoneid
        ipranges_response = self.apiclient.listVlanIpRanges(cmd)[0]
        self.assertEqual(
            cpvm.gateway,
            ipranges_response.gateway,
            "Check gateway with that of corresponding ip range"
        )
        # Fetch corresponding zone information from listZones
        cmd = listZones.listZonesCmd()
        cmd.id = cpvm.zoneid
        zone_response = self.apiclient.listZones(cmd)[0]
        self.assertEqual(
            cpvm.dns1,
            zone_response.dns1,
            "Check DNS1 with that of corresponding zone"
        )
        self.assertEqual(
            cpvm.dns2,
            zone_response.dns2,
            "Check DNS2 with that of corresponding zone"
        )
    return
def test_03_ssvm_internals(self):
    """Test SSVM Internals"""
    # Validate the following
    # 1. The SSVM check script should not return any WARN|ERROR|FAIL messages
    # 2. If you are unable to login to the SSVM with the signed key then test is deemed a failure
    # 3. There should be only one ""cloud"" process running within the SSVM
    # 4. If no process is running/multiple process are running then the test is a failure
    # Pick an Up routing host in the SSVM's zone to hop through.
    cmd = listHosts.listHostsCmd()
    cmd.zoneid = services["ssvm"]["zoneid"]
    cmd.type = 'Routing'
    cmd.state = 'Up'
    host = self.apiclient.listHosts(cmd)[0]
    # Locate the SSVM running on that host.
    cmd = listSystemVms.listSystemVmsCmd()
    cmd.systemvmtype = 'secondarystoragevm'
    cmd.hostid = host.id
    ssvm = self.apiclient.listSystemVms(cmd)[0]
    self.debug("Cheking cloud process status")
    #SSH to the machine
    ssh = remoteSSHClient.remoteSSHClient(
        host.ipaddress,
        services['host']["publicport"],
        services['host']["username"],
        services['host']["password"]
    )
    timeout = 5
    # Double hop: from the host, SSH into the SSVM over its link-local IP
    # using the system key injected on the hypervisor (port 3922).
    ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % ssvm.linklocalip
    c = ssh_command + "/usr/local/cloud/systemvm/ssvm-check.sh |grep -e ERROR -e WARNING -e FAIL"
    # Ensure the SSH login is successful; retry for up to 5 attempts
    # (5s apart) while host-key verification is still failing.
    while True:
        res = ssh.execute(c)[0]
        #Output:Tests Complete. Look for ERROR or WARNING above.
        if res != "Host key verification failed.":
            break
        elif timeout == 0:
            break
        time.sleep(5)
        timeout = timeout - 1
    # NOTE(review): expecting a count of exactly 1 appears to allow only the
    # summary line ("Look for ERROR or WARNING above.") and no real error
    # output -- confirm against the actual ssvm-check.sh output format.
    self.assertEqual(
        res.count("ERROR"),
        1,
        "Check for Errors in tests"
    )
    self.assertEqual(
        res.count("WARNING"),
        1,
        "Check for warnings in tests"
    )
    self.assertEqual(
        res.count("FAIL"),
        1,
        "Check for failed tests"
    )
    #Check status of cloud service
    c = ssh_command + "service cloud status"
    res = ssh.execute(c)[0]
    # cloud.com service (type=secstorage) is running: process id: 2346
    self.assertEqual(
        res.count("is running"),
        1,
        "Check cloud service is running or not"
    )
    return
def test_04_cpvm_internals(self):
    """Test CPVM Internals"""
    # Validate the following
    # 1. Telnet access on port 8250 to the management server is available for the CPVM
    # 2. No telnet access means the test FAILs
    # 3. 'service cloud status' inside the CPVM should report the agent as running
    cmd = listHosts.listHostsCmd()
    cmd.zoneid = services["cpvm"]["zoneid"]
    cmd.type = 'Routing'
    cmd.state = 'Up'
    host = self.apiclient.listHosts(cmd)[0]
    cmd = listSystemVms.listSystemVmsCmd()
    cmd.systemvmtype = 'consoleproxy'
    cmd.hostid = host.id
    cpvm = self.apiclient.listSystemVms(cmd)[0]
    # BUG FIX: 'with assertNotRaises(Exception):' is not part of unittest and
    # was an unbound name here; report a connection problem as a test
    # failure with an explicit try/except instead.
    try:
        telnet = telnetlib.Telnet(services["cpvm"]["mgmtserverIP"], '8250')
    except Exception as e:
        self.fail("Telnet connection to management server on port 8250 failed: %s" % e)
    self.debug("Cheking cloud process status")
    # SSH to the host, then double-hop into the CPVM over its link-local IP
    ssh = remoteSSHClient.remoteSSHClient(
        host.ipaddress,
        services['host']["publicport"],
        services['host']["username"],
        services['host']["password"]
    )
    timeout = 5
    # Check status of cloud service
    ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % cpvm.linklocalip
    c = ssh_command + "service cloud status"
    # Retry while host-key verification is still failing (max 5 attempts)
    while True:
        res = ssh.execute(c)[0]
        # Response: cloud.com service (type=secstorage) is running: process id: 2346
        if res != "Host key verification failed.":
            break
        elif timeout == 0:
            break
        time.sleep(5)
        timeout = timeout - 1
    self.assertEqual(
        res.count("is running"),
        1,
        "Check cloud service is running or not"
    )
    return
def test_05_stop_ssvm(self):
    """Test stop SSVM
    """
    # Validate the following
    # 1. The SSVM should go to stop state
    # 2. After a brief delay the SSVM should be restarted automatically and
    #    return to Running state, with the earlier SSVM tests still passing
    # 3. If either of the above steps fail the test is a failure
    stop_cmd = stopSystemVm.stopSystemVmCmd()
    stop_cmd.id = services["ssvm"]["id"]
    stop_cmd.systemvmtype = 'secondarystoragevm'
    self.apiclient.stopSystemVm(stop_cmd)
    # Give the management server time to restart the SSVM
    time.sleep(90)
    list_cmd = listSystemVms.listSystemVmsCmd()
    list_cmd.id = services["ssvm"]["id"]
    ssvm_response = self.apiclient.listSystemVms(list_cmd)[0]
    self.assertEqual(
        ssvm_response.state,
        'Running',
        "Check whether SSVM is running or not"
    )
    # Re-run the earlier checks to confirm the SSVM came back healthy
    self.test_01_list_sec_storage_vm()
    self.test_03_ssvm_internals()
    return
def test_06_stop_cpvm(self):
    """Test stop CPVM
    """
    # Validate the following
    # 1. The CPVM should go to stop state
    # 2. After a brief delay the CPVM should be restarted automatically and
    #    return to Running state, with the earlier CPVM tests still passing
    # 3. If either of the above steps fail the test is a failure
    cmd = stopSystemVm.stopSystemVmCmd()
    cmd.id = services["cpvm"]["id"]
    cmd.systemvmtype = 'consoleproxy'
    self.apiclient.stopSystemVm(cmd)
    cmd = listSystemVms.listSystemVmsCmd()
    cmd.id = services["cpvm"]["id"]
    timeout = 10
    # BUG FIX: the old loop broke out as soon as *any* response came back
    # (so it never actually waited), and indexed [0] on a possibly-empty
    # response afterwards. Poll until the CPVM reports Running again, or
    # until the timeout expires.
    while True:
        list_cpvm_response = self.apiclient.listSystemVms(cmd)
        if list_cpvm_response and list_cpvm_response[0].state == 'Running':
            break
        if timeout == 0:
            break
        # Sleep to give the CPVM time to restart
        time.sleep(10)
        timeout = timeout - 1
    cpvm_response = list_cpvm_response[0]
    self.assertEqual(
        cpvm_response.state,
        'Running',
        "Check whether CPVM is running or not"
    )
    # Call above tests to ensure CPVM is properly running
    self.test_02_list_cpvm_vm()
    self.test_04_cpvm_internals()
    return
def test_07_reboot_ssvm(self):
    """Test reboot SSVM
    """
    # Validate the following
    # 1. The SSVM should go to stop and return to Running state
    # 2. SSVM's public-ip and private-ip must remain the same before and after reboot
    # 3. The cloud process should still be running within the SSVM
    cmd = listSystemVms.listSystemVmsCmd()
    cmd.id = services["ssvm"]["id"]
    cmd.systemvmtype = 'secondarystoragevm'
    ssvm_response = self.apiclient.listSystemVms(cmd)[0]
    # Store the public & private IP values before reboot
    old_public_ip = ssvm_response.publicip
    old_private_ip = ssvm_response.privateip
    cmd = rebootSystemVm.rebootSystemVmCmd()
    cmd.id = services["ssvm"]["id"]
    self.apiclient.rebootSystemVm(cmd)
    # Sleep to ensure that SSVM is properly stopped/started
    time.sleep(60)
    cmd = listSystemVms.listSystemVmsCmd()
    cmd.id = services["ssvm"]["id"]
    ssvm_response = self.apiclient.listSystemVms(cmd)[0]
    # BUG FIX: the assertion message referred to the CPVM in this SSVM test
    self.assertEqual(
        'Running',
        str(ssvm_response.state),
        "Check whether SSVM is running or not"
    )
    self.assertEqual(
        ssvm_response.publicip,
        old_public_ip,
        "Check Public IP after reboot with that of before reboot"
    )
    self.assertEqual(
        ssvm_response.privateip,
        old_private_ip,
        "Check Private IP after reboot with that of before reboot"
    )
    # Call to verify cloud process is running
    self.test_03_ssvm_internals()
    return
def test_08_reboot_cpvm(self):
    """Test reboot CPVM
    """
    # Validate the following
    # 1. The CPVM should go to stop and return to Running state
    # 2. CPVM's public-ip and private-ip must remain the same before and after reboot
    # 3. the cloud process should still be running within the CPVM
    list_cmd = listSystemVms.listSystemVmsCmd()
    list_cmd.id = services["cpvm"]["id"]
    list_cmd.systemvmtype = 'consoleproxy'
    before = self.apiclient.listSystemVms(list_cmd)[0]
    # Remember the addresses so they can be compared after the reboot
    old_public_ip = before.publicip
    old_private_ip = before.privateip
    reboot_cmd = rebootSystemVm.rebootSystemVmCmd()
    reboot_cmd.id = services["cpvm"]["id"]
    self.apiclient.rebootSystemVm(reboot_cmd)
    # Allow the CPVM time to stop and start again
    time.sleep(60)
    list_cmd = listSystemVms.listSystemVmsCmd()
    list_cmd.id = services["cpvm"]["id"]
    after = self.apiclient.listSystemVms(list_cmd)[0]
    self.assertEqual(
        'Running',
        str(after.state),
        "Check whether CPVM is running or not"
    )
    self.assertEqual(
        after.publicip,
        old_public_ip,
        "Check Public IP after reboot with that of before reboot"
    )
    self.assertEqual(
        after.privateip,
        old_private_ip,
        "Check Private IP after reboot with that of before reboot"
    )
    # Call to verify cloud process is running
    self.test_04_cpvm_internals()
    return
def test_09_destroy_ssvm(self):
    """Test destroy SSVM
    """
    # Validate the following
    # 1. SSVM should be completely destroyed and a new one will spin up
    # 2. listSystemVMs will show a different name for the systemVM from what it was before
    # 3. new SSVM will have a public/private and link-local-ip
    # 4. cloud process within SSVM must be up and running
    list_cmd = listSystemVms.listSystemVmsCmd()
    list_cmd.zoneid = services["ssvm"]["zoneid"]
    list_cmd.systemvmtype = 'secondarystoragevm'
    ssvm = self.apiclient.listSystemVms(list_cmd)[0]
    old_name = ssvm.name
    destroy_cmd = destroySystemVm.destroySystemVmCmd()
    destroy_cmd.id = ssvm.id
    self.apiclient.destroySystemVm(destroy_cmd)
    # Sleep to ensure that a replacement SSVM is created
    time.sleep(60)
    list_cmd = listSystemVms.listSystemVmsCmd()
    list_cmd.zoneid = services["ssvm"]["zoneid"]
    list_cmd.systemvmtype = 'secondarystoragevm'
    new_ssvm = self.apiclient.listSystemVms(list_cmd)[0]
    # Verify Name, Public IP, Private IP and Link local IP for newly created SSVM
    self.assertNotEqual(
        new_ssvm.name,
        old_name,
        "Check SSVM new name with name of destroyed SSVM"
    )
    for field, msg in (
        ('privateip', "Check whether SSVM has private IP field"),
        ('linklocalip', "Check whether SSVM has link local IP field"),
        ('publicip', "Check whether SSVM has public IP field"),
    ):
        self.assertEqual(hasattr(new_ssvm, field), True, msg)
    # Call to verify cloud process is running
    self.test_03_ssvm_internals()
    return
def test_10_destroy_cpvm(self):
    """Test destroy CPVM
    """
    # Validate the following
    # 1. CPVM should be completely destroyed and a new one will spin up
    # 2. listSystemVMs will show a different name for the systemVM from what it was before
    # 3. new CPVM will have a public/private and link-local-ip
    # 4. cloud process within CPVM must be up and running
    cmd = listSystemVms.listSystemVmsCmd()
    cmd.zoneid = services["cpvm"]["zoneid"]
    cmd.systemvmtype = 'consoleproxy'
    cpvm_response = self.apiclient.listSystemVms(cmd)[0]
    old_name = cpvm_response.name
    cmd = destroySystemVm.destroySystemVmCmd()
    cmd.id = cpvm_response.id
    self.apiclient.destroySystemVm(cmd)
    # Sleep to ensure that new CPVM is created
    time.sleep(60)
    cmd = listSystemVms.listSystemVmsCmd()
    cmd.zoneid = services["cpvm"]["zoneid"]
    cmd.systemvmtype = 'consoleproxy'
    cpvm_response = self.apiclient.listSystemVms(cmd)[0]
    # Verify Name, Public IP, Private IP and Link local IP for newly created CPVM
    # BUG FIX: the message below previously said "SSVM new name" in this
    # CPVM test (copy-paste from test_09).
    self.assertNotEqual(
        cpvm_response.name,
        old_name,
        "Check CPVM new name with name of destroyed CPVM"
    )
    self.assertEqual(
        hasattr(cpvm_response, 'privateip'),
        True,
        "Check whether CPVM has private IP field"
    )
    self.assertEqual(
        hasattr(cpvm_response, 'linklocalip'),
        True,
        "Check whether CPVM has link local IP field"
    )
    self.assertEqual(
        hasattr(cpvm_response, 'publicip'),
        True,
        "Check whether CPVM has public IP field"
    )
    # Call to verify cloud process is running
    self.test_04_cpvm_internals()
    return

View File

@ -0,0 +1,567 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Templates ISO
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
import remoteSSHClient
from utils import *
from base import *
import urllib
from random import random
#Import System modules
import time
services = TEST_TEMPLATE_SERVICES
class TestCreateTemplate(cloudstackTestCase):
def setUp(self):
    """Prepare each test: API client, DB connection and cleanup queue."""
    self.cleanup = []
    self.apiclient = self.testClient.getApiClient()
    self.dbclient = self.testClient.getDbConnection()
    return
def tearDown(self):
    """Close the DB connection and release every queued resource."""
    try:
        self.dbclient.close()
        # Clean up, terminate the created templates
        cleanup_resources(self.apiclient, self.cleanup)
    except Exception as err:
        raise Exception("Warning: Exception during cleanup : %s" % err)
    return
@classmethod
def setUpClass(cls):
    """Deploy a VM, stop it, and remember its ROOT volume for templating."""
    cls.api_client = fetch_api_client()
    # create virtual machine
    cls.virtual_machine = VirtualMachine.create(
        cls.api_client,
        services["virtual_machine"]
    )
    # Stop the VM so its ROOT volume can be turned into a template
    cmd = stopVirtualMachine.stopVirtualMachineCmd()
    cmd.id = cls.virtual_machine.id
    cls.api_client.stopVirtualMachine(cmd)
    # Wait until the server has successfully stopped
    time.sleep(30)
    cmd = listVolumes.listVolumesCmd()
    cmd.virtualmachineid = cls.virtual_machine.id
    cmd.type = 'ROOT'
    cls.volume = cls.api_client.listVolumes(cmd)[0]
    return
@classmethod
def tearDownClass(cls):
    """Delete the class-level virtual machine."""
    try:
        cls.api_client = fetch_api_client()
        # Cleanup resources used
        cls.virtual_machine.delete(cls.api_client)
    except Exception as err:
        raise Exception("Warning: Exception during cleanup : %s" % err)
    return
def test_01_create_template(self):
    """Test create public & private template
    """
    # Validate the following:
    # 1. database (vm_template table) should be updated with newly created template
    # 2. UI should show the newly added template
    # 3. ListTemplates API should show the newly added template
    # Create template from Virtual machine and Volume ID
    template = Template.create(self.apiclient, self.volume, services["template_1"])
    self.cleanup.append(template)
    cmd = listTemplates.listTemplatesCmd()
    cmd.templatefilter = services["templatefilter"]
    cmd.id = template.id
    list_template_response = self.apiclient.listTemplates(cmd)
    # BUG FIX: assert the response is a non-empty list *before* indexing it;
    # previously [0] was taken first, turning a missing template into an
    # unrelated exception instead of a clean assertion failure. Also fixed
    # the "avaliable" typo in the message.
    self.assertNotEqual(
        list_template_response,
        None,
        "Check template available in List Templates"
    )
    self.assertNotEqual(
        len(list_template_response),
        0,
        "Check template available in List Templates"
    )
    # Verify template response to check whether template added successfully or not
    template_response = list_template_response[0]
    self.assertEqual(
        template_response.displaytext,
        services["template_1"]["displaytext"],
        "Check display text of newly created template"
    )
    self.assertEqual(
        template_response.name,
        services["template_1"]["name"],
        "Check name of newly created template"
    )
    self.assertEqual(
        template_response.ostypeid,
        services["template_1"]["ostypeid"],
        "Check osTypeID of newly created template"
    )
    # Verify the database entry for template
    self.debug(
        "select name, display_text, guest_os_id from vm_template where id = %s;"
        %template.id
    )
    qresultset = self.dbclient.execute(
        "select name, display_text, guest_os_id from vm_template where id = %s;"
        %template.id
    )
    self.assertNotEqual(
        len(qresultset),
        0,
        "Check DB Query result set"
    )
    qresult = qresultset[0]
    self.assertEqual(
        qresult[0],
        services["template_1"]["name"],
        "Compare template name with database record"
    )
    self.assertEqual(
        qresult[1],
        services["template_1"]["displaytext"],
        "Compare template display text with database record"
    )
    self.assertEqual(
        qresult[2],
        services["template_1"]["ostypeid"],
        "Compare template osTypeID with database record"
    )
    return
class TestTemplates(cloudstackTestCase):
@classmethod
def setUpClass(cls):
    """Deploy a stopped VM and build the two templates shared by the tests."""
    cls.api_client = fetch_api_client()
    # create virtual machines
    cls.virtual_machine = VirtualMachine.create(
        cls.api_client,
        services["virtual_machine"]
    )
    # Stop the VM so its ROOT volume can be used as a template source
    cmd = stopVirtualMachine.stopVirtualMachineCmd()
    cmd.id = cls.virtual_machine.id
    cls.api_client.stopVirtualMachine(cmd)
    # Wait until the server has successfully stopped
    time.sleep(30)
    cmd = listVolumes.listVolumesCmd()
    cmd.virtualmachineid = cls.virtual_machine.id
    cmd.type = 'ROOT'
    cls.volume = cls.api_client.listVolumes(cmd)[0]
    # Create templates for Edit, Delete & update permissions testcases
    cls.template_1 = Template.create(cls.api_client, cls.volume, services["template_1"])
    cls.template_2 = Template.create(cls.api_client, cls.volume, services["template_2"])
@classmethod
def tearDownClass(cls):
    """Remove the shared template and the class-level virtual machine."""
    try:
        cls.api_client = fetch_api_client()
        # template_1 is deleted by test_03; only template_2 remains here
        cls.template_2.delete(cls.api_client)
        cls.virtual_machine.delete(cls.api_client)
    except Exception as err:
        raise Exception("Warning: Exception during cleanup : %s" % err)
    return
def setUp(self):
    """Prepare each test: API client, DB connection and cleanup queue."""
    self.cleanup = []
    self.apiclient = self.testClient.getApiClient()
    self.dbclient = self.testClient.getDbConnection()
    return
def tearDown(self):
    """Close the DB connection and release every queued resource."""
    try:
        self.dbclient.close()
        # Clean up, terminate the created templates
        cleanup_resources(self.apiclient, self.cleanup)
    except Exception as err:
        raise Exception("Warning: Exception during cleanup : %s" % err)
    return
def test_02_edit_template(self):
    """Test Edit template
    """
    # Validate the following:
    # 1. UI should show the edited values for template
    # 2. database (vm_template table) should have updated values
    new_displayText = random_gen()
    new_name = random_gen()
    cmd = updateTemplate.updateTemplateCmd()
    # Update template attributes
    cmd.id = self.template_1.id
    cmd.displaytext = new_displayText
    cmd.name = new_name
    cmd.bootable = services["bootable"]
    cmd.passwordenabled = services["passwordenabled"]
    cmd.ostypeid = services["ostypeid"]
    self.apiclient.updateTemplate(cmd)
    # Verify template response for updated attributes
    cmd = listTemplates.listTemplatesCmd()
    cmd.templatefilter = services["templatefilter"]
    cmd.id = self.template_1.id
    list_template_response = self.apiclient.listTemplates(cmd)
    # Check the list is non-empty before indexing it
    self.assertNotEqual(
        len(list_template_response),
        0,
        "Check template available in List Templates"
    )
    template_response = list_template_response[0]
    self.assertEqual(
        template_response.displaytext,
        new_displayText,
        "Check display text of updated template"
    )
    self.assertEqual(
        template_response.name,
        new_name,
        "Check name of updated template"
    )
    self.assertEqual(
        str(template_response.passwordenabled).lower(),
        str(services["passwordenabled"]).lower(),
        "Check passwordenabled field of updated template"
    )
    self.assertEqual(
        template_response.ostypeid,
        services["ostypeid"],
        "Check OSTypeID of updated template"
    )
    # Verify database entry for updated template attributes
    self.debug(
        "select name, display_text, bootable, enable_password, guest_os_id from vm_template where id = %s;"
        %self.template_1.id
    )
    qresultset = self.dbclient.execute(
        "select name, display_text, bootable, enable_password, guest_os_id from vm_template where id = %s;"
        %self.template_1.id
    )
    self.assertNotEqual(
        len(qresultset),
        0,
        "Check DB Query result set"
    )
    qresult = qresultset[0]
    self.assertEqual(
        qresult[0],
        new_name,
        "Compare template name with database record"
    )
    # BUG FIX: the three messages below were rotated one column off
    # (display_text labeled "bootable", bootable labeled "enable_password",
    # enable_password labeled "display text"); each message now names the
    # column it actually checks.
    self.assertEqual(
        qresult[1],
        new_displayText,
        "Compare template display text with database record"
    )
    self.assertEqual(
        qresult[2],
        int(services["bootable"]),
        "Compare template bootable field with database record"
    )
    self.assertEqual(
        qresult[3],
        int(services["passwordenabled"]),
        "Compare template enable_password field with database record"
    )
    self.assertEqual(
        qresult[4],
        services["ostypeid"],
        "Compare template guest OS ID with database record"
    )
    return
def test_03_delete_template(self):
    """Test delete template
    """
    # Validate the following:
    # 1. UI should not show the deleted template
    # 2. database (vm_template table) should not contain deleted template
    self.template_1.delete(self.apiclient)
    cmd = listTemplates.listTemplatesCmd()
    cmd.templatefilter = services["templatefilter"]
    cmd.id = self.template_1.id
    list_template_response = self.apiclient.listTemplates(cmd)
    # Verify template is deleted properly using ListTemplates
    self.assertEqual(list_template_response, None, "Check if template exists in List Templates")
    # Verify database entry is removed for deleted template
    self.debug(
        "select name, display_text from vm_template where id = %s;"
        %self.template_1.id
    )
    qresultset = self.dbclient.execute(
        "select name, display_text from vm_template where id = %s;"
        %self.template_1.id
    )
    # NOTE(review): exactly one row is expected here even though the
    # template was deleted -- presumably the row is soft-deleted (kept with
    # a 'removed' timestamp) rather than purged, which contradicts comment
    # 2 above; confirm against the vm_template schema.
    self.assertEqual(
        len(qresultset),
        1,
        "Check DB Query result set"
    )
    return
def test_04_extract_template(self):
    "Test for extract template"
    # Validate the following
    # 1. Admin should able extract and download the templates
    # 2. ListTemplates should display all the public templates for all kind of users
    # 3 .ListTemplates should not display the system templates
    cmd = extractTemplate.extractTemplateCmd()
    cmd.id = self.template_2.id
    cmd.mode = services["template_2"]["mode"]
    cmd.zoneid = services["template_2"]["zoneid"]
    extract_response = self.apiclient.extractTemplate(cmd)
    # Un-escape the URL and fetch it to get the HTTP status code
    download_url = urllib.unquote_plus(extract_response.url)
    response_code = urllib.urlopen(download_url).getcode()
    self.assertEqual(
        extract_response.id,
        self.template_2.id,
        "Check ID of the extracted template"
    )
    self.assertEqual(
        extract_response.extractMode,
        services["template_2"]["mode"],
        "Check mode of extraction"
    )
    self.assertEqual(
        extract_response.zoneid,
        services["template_2"]["zoneid"],
        "Check zone ID of extraction"
    )
    # The download URL must be live
    self.assertEqual(
        response_code,
        200,
        "Check for a valid response download URL"
    )
    return
def test_05_template_permissions(self):
    """Update & Test for template permissions"""
    # Validate the following
    # 1. listTemplatePermissions returns valid permissions set for template
    # 2. permission changes should be reflected in vm_template table in database
    update_cmd = updateTemplatePermissions.updateTemplatePermissionsCmd()
    # Update template permissions
    update_cmd.id = self.template_2.id
    update_cmd.isfeatured = services["isfeatured"]
    update_cmd.ispublic = services["ispublic"]
    update_cmd.isextractable = services["isextractable"]
    self.apiclient.updateTemplatePermissions(update_cmd)
    # List the template as a regular user to confirm the new permissions
    list_cmd = listTemplates.listTemplatesCmd()
    list_cmd.id = self.template_2.id
    list_cmd.account = services["account"]
    list_cmd.domainid = services["domainid"]
    list_cmd.templatefilter = 'featured'
    template_response = self.apiclient.listTemplates(list_cmd)[0]
    self.assertEqual(
        template_response.id,
        self.template_2.id,
        "Check template ID"
    )
    self.assertEqual(
        template_response.ispublic,
        int(True),
        "Check ispublic permission of template"
    )
    self.assertNotEqual(
        template_response.templatetype,
        'SYSTEM',
        "ListTemplates should not list any system templates"
    )
    # Verify database entries for updated permissions
    query = (
        "select public, featured, extractable from vm_template where id = %s;"
        % self.template_2.id
    )
    self.debug(query)
    qresultset = self.dbclient.execute(query)
    self.assertNotEqual(
        len(qresultset),
        0,
        "Check DB Query result set"
    )
    public_flag, featured_flag, extractable_flag = qresultset[0]
    self.assertEqual(
        public_flag,
        int(services["ispublic"]),
        "Compare public permission with database record"
    )
    self.assertEqual(
        featured_flag,
        int(services["isfeatured"]),
        "Compare featured permission with database record"
    )
    self.assertEqual(
        extractable_flag,
        int(services["isextractable"]),
        "Compare extractable permission with database record"
    )
    return
def test_06_copy_template(self):
    """Test for copy template from one zone to another"""
    # Validate the following
    # 1. copy template should be successful and secondary storage should contain new copied template.
    cmd = copyTemplate.copyTemplateCmd()
    cmd.id = self.template_2.id
    cmd.destzoneid = services["destzoneid"]
    cmd.sourcezoneid = services["sourcezoneid"]
    self.apiclient.copyTemplate(cmd)
    # Verify template is copied to another zone using ListTemplates
    cmd = listTemplates.listTemplatesCmd()
    cmd.id = self.template_2.id
    cmd.templatefilter = services["templatefilter"]
    list_template_response = self.apiclient.listTemplates(cmd)
    # BUG FIX: check the response is a non-empty list *before* indexing it;
    # previously [0] was taken first, so a missing template raised instead
    # of failing the assertion.
    self.assertNotEqual(
        list_template_response,
        None,
        "Check template extracted in List Templates"
    )
    self.assertNotEqual(
        len(list_template_response),
        0,
        "Check template extracted in List Templates"
    )
    template_response = list_template_response[0]
    self.assertEqual(
        template_response.id,
        self.template_2.id,
        "Check ID of the downloaded template"
    )
    self.assertEqual(
        template_response.zoneid,
        services["destzoneid"],
        "Check zone ID of the copied template"
    )
    return
def test_07_list_public_templates(self):
    """Test only public templates are visible to normal user"""
    # Validate the following
    # 1. ListTemplates should show only 'public' templates for normal user
    cmd = listTemplates.listTemplatesCmd()
    cmd.templatefilter = 'featured'
    cmd.account = services["account"]
    cmd.domainid = services["domainid"]
    templates = self.apiclient.listTemplates(cmd)
    self.assertNotEqual(
        len(templates),
        0,
        "Check template available in List Templates"
    )
    # Every template visible to the user must be public
    for template in templates:
        self.assertEqual(
            template.ispublic,
            True,
            "ListTemplates should list only public templates"
        )
    return
def test_08_list_system_templates(self):
    """Test System templates are not visible to normal user.

    Validates the following:
    1. ListTemplates should not show 'SYSTEM' type templates for a
       normal (non-admin) user account.
    """
    cmd = listTemplates.listTemplatesCmd()
    cmd.templatefilter = 'featured'
    cmd.account = services["account"]
    cmd.domainid = services["domainid"]
    list_template_response = self.apiclient.listTemplates(cmd)

    # Guard against a None response before calling len() on it
    self.assertNotEqual(
                        list_template_response,
                        None,
                        "Check template available in List Templates"
                        )
    self.assertNotEqual(
                        len(list_template_response),
                        0,
                        "Check template available in List Templates"
                        )
    # Iterate the responses directly instead of range(len(...)) indexing;
    # no listed template may be of type SYSTEM for a normal user
    for template in list_template_response:
        self.assertNotEqual(
                        template.templatetype,
                        'SYSTEM',
                        "ListTemplates should not list any system templates"
                        )
    return

View File

@ -1,6 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Virtual Machine Life Cycle
@ -524,19 +524,16 @@ class TestVMLifeCycle(cloudstackTestCase):
cmd.id = self.small_virtual_machine.id
self.apiclient.destroyVirtualMachine(cmd)
# Wait for expunge.delay
cmd = listConfigurations.listConfigurationsCmd()
cmd.name = 'expunge.delay'
response = self.apiclient.listConfigurations(cmd)[0]
time.sleep(int(response.value))
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
timeout = 50
while True :
list_vm_response = self.apiclient.listVirtualMachines(cmd)
if not list_vm_response:
break;
else:
if timeout == 0:
break
time.sleep(100)
timeout = timeout - 1
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertEqual(
list_vm_response,
@ -633,4 +630,4 @@ class TestVMLifeCycle(cloudstackTestCase):
result = services["diskdevice"] in res[0].split()
self.assertEqual(result, False, "Check if ISO is detached from virtual machine")
return
return

View File

@ -1,6 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Volumes
"""

View File

@ -1,6 +1,6 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
# Copyright (c) 2012 Citrix. All rights reserved.
#
"""Utilities functions
@ -22,9 +22,8 @@ def random_gen(size=6, chars=string.ascii_uppercase + string.digits):
def cleanup_resources(api_client, resources):
"""Delete resources"""
for k,v in resources.items():
for obj in v:
obj.delete(api_client)
for obj in resources:
obj.delete(api_client)
def is_server_ssh_ready(ipaddress, port, username, password, retries=50):
"""Return ssh handle else wait till sshd is running"""