Port the BVT tests from 2.2.y to Acton 3.0.x

This commit is contained in:
Chirag Jog 2012-03-06 09:18:13 -08:00
parent 7a73fcd83c
commit 5f2e35fa70
21 changed files with 10350 additions and 0 deletions

View File

View File

@ -0,0 +1,41 @@
Build Verification Testing (BVT) Cases
--------------------------------------
These test cases are the core functionality tests that ensure the application is stable and can be tested thoroughly.
These BVT cases definitions are located at : https://docs.google.com/a/cloud.com/spreadsheet/ccc?key=0Ak8acbfxQG8ndEppOGZSLV9mUF9idjVkTkZkajhTZkE&invite=CPij0K0L
Guidelines
----------
BVT test cases are being developed using Python's unittests2. Following are certain guidelines being followed
1. Tests exercised for the same resource should ideally be present under a single suite or file.
2. Time-consuming operations that create new cloud resources like server creation, volume creation etc
should not necessarily be exercised per unit test. The resources can be shared by creating them at
the class-level using setUpClass and shared across all instances during a single run.
3. Certain tests pertaining to NAT, Firewall and Load Balancing warrant fresh resources per test. Hence a call should be
taken by the stakeholders regarding sharing resources.
4. Ensure that the tearDown/tearDownClass functions clean up all the resources created during the test run.
For more information about unittests: http://docs.python.org/library/unittest.html
BVT Tests
----------
The following files contain these BVT cases:
1. test_vm_life_cycle.py - VM Life Cycle tests
2. test_volumes.py - Volumes related tests
3. test_snapshots.py - Snapshots related tests
4. test_disk_offerings.py - Disk Offerings related tests
5. test_service_offerings.py - Service Offerings related tests
6. test_hosts.py - Hosts and Clusters related tests
7. test_iso.py - ISO related tests
8. test_network.py - Network related tests
9. test_primary_storage.py - Primary storage related tests
10. test_secondary_storage.py - Secondary storage related tests
11. test_ssvm.py - SSVM & CPVM related tests
12. test_templates.py - Templates related tests
13. test_routers.py - Router related tests

View File

@ -0,0 +1,204 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Disk offerings"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
class Services:
"""Test Disk offerings Services
"""
def __init__(self):
self.services = {
"off": {
"name": "Disk offering",
"displaytext": "Disk offering",
"disksize": 1 # in GB
},
}
class TestCreateDiskOffering(cloudstackTestCase):
def setUp(self):
self.services = Services().services
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_create_disk_offering(self):
"""Test to create disk offering"""
# Validate the following:
# 1. createDiskOfferings should return valid info for new offering
# 2. The Cloud Database contains the valid information
disk_offering = DiskOffering.create(
self.apiclient,
self.services["off"]
)
self.cleanup.append(disk_offering)
self.debug("Created Disk offering with ID: %s" % disk_offering.id)
list_disk_response = list_disk_offering(
self.apiclient,
id=disk_offering.id
)
self.assertEqual(
isinstance(list_disk_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_disk_response),
0,
"Check Disk offering is created"
)
disk_response = list_disk_response[0]
self.assertEqual(
disk_response.displaytext,
self.services["off"]["displaytext"],
"Check server id in createServiceOffering"
)
self.assertEqual(
disk_response.name,
self.services["off"]["name"],
"Check name in createServiceOffering"
)
return
class TestDiskOfferings(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.disk_offering_1 = DiskOffering.create(
cls.api_client,
cls.services["off"]
)
cls.disk_offering_2 = DiskOffering.create(
cls.api_client,
cls.services["off"]
)
cls._cleanup = [cls.disk_offering_1]
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_02_edit_disk_offering(self):
"""Test to update existing disk offering"""
# Validate the following:
# 1. updateDiskOffering should return
# a valid information for newly created offering
#Generate new name & displaytext from random data
random_displaytext = random_gen()
random_name = random_gen()
self.debug("Updating Disk offering with ID: %s" %
self.disk_offering_1.id)
cmd = updateDiskOffering.updateDiskOfferingCmd()
cmd.id = self.disk_offering_1.id
cmd.displaytext = random_displaytext
cmd.name = random_name
self.apiclient.updateDiskOffering(cmd)
list_disk_response = list_disk_offering(
self.apiclient,
id=self.disk_offering_1.id
)
self.assertEqual(
isinstance(list_disk_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_disk_response),
0,
"Check disk offering is updated"
)
disk_response = list_disk_response[0]
self.assertEqual(
disk_response.displaytext,
random_displaytext,
"Check service displaytext in updateServiceOffering"
)
self.assertEqual(
disk_response.name,
random_name,
"Check service name in updateServiceOffering"
)
return
def test_03_delete_disk_offering(self):
"""Test to delete disk offering"""
# Validate the following:
# 1. deleteDiskOffering should return
# a valid information for newly created offering
self.disk_offering_2.delete(self.apiclient)
self.debug("Deleted Disk offering with ID: %s" %
self.disk_offering_2.id)
list_disk_response = list_disk_offering(
self.apiclient,
id=self.disk_offering_2.id
)
self.assertEqual(
list_disk_response,
None,
"Check if disk offering exists in listDiskOfferings"
)
return

View File

@ -0,0 +1,217 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Hosts and Clusters
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
class Services:
"""Test Hosts & Clusters Services
"""
def __init__(self):
self.services = {
"clusters": {
0: {
"clustername": "Xen Cluster",
"clustertype": "CloudManaged",
# CloudManaged or ExternalManaged"
"hypervisor": "XenServer",
# Hypervisor type
},
1: {
"clustername": "KVM Cluster",
"clustertype": "CloudManaged",
# CloudManaged or ExternalManaged"
"hypervisor": "KVM",
# Hypervisor type
},
2: {
"hypervisor": 'VMware',
# Hypervisor type
"clustertype": 'ExternalManaged',
# CloudManaged or ExternalManaged"
"username": 'administrator',
"password": 'fr3sca',
"url": 'http://192.168.100.17/CloudStack-Clogeny-Pune/Pune-1',
# Format:http://vCenter Host/Datacenter/Cluster
"clustername": 'VMWare Cluster',
},
},
"hosts": {
"xenserver": {
# Must be name of corresponding Hypervisor type
# in cluster in small letters
"hypervisor": 'XenServer',
# Hypervisor type
"clustertype": 'CloudManaged',
# CloudManaged or ExternalManaged"
"url": 'http://192.168.100.211',
"username": "root",
"password": "fr3sca",
},
"kvm": {
"hypervisor": 'KVM',
# Hypervisor type
"clustertype": 'CloudManaged',
# CloudManaged or ExternalManaged"
"url": 'http://192.168.100.212',
"username": "root",
"password": "fr3sca",
},
"vmware": {
"hypervisor": 'VMware',
# Hypervisor type
"clustertype": 'ExternalManaged',
# CloudManaged or ExternalManaged"
"url": 'http://192.168.100.203',
"username": "administrator",
"password": "fr3sca",
},
},
"zoneid": '5894a264-12b0-4257-a316-06c831bcf8e2',
# Optional, if specified the mentioned zone will be
# used for tests
}
class TestHosts(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.services = Services().services
self.zone = get_zone(self.apiclient, self.services)
self.pod = get_pod(self.apiclient, self.zone.id, self.services)
self.cleanup = []
return
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_clusters(self):
"""Test Add clusters & hosts - XEN, KVM, VWARE
"""
# Validate the following:
# 1. Verify hypervisortype returned by API is Xen/KVM/VWare
# 2. Verify that the cluster is in 'Enabled' allocation state
# 3. Verify that the host is added successfully and in Up state
# with listHosts API response
#Create clusters with Hypervisor type XEN/KVM/VWare
for k, v in self.services["clusters"].items():
cluster = Cluster.create(
self.apiclient,
v,
zoneid=self.zone.id,
podid=self.pod.id
)
self.debug(
"Created Cluster for hypervisor type %s & ID: %s" %(
v["hypervisor"],
cluster.id
))
self.assertEqual(
cluster.hypervisortype,
v["hypervisor"],
"Check hypervisor type is " + v["hypervisor"] + " or not"
)
self.assertEqual(
cluster.allocationstate,
'Enabled',
"Check whether allocation state of cluster is enabled"
)
#If host is externally managed host is already added with cluster
response = list_hosts(
self.apiclient,
clusterid=cluster.id
)
if not response:
hypervisor_type = str(cluster.hypervisortype.lower())
host = Host.create(
self.apiclient,
cluster,
self.services["hosts"][hypervisor_type],
zoneid=self.zone.id,
podid=self.pod.id
)
self.debug(
"Created host (ID: %s) in cluster ID %s" %(
host.id,
cluster.id
))
#Cleanup Host & Cluster
self.cleanup.append(host)
self.cleanup.append(cluster)
list_hosts_response = list_hosts(
self.apiclient,
clusterid=cluster.id
)
self.assertEqual(
isinstance(list_hosts_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_hosts_response),
0,
"Check list Hosts response"
)
host_response = list_hosts_response[0]
#Check if host is Up and running
self.assertEqual(
host_response.state,
'Up',
"Check if state of host is Up or not"
)
#Verify List Cluster Response has newly added cluster
list_cluster_response = list_clusters(
self.apiclient,
id=cluster.id
)
self.assertEqual(
isinstance(list_cluster_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_cluster_response),
0,
"Check list Hosts response"
)
cluster_response = list_cluster_response[0]
self.assertEqual(
cluster_response.id,
cluster.id,
"Check cluster ID with list clusters response"
)
self.assertEqual(
cluster_response.hypervisortype,
cluster.hypervisortype,
"Check hypervisor type with is " + v["hypervisor"] + " or not"
)
return

View File

@ -0,0 +1,488 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Templates ISO
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
import urllib
from random import random
#Import System modules
import time
class Services:
"""Test ISO Services
"""
def __init__(self):
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended in create account to
# ensure unique username generated each time
"password": "fr3sca",
},
"iso_1":
{
"displaytext": "Test ISO 1",
"name": "ISO 1",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"isextractable": True,
"isfeatured": True,
"ispublic": True,
"ostypeid": '5a5b5f64-de08-452d-9672-608a4946f754',
},
"iso_2":
{
"displaytext": "Test ISO 2",
"name": "ISO 2",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"isextractable": True,
"isfeatured": True,
"ispublic": True,
"ostypeid": '5a5b5f64-de08-452d-9672-608a4946f754',
"mode": 'HTTP_DOWNLOAD',
# Used in Extract template, value must be HTTP_DOWNLOAD
},
"destzoneid": 5,
# Copy ISO from one zone to another (Destination Zone)
"isfeatured": True,
"ispublic": True,
"isextractable": True,
"bootable": True, # For edit template
"passwordenabled": True,
"sleep": 60,
"timeout": 10,
"ostypeid": '5a5b5f64-de08-452d-9672-608a4946f754',
# CentOS 5.3 (64 bit)
"domainid": '9ee36d2e-8b8f-432e-a927-a678ebec1d6b',
"zoneid": '4a6c0290-e64d-40fc-afbb-4a05cab6fa4b',
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced'
# Networking mode: Basic or Advanced
}
class TestCreateIso(cloudstackTestCase):
def setUp(self):
self.services = Services().services
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
# Get Zone, Domain and templates
self.zone = get_zone(self.apiclient, self.services)
self.services["iso_2"]["zoneid"] = self.zone.id
self.account = Account.create(
self.apiclient,
self.services["account"]
)
self.cleanup = [self.account]
return
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created ISOs
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_create_iso(self):
"""Test create public & private ISO
"""
# Validate the following:
# 1. database (vm_template table) should be
# updated with newly created ISO
# 2. UI should show the newly added ISO
# 3. listIsos API should show the newly added ISO
iso = Iso.create(
self.apiclient,
self.services["iso_2"],
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("ISO created with ID: %s" % iso.id)
try:
iso.download(self.apiclient)
except Exception as e:
self.fail("Exception while downloading ISO %s: %s"\
% (iso.id, e))
list_iso_response = list_isos(
self.apiclient,
id=iso.id
)
self.assertEqual(
isinstance(list_iso_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_iso_response),
0,
"Check template available in List ISOs"
)
iso_response = list_iso_response[0]
self.assertEqual(
iso_response.displaytext,
self.services["iso_2"]["displaytext"],
"Check display text of newly created ISO"
)
self.assertEqual(
iso_response.name,
self.services["iso_2"]["name"],
"Check name of newly created ISO"
)
self.assertEqual(
iso_response.zoneid,
self.services["iso_2"]["zoneid"],
"Check zone ID of newly created ISO"
)
return
class TestISO(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
cls.services["iso_1"]["zoneid"] = cls.zone.id
cls.services["iso_2"]["zoneid"] = cls.zone.id
cls.services["sourcezoneid"] = cls.zone.id
#Create an account, ISOs etc.
cls.account = Account.create(
cls.api_client,
cls.services["account"],
)
cls.services["account"] = cls.account.account.name
cls.iso_1 = Iso.create(
cls.api_client,
cls.services["iso_1"],
account=cls.account.account.name,
domainid=cls.account.account.domainid
)
try:
cls.iso_1.download(cls.api_client)
except Exception as e:
raise Exception("Exception while downloading ISO %s: %s"\
% (cls.iso_1.id, e))
cls.iso_2 = Iso.create(
cls.api_client,
cls.services["iso_2"],
account=cls.account.account.name,
domainid=cls.account.account.domainid
)
try:
cls.iso_2.download(cls.api_client)
except Exception as e:
raise Exception("Exception while downloading ISO %s: %s"\
% (cls.iso_2.id, e))
cls._cleanup = [cls.account]
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created ISOs, VMs
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_02_edit_iso(self):
"""Test Edit ISO
"""
# Validate the following:
# 1. UI should show the edited values for ISO
# 2. database (vm_template table) should have updated values
#Generate random values for updating ISO name and Display text
new_displayText = random_gen()
new_name = random_gen()
self.debug("Updating ISO permissions for ISO: %s" % self.iso_1.id)
cmd = updateIso.updateIsoCmd()
#Assign new values to attributes
cmd.id = self.iso_1.id
cmd.displaytext = new_displayText
cmd.name = new_name
cmd.bootable = self.services["bootable"]
cmd.passwordenabled = self.services["passwordenabled"]
cmd.ostypeid = self.services["ostypeid"]
self.apiclient.updateIso(cmd)
#Check whether attributes are updated in ISO using listIsos
list_iso_response = list_isos(
self.apiclient,
id=self.iso_1.id
)
self.assertEqual(
isinstance(list_iso_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_iso_response),
0,
"Check template available in List ISOs"
)
iso_response = list_iso_response[0]
self.assertEqual(
iso_response.displaytext,
new_displayText,
"Check display text of updated ISO"
)
self.assertEqual(
iso_response.name,
new_name,
"Check name of updated ISO"
)
self.assertEqual(
iso_response.bootable,
self.services["bootable"],
"Check if image is bootable of updated ISO"
)
self.assertEqual(
iso_response.ostypeid,
self.services["ostypeid"],
"Check OSTypeID of updated ISO"
)
return
def test_03_delete_iso(self):
"""Test delete ISO
"""
# Validate the following:
# 1. UI should not show the deleted ISP
# 2. database (vm_template table) should not contain deleted ISO
self.debug("Deleting ISO with ID: %s" % self.iso_1.id)
self.iso_1.delete(self.apiclient)
# Sleep to ensure that ISO state is reflected in other calls
time.sleep(self.services["sleep"])
#ListIsos to verify deleted ISO is properly deleted
list_iso_response = list_isos(
self.apiclient,
id=self.iso_1.id
)
self.assertEqual(
list_iso_response,
None,
"Check if ISO exists in ListIsos"
)
return
def test_04_extract_Iso(self):
"Test for extract ISO"
# Validate the following
# 1. Admin should able extract and download the ISO
# 2. ListIsos should display all the public templates
# for all kind of users
# 3 .ListIsos should not display the system templates
self.debug("Extracting ISO with ID: %s" % self.iso_2.id)
cmd = extractIso.extractIsoCmd()
cmd.id = self.iso_2.id
cmd.mode = self.services["iso_2"]["mode"]
cmd.zoneid = self.services["iso_2"]["zoneid"]
list_extract_response = self.apiclient.extractIso(cmd)
try:
#Format URL to ASCII to retrieve response code
formatted_url = urllib.unquote_plus(list_extract_response.url)
url_response = urllib.urlopen(formatted_url)
response_code = url_response.getcode()
except Exception:
self.fail(
"Extract ISO Failed with invalid URL %s (ISO id: %s)" \
% (formatted_url, self.iso_2.id)
)
self.assertEqual(
list_extract_response.id,
self.iso_2.id,
"Check ID of the downloaded ISO"
)
self.assertEqual(
list_extract_response.extractMode,
self.services["iso_2"]["mode"],
"Check mode of extraction"
)
self.assertEqual(
list_extract_response.zoneid,
self.services["iso_2"]["zoneid"],
"Check zone ID of extraction"
)
self.assertEqual(
response_code,
200,
"Check for a valid response of download URL"
)
return
def test_05_iso_permissions(self):
"""Update & Test for ISO permissions"""
# validate the following
# 1. listIsos returns valid permissions set for ISO
# 2. permission changes should be reflected in vm_template
# table in database
self.debug("Updating permissions for ISO: %s" % self.iso_2.id)
cmd = updateIsoPermissions.updateIsoPermissionsCmd()
cmd.id = self.iso_2.id
#Update ISO permissions
cmd.isfeatured = self.services["isfeatured"]
cmd.ispublic = self.services["ispublic"]
cmd.isextractable = self.services["isextractable"]
self.apiclient.updateIsoPermissions(cmd)
#Verify ListIsos have updated permissions for the ISO for normal user
list_iso_response = list_isos(
self.apiclient,
id=self.iso_2.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_iso_response, list),
True,
"Check list response returns a valid list"
)
iso_response = list_iso_response[0]
self.assertEqual(
iso_response.id,
self.iso_2.id,
"Check ISO ID"
)
self.assertEqual(
iso_response.ispublic,
self.services["ispublic"],
"Check ispublic permission of ISO"
)
self.assertEqual(
iso_response.isfeatured,
self.services["isfeatured"],
"Check isfeatured permission of ISO"
)
return
def test_06_copy_iso(self):
"""Test for copy ISO from one zone to another"""
#Validate the following
#1. copy ISO should be successful and secondary storage
# should contain new copied ISO.
self.debug("Copy ISO from %s to %s" % (
self.zone.id,
self.services["destzoneid"]
))
cmd = copyIso.copyIsoCmd()
cmd.id = self.iso_2.id
cmd.destzoneid = self.services["destzoneid"]
cmd.sourcezoneid = self.zone.id
self.apiclient.copyIso(cmd)
#Verify ISO is copied to another zone using ListIsos
list_iso_response = list_isos(
self.apiclient,
id=self.iso_2.id,
zoneid=self.services["destzoneid"]
)
self.assertEqual(
isinstance(list_iso_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_iso_response),
0,
"Check template extracted in List ISO"
)
iso_response = list_iso_response[0]
self.assertEqual(
iso_response.id,
self.iso_2.id,
"Check ID of the downloaded ISO"
)
self.assertEqual(
iso_response.zoneid,
self.services["destzoneid"],
"Check zone ID of the copied ISO"
)
self.debug("Cleanup copied ISO: %s" % iso_response.id)
# Cleanup- Delete the copied ISO
cmd = deleteIso.deleteIsoCmd()
cmd.id = iso_response.id
cmd.zoneid = self.services["destzoneid"]
self.apiclient.deleteIso(cmd)
return

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,231 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Primary Storage
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
class Services:
"""Test Primary storage Services
"""
def __init__(self):
self.services = {
"nfs": {
0: {
"url": "nfs://192.168.100.131/testprimary",
# Format: File_System_Type/Location/Path
"name": "Primary XEN",
"hypervisor": 'XEN',
},
1: {
"url": "nfs://192.168.100.131/Primary",
"name": "Primary KVM",
"hypervisor": 'KVM',
},
2: {
"url": "nfs://192.168.100.131/Primary",
"name": "Primary VMWare",
"hypervisor": 'VMWare',
},
},
"iscsi": {
0: {
"url": "iscsi://192.168.100.21/iqn.2012-01.localdomain.clo-cstack-cos6:iser/1",
# Format : iscsi://IP Address/IQN number/LUN#
"name": "Primary iSCSI",
"hypervisor": 'XEN',
},
},
"zoneid": '5894a264-12b0-4257-a316-06c831bcf8e2',
# Optional, if specified the mentioned zone will be
# used for tests
}
class TestPrimaryStorageServices(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.services = Services().services
self.cleanup = []
# Get Zone and pod
self.zone = get_zone(self.apiclient, self.services)
self.pod = get_pod(self.apiclient, self.zone.id)
return
def tearDown(self):
try:
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_primary_storage(self):
"""Test primary storage pools - XEN, KVM, VMWare
"""
# Validate the following:
# 1. verify hypervisortype returned by api is Xen/KVM/VMWare
# 2. verify that the cluster is in 'Enabled' allocation state
# 3. verify that the host is added successfully and
# in Up state with listHosts api response
#Create NFS storage pools with on XEN/KVM/VMWare clusters
for k, v in self.services["nfs"].items():
clusters = list_clusters(
self.apiclient,
zoneid=self.zone.id,
hypervisortype=v["hypervisor"]
)
self.assertEqual(
isinstance(clusters, list),
True,
"Check list response returns a valid list"
)
cluster = clusters[0]
#Host should be present before adding primary storage
list_hosts_response = list_hosts(
self.apiclient,
clusterid=cluster.id
)
self.assertEqual(
isinstance(list_hosts_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_hosts_response),
0,
"Check list Hosts for hypervisor: " + v["hypervisor"]
)
storage = StoragePool.create(self.apiclient,
v,
clusterid=cluster.id,
zoneid=self.zone.id,
podid=self.pod.id
)
self.cleanup.append(storage)
self.debug("Created storage pool in cluster: %s" % cluster.id)
self.assertEqual(
storage.state,
'Up',
"Check primary storage state for hypervisor: " + v["hypervisor"]
)
self.assertEqual(
storage.type,
'NetworkFilesystem',
"Check storage pool type for hypervisor : " + v["hypervisor"]
)
#Verify List Storage pool Response has newly added storage pool
storage_pools_response = list_storage_pools(
self.apiclient,
id=storage.id,
)
self.assertEqual(
isinstance(storage_pools_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(storage_pools_response),
0,
"Check list Hosts response"
)
storage_response = storage_pools_response[0]
self.assertEqual(
storage_response.id,
storage.id,
"Check storage pool ID for hypervisor: " + v["hypervisor"]
)
self.assertEqual(
storage.type,
storage_response.type,
"Check storage pool type for hypervisor: " + v["hypervisor"]
)
# Call cleanup for reusing primary storage
cleanup_resources(self.apiclient, self.cleanup)
self.cleanup = []
# Create iSCSI storage pools with on XEN/KVM clusters
for k, v in self.services["iscsi"].items():
clusters = list_clusters(
self.apiclient,
zoneid=self.zone.id,
hypervisortype=v["hypervisor"]
)
self.assertEqual(
isinstance(clusters, list),
True,
"Check list response returns a valid list"
)
cluster = clusters[0]
storage = StoragePool.create(self.apiclient,
v,
clusterid=cluster.id,
zoneid=self.zone.id,
podid=self.pod.id
)
self.cleanup.append(storage)
self.debug("Created iSCSI storage pool in cluster: %s" % cluster.id)
self.assertEqual(
storage.state,
'Up',
"Check primary storage state for hypervisor: " + v["hypervisor"]
)
#Verify List Storage pool Response has newly added storage pool
storage_pools_response = list_storage_pools(
self.apiclient,
id=storage.id,
)
self.assertEqual(
isinstance(storage_pools_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(storage_pools_response),
0,
"Check Hosts response for hypervisor: " + v["hypervisor"]
)
storage_response = storage_pools_response[0]
self.assertEqual(
storage_response.id,
storage.id,
"Check storage pool ID for hypervisor: " + v["hypervisor"]
)
self.assertEqual(
storage.type,
storage_response.type,
"Check storage pool type hypervisor: " + v["hypervisor"]
)
# Call cleanup for reusing primary storage
cleanup_resources(self.apiclient, self.cleanup)
self.cleanup = []
return

View File

@ -0,0 +1,788 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for routers
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
import remoteSSHClient
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
class Services:
"""Test router Services
"""
def __init__(self):
self.services = {
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"virtual_machine":
{
"displayname": "Test VM",
"username": "root",
"password": "fr3sca",
"ssh_port": 22,
"hypervisor": 'XenServer',
"domainid": '9ee36d2e-8b8f-432e-a927-a678ebec1d6b',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "testuser",
"password": "fr3sca",
},
"ostypeid":'0c2c5d19-525b-41be-a8c3-c6607412f82b',
"sleep": 60,
"timeout": 10,
"zoneid": '4a6c0290-e64d-40fc-afbb-4a05cab6fa4b',
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced', #Networking mode: Basic, Advanced
}
class TestRouterServices(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
#Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.api_client,
cls.services["account"]
)
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.vm_1 = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
)
cls.cleanup = [
cls.vm_1,
cls.account,
cls.service_offering
]
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
return
def test_01_router_internal_basic(self):
"""Test router internal basic zone
"""
# Validate the following
# 1. Router only does dhcp
# 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP
# by checking status of dnsmasq process
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list host returns a valid list"
)
host = hosts[0]
self.debug("Router ID: %s, state: %s" % (router.id, router.state))
self.assertEqual(
router.state,
'Running',
"Check list router response for router state"
)
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service dnsmasq status"
)
res = str(result)
self.debug("Dnsmasq process status: %s" % res)
self.assertEqual(
res.count("running"),
1,
"Check dnsmasq service is running or not"
)
return
def test_02_router_internal_adv(self):
"""Test router internal advanced zone
"""
# Validate the following
# 1. Router does dhcp, dns, gateway, LB, PF, FW
# 2. verify that dhcp, dns ports are open on UDP
# 3. dnsmasq, haproxy processes should be running
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
self.debug("Router ID: %s, state: %s" % (router.id, router.state))
self.assertEqual(
router.state,
'Running',
"Check list router response for router state"
)
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service dnsmasq status"
)
res = str(result)
self.debug("Dnsmasq process status: %s" % res)
self.assertEqual(
res.count("running"),
1,
"Check dnsmasq service is running or not"
)
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service haproxy status"
)
res = str(result)
self.assertEqual(
res.count("running"),
1,
"Check haproxy service is running or not"
)
self.debug("Haproxy process status: %s" % res)
return
def test_03_restart_network_cleanup(self):
"""Test restart network
"""
# Validate the following
# 1. When cleanup = true, router is destroyed and a new one created
# 2. New router will have new publicIp and linkLocalIp and
# all it's services should resume
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
#Store old values before restart
old_linklocalip = router.linklocalip
timeout = 10
# Network should be in Implemented or Setup stage before restart
while True:
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(networks, list),
True,
"Check list response returns a valid list"
)
network = networks[0]
if network.state in ["Implemented", "Setup"]:
break
elif timeout == 0:
break
else:
time.sleep(self.services["sleep"])
timeout = timeout - 1
self.debug(
"Restarting network with ID: %s, Network state: %s" % (
network.id,
network.state
))
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
cmd.cleanup = True
self.apiclient.restartNetwork(cmd)
# Get router details after restart
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
self.assertNotEqual(
router.linklocalip,
old_linklocalip,
"Check link-local IP after restart"
)
return
def test_04_restart_network_wo_cleanup(self):
"""Test restart network without cleanup
"""
# Validate the following
# 1. When cleanup = false, router is restarted and
# all services inside the router are restarted
# 2. check 'uptime' to see if the actual restart happened
timeout = 10
# Network should be in Implemented or Setup stage before restart
while True:
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(networks, list),
True,
"Check list response returns a valid list"
)
network = networks[0]
if network.state in ["Implemented", "Setup"]:
break
elif timeout == 0:
break
else:
time.sleep(self.services["sleep"])
timeout = timeout - 1
self.debug(
"Restarting network with ID: %s, Network state: %s" % (
network.id,
network.state
))
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
cmd.cleanup = False
self.apiclient.restartNetwork(cmd)
# Get router details after restart
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
res = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"uptime"
)
# res = 12:37:14 up 1 min, 0 users, load average: 0.61, 0.22, 0.08
# Split result to check the uptime
result = res[0].split()
self.debug("Router Uptime: %s" % result)
self.assertEqual(
str(result[1]),
'up',
"Check router is running or not"
)
if str(result[3]) == "min,":
self.assertEqual(
(int(result[2]) < 3),
True,
"Check uptime is less than 3 mins or not"
)
else:
self.assertEqual(
str(result[3]),
'sec,',
"Check uptime is in seconds"
)
return
def test_05_router_basic(self):
"""Test router basic setup
"""
# Validate the following:
# 1. verify that listRouters returned a 'Running' router
# 2. router will have dns same as that seen in listZones
# 3. router will have a guestIP and a linkLocalIp"
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_router_response),
0,
"Check list router response"
)
for router in list_router_response:
self.assertEqual(
router.state,
'Running',
"Check list router response for router state"
)
zones = list_zones(
self.apiclient,
id=router.zoneid
)
self.assertEqual(
isinstance(zones, list),
True,
"Check list response returns a valid list"
)
zone = zones[0]
self.assertEqual(
router.dns1,
zone.dns1,
"Compare DNS1 of router and zone"
)
self.assertEqual(
router.dns2,
zone.dns2,
"Compare DNS2 of router and zone"
)
self.assertEqual(
hasattr(router, 'guestipaddress'),
True,
"Check whether router has guest IP field"
)
self.assertEqual(
hasattr(router, 'linklocalip'),
True,
"Check whether router has link local IP field"
)
return
def test_06_router_advanced(self):
"""Test router advanced setup
"""
# Validate the following
# 1. verify that listRouters returned a 'Running' router
# 2. router will have dns and gateway as in listZones, listVlanIpRanges
# 3. router will have guest,public and linklocal IPs
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_router_response),
0,
"Check list router response"
)
for router in list_router_response:
self.assertEqual(
router.state,
'Running',
"Check list router response for router state"
)
zones = list_zones(
self.apiclient,
id=router.zoneid
)
self.assertEqual(
isinstance(zones, list),
True,
"Check list response returns a valid list"
)
zone = zones[0]
self.assertEqual(
router.dns1,
zone.dns1,
"Compare DNS1 of router and zone"
)
self.assertEqual(
router.dns2,
zone.dns2,
"Compare DNS2 of router and zone"
)
self.assertEqual(
hasattr(router, 'guestipaddress'),
True,
"Check whether router has guest IP field"
)
self.assertEqual(
hasattr(router, 'linklocalip'),
True,
"Check whether router has link local IP field"
)
#Fetch corresponding ip ranges information from listVlanIpRanges
ipranges_response = list_vlan_ipranges(
self.apiclient,
zoneid=router.zoneid
)
self.assertEqual(
isinstance(ipranges_response, list),
True,
"Check list response returns a valid list"
)
iprange = ipranges_response[0]
self.assertEqual(
router.gateway,
iprange.gateway,
"Check gateway with that of corresponding IP range"
)
return
def test_07_stop_router(self):
"""Test stop router
"""
# Validate the following
# 1. listRouter should report the router for the account as stopped
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
self.debug("Stopping the router with ID: %s" % router.id)
#Stop the router
cmd = stopRouter.stopRouterCmd()
cmd.id = router.id
self.apiclient.stopRouter(cmd)
#List routers to check state of router
router_response = list_routers(
self.apiclient,
id=router.id
)
self.assertEqual(
isinstance(router_response, list),
True,
"Check list response returns a valid list"
)
#List router should have router in stopped state
self.assertEqual(
router_response[0].state,
'Stopped',
"Check list router response for router state"
)
return
def test_08_start_router(self):
"""Test start router
"""
# Validate the following
# 1. listRouter should report the router for the account as stopped
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
self.debug("Starting the router with ID: %s" % router.id)
#Start the router
cmd = startRouter.startRouterCmd()
cmd.id = router.id
self.apiclient.startRouter(cmd)
#List routers to check state of router
router_response = list_routers(
self.apiclient,
id=router.id
)
self.assertEqual(
isinstance(router_response, list),
True,
"Check list response returns a valid list"
)
#List router should have router in running state
self.assertEqual(
router_response[0].state,
'Running',
"Check list router response for router state"
)
return
def test_09_reboot_router(self):
"""Test reboot router
"""
# Validate the following
# 1. listRouter should report the router for the account as stopped
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
public_ip = router.publicip
self.debug("Rebooting the router with ID: %s" % router.id)
#Reboot the router
cmd = rebootRouter.rebootRouterCmd()
cmd.id = router.id
self.apiclient.rebootRouter(cmd)
#List routers to check state of router
router_response = list_routers(
self.apiclient,
id=router.id
)
self.assertEqual(
isinstance(router_response, list),
True,
"Check list response returns a valid list"
)
#List router should have router in running state and same public IP
self.assertEqual(
router_response[0].state,
'Running',
"Check list router response for router state"
)
self.assertEqual(
router_response[0].publicip,
public_ip,
"Check list router response for router public IP"
)
return
def test_10_network_gc(self):
"""Test network GC
"""
# Validate the following
# 1. stop All User VMs in the account
# 2. wait for network.gc.interval time"
# 3. After network.gc.interval, router should be stopped
# 4. ListRouters should return the router in Stopped state
list_vms = list_virtual_machines(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_vms, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vms),
0,
"Check length of list VM response"
)
for vm in list_vms:
self.debug("Stopping the VM with ID: %s" % vm.id)
# Stop all virtual machines associated with that account
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = vm.id
self.apiclient.stopVirtualMachine(cmd)
config = list_configurations(
self.apiclient,
name='network.gc.interval'
)
self.assertEqual(
isinstance(config, list),
True,
"Check list response returns a valid list"
)
response = config[0]
# Wait for network.gc.interval * 3 time
time.sleep(int(response.value) * 3)
timeout = self.services["timeout"]
while True:
#Check status of network router
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
if isinstance(list_router_response, list):
break
elif timeout == 0:
raise Exception("List router call failed!")
time.sleep(5)
timeout = timeout -1
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
self.debug("Router state after network.gc.interval: %s" % router.state)
self.assertEqual(
router.state,
'Stopped',
"Check state of the router after stopping all VMs associated"
)
return

View File

@ -0,0 +1,373 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Secondary Storage
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
class Services:
"""Test secondary storage Services
"""
def __init__(self):
self.services = {
"storage": {
"url": "nfs://192.168.100.131/SecStorage"
# Format: File_System_Type/Location/Path
},
"hypervisors": {
0: {
"hypervisor": "XenServer",
"templatefilter": "self",
},
1: {
"hypervisor": "KVM",
"templatefilter": "self",
},
2: {
"hypervisor": "VMWare",
"templatefilter": "self",
},
},
"sleep": 60,
"timeout": 5,
"zoneid": '4a6c0290-e64d-40fc-afbb-4a05cab6fa4b',
# Optional, if specified the mentioned zone will be
# used for tests
"domainid": '9ee36d2e-8b8f-432e-a927-a678ebec1d6b',
}
class TestSecStorageServices(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
cls._cleanup = []
return
@classmethod
def tearDownClass(cls):
try:
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
self.services = Services().services
# Get Zone and pod
self.zone = get_zone(self.apiclient, self.services)
self.pod = get_pod(self.apiclient, self.zone.id)
return
def tearDown(self):
try:
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_add_sec_storage(self):
"""Test secondary storage
"""
# Validate the following:
# 1. secondary storage should be added to the zone.
# 2. Verify with listHosts and type secondarystorage
cmd = addSecondaryStorage.addSecondaryStorageCmd()
cmd.zoneid = self.zone.id
cmd.url = self.services["storage"]["url"]
sec_storage = self.apiclient.addSecondaryStorage(cmd)
self.debug("Added secondary storage to zone: %s" % self.zone.id)
# Cleanup at the end
self._cleanup.append(sec_storage)
self.assertEqual(
sec_storage.zoneid,
self.zone.id,
"Check zoneid where sec storage is added"
)
list_hosts_response = list_hosts(
self.apiclient,
type='SecondaryStorage',
id=sec_storage.id
)
self.assertEqual(
isinstance(list_hosts_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_hosts_response),
0,
"Check list Hosts response"
)
host_response = list_hosts_response[0]
#Check if host is Up and running
self.assertEqual(
host_response.id,
sec_storage.id,
"Check ID of secondary storage"
)
self.assertEqual(
sec_storage.type,
host_response.type,
"Check type of host from list hosts response"
)
return
def test_02_sys_vm_start(self):
"""Test system VM start
"""
# 1. verify listHosts has all 'routing' hosts in UP state
# 2. verify listStoragePools shows all primary storage pools
# in UP state
# 3. verify that secondary storage was added successfully
list_hosts_response = list_hosts(
self.apiclient,
type='Routing',
zoneid=self.zone.id,
podid=self.pod.id
)
self.assertEqual(
isinstance(list_hosts_response, list),
True,
"Check list response returns a valid list"
)
# ListHosts has all 'routing' hosts in UP state
self.assertNotEqual(
len(list_hosts_response),
0,
"Check list host response"
)
for host in list_hosts_response:
self.assertEqual(
host.state,
'Up',
"Check state of routing hosts is Up or not"
)
# ListStoragePools shows all primary storage pools in UP state
list_storage_response = list_storage_pools(
self.apiclient,
zoneid=self.zone.id,
podid=self.pod.id
)
self.assertEqual(
isinstance(list_storage_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_storage_response),
0,
"Check list storage pools response"
)
for primary_storage in list_hosts_response:
self.assertEqual(
primary_storage.state,
'Up',
"Check state of primary storage pools is Up or not"
)
# Secondary storage is added successfully
timeout = self.services["timeout"]
while True:
list_hosts_response = list_hosts(
self.apiclient,
type='SecondaryStorage',
zoneid=self.zone.id,
)
if not isinstance(list_hosts_response, list):
# Sleep to ensure Secondary storage is Up
time.sleep(int(self.services["sleep"]))
timeout = timeout - 1
elif timeout == 0 or isinstance(list_hosts_response, list):
break
self.assertEqual(
isinstance(list_hosts_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_hosts_response),
0,
"Check list Hosts response"
)
host_response = list_hosts_response[0]
#Check if host is Up and running
self.assertEqual(
host_response.state,
'Up',
"Check state of secondary storage"
)
self.debug("Checking SSVM status in zone: %s" % self.zone.id)
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
zoneid=self.zone.id,
podid=self.pod.id
)
if not isinstance(list_ssvm_response, list):
# Sleep to ensure SSVMs are Up and Running
time.sleep(int(self.services["sleep"]))
timeout = timeout - 1
elif timeout == 0 or isinstance(list_ssvm_response, list):
break
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
#Verify SSVM response
self.assertNotEqual(
len(list_ssvm_response),
0,
"Check list System VMs response"
)
for ssvm in list_ssvm_response:
self.assertEqual(
ssvm.state,
'Running',
"Check whether state of SSVM is running"
)
return
def test_03_sys_template_ready(self):
"""Test system templates are ready
"""
# Validate the following
# If SSVM is in UP state and running
# 1. wait for listTemplates to show all builtin templates
# downloaded for all added hypervisors and in “Ready” state"
for k, v in self.services["hypervisors"].items():
self.debug("Downloading BUILTIN templates in zone: %s" %
self.zone.id)
list_template_response = list_templates(
self.apiclient,
hypervisor=v["hypervisor"],
zoneid=self.zone.id,
templatefilter=v["templatefilter"],
listall=True,
account='system',
domainid=self.services["domainid"]
)
# Ensure all BUILTIN templates are downloaded
templateid = None
for template in list_template_response:
if template.templatetype == "BUILTIN":
templateid = template.id
# Wait to start a downloading of template
time.sleep(self.services["sleep"])
while True and (templateid != None):
timeout = self.services["timeout"]
while True:
template_response = list_templates(
self.apiclient,
id=templateid,
zoneid=self.zone.id,
templatefilter=v["templatefilter"],
listall=True,
account='system',
domainid=self.services["domainid"]
)
if isinstance(template_response, list):
template = template_response[0]
break
elif timeout == 0:
raise Exception("List template API call failed.")
time.sleep(1)
timeout = timeout - 1
# If template is ready,
# template.status = Download Complete
# Downloading - x% Downloaded
# Error - Any other string
if template.status == 'Download Complete' :
break
elif 'Downloaded' not in template.status.split():
raise Exception
elif 'Downloaded' in template.status.split():
time.sleep(self.services["sleep"])
#Ensuring the template is in ready state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
template_response = list_templates(
self.apiclient,
id=templateid,
zoneid=self.zone.id,
templatefilter=v["templatefilter"],
listall=True,
account='system',
domainid=self.services["domainid"]
)
if isinstance(template_response, list):
template = template_response[0]
break
elif timeout == 0:
raise Exception("List template API call failed.")
time.sleep(1)
timeout = timeout - 1
self.assertEqual(
isinstance(template_response, list),
True,
"Check list response returns a valid list"
)
template = template_response[0]
self.assertEqual(
template.isready,
True,
"Check whether state of template is ready or not"
)
return

View File

@ -0,0 +1,229 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Service offerings"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
class Services:
"""Test Service offerings Services
"""
def __init__(self):
self.services = {
"off":
{
"name": "Service Offering",
"displaytext": "Service Offering",
"cpunumber": 1,
"cpuspeed": 100, # MHz
"memory": 64, # in MBs
},
}
class TestCreateServiceOffering(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
self.services = Services().services
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_create_service_offering(self):
"""Test to create service offering"""
# Validate the following:
# 1. createServiceOfferings should return a valid information for newly created offering
# 2. The Cloud Database contains the valid information
service_offering = ServiceOffering.create(
self.apiclient,
self.services["off"]
)
self.cleanup.append(service_offering)
self.debug("Created service offering with ID: %s" % service_offering.id)
list_service_response = list_service_offering(
self.apiclient,
id=service_offering.id
)
self.assertEqual(
isinstance(list_service_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_service_response),
0,
"Check Service offering is created"
)
service_response = list_service_response[0]
self.assertEqual(
list_service_response[0].cpunumber,
self.services["off"]["cpunumber"],
"Check server id in createServiceOffering"
)
self.assertEqual(
list_service_response[0].cpuspeed,
self.services["off"]["cpuspeed"],
"Check cpuspeed in createServiceOffering"
)
self.assertEqual(
list_service_response[0].displaytext,
self.services["off"]["displaytext"],
"Check server displaytext in createServiceOfferings"
)
self.assertEqual(
list_service_response[0].memory,
self.services["off"]["memory"],
"Check memory in createServiceOffering"
)
self.assertEqual(
list_service_response[0].name,
self.services["off"]["name"],
"Check name in createServiceOffering"
)
return
class TestServiceOfferings(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.service_offering_1 = ServiceOffering.create(
cls.api_client,
cls.services["off"]
)
cls.service_offering_2 = ServiceOffering.create(
cls.api_client,
cls.services["off"]
)
cls._cleanup = [cls.service_offering_1]
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_02_edit_service_offering(self):
"""Test to update existing service offering"""
# Validate the following:
# 1. updateServiceOffering should return
# a valid information for newly created offering
#Generate new name & displaytext from random data
random_displaytext = random_gen()
random_name = random_gen()
self.debug("Updating service offering with ID: %s" %
self.service_offering_1.id)
cmd = updateServiceOffering.updateServiceOfferingCmd()
#Add parameters for API call
cmd.id = self.service_offering_1.id
cmd.displaytext = random_displaytext
cmd.name = random_name
self.apiclient.updateServiceOffering(cmd)
list_service_response = list_service_offering(
self.apiclient,
id=self.service_offering_1.id
)
self.assertEqual(
isinstance(list_service_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_service_response),
0,
"Check Service offering is updated"
)
self.assertEqual(
list_service_response[0].displaytext,
random_displaytext,
"Check server displaytext in updateServiceOffering"
)
self.assertEqual(
list_service_response[0].name,
random_name,
"Check server name in updateServiceOffering"
)
return
def test_03_delete_service_offering(self):
"""Test to delete service offering"""
# Validate the following:
# 1. deleteServiceOffering should return
# a valid information for newly created offering
self.debug("Deleting service offering with ID: %s" %
self.service_offering_2.id)
self.service_offering_2.delete(self.apiclient)
list_service_response = list_service_offering(
self.apiclient,
id=self.service_offering_2.id
)
self.assertEqual(
list_service_response,
None,
"Check if service offering exists in listDiskOfferings"
)
return

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,884 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for SSVM
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
import remoteSSHClient
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
import telnetlib
#Import System modules
import time
class Services:
"""Test SSVM Services
"""
def __init__(self):
self.services = {
"host": {
"username": 'root', # Credentials for SSH
"password": 'fr3sca',
"publicport": 22,
},
"zoneid": '4a6c0290-e64d-40fc-afbb-4a05cab6fa4b',
# Optional, if specified the mentioned zone will be
# used for tests
"sleep": 60,
"timeout": 10,
}
class TestSSVMs(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
self.services = Services().services
self.zone = get_zone(self.apiclient, self.services)
return
def tearDown(self):
try:
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_list_sec_storage_vm(self):
"""Test List secondary storage VMs
"""
# Validate the following:
# 1. listSystemVM (systemvmtype=secondarystoragevm)
# should return only ONE SSVM per zone
# 2. The returned SSVM should be in Running state
# 3. listSystemVM for secondarystoragevm should list publicip,
# privateip and link-localip
# 4. The gateway programmed on the ssvm by listSystemVm should be
# the same as the gateway returned by listVlanIpRanges
# 5. DNS entries must match those given for the zone
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm'
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
#Verify SSVM response
self.assertNotEqual(
len(list_ssvm_response),
0,
"Check list System VMs response"
)
list_zones_response = list_zones(self.apiclient)
self.assertEqual(
isinstance(list_zones_response, list),
True,
"Check list response returns a valid list"
)
# Number of Sec storage VMs = No of Zones
self.assertEqual(
len(list_ssvm_response),
len(list_zones_response),
"Check number of SSVMs with number of zones"
)
#For each secondary storage VM check private IP,
#public IP, link local IP and DNS
for ssvm in list_ssvm_response:
self.assertEqual(
ssvm.state,
'Running',
"Check whether state of SSVM is running"
)
self.assertEqual(
hasattr(ssvm, 'privateip'),
True,
"Check whether SSVM has private IP field"
)
self.assertEqual(
hasattr(ssvm, 'linklocalip'),
True,
"Check whether SSVM has link local IP field"
)
self.assertEqual(
hasattr(ssvm, 'publicip'),
True,
"Check whether SSVM has public IP field"
)
#Fetch corresponding ip ranges information from listVlanIpRanges
ipranges_response = list_vlan_ipranges(
self.apiclient,
zoneid=ssvm.zoneid
)
self.assertEqual(
isinstance(ipranges_response, list),
True,
"Check list response returns a valid list"
)
iprange = ipranges_response[0]
self.assertEqual(
ssvm.gateway,
iprange.gateway,
"Check gateway with that of corresponding ip range"
)
#Fetch corresponding zone information from listZones
zone_response = list_zones(
self.apiclient,
id=ssvm.zoneid
)
self.assertEqual(
isinstance(zone_response, list),
True,
"Check list response returns a valid list"
)
self.assertEqual(
ssvm.dns1,
zone_response[0].dns1,
"Check DNS1 with that of corresponding zone"
)
self.assertEqual(
ssvm.dns2,
zone_response[0].dns2,
"Check DNS2 with that of corresponding zone"
)
return
def test_02_list_cpvm_vm(self):
"""Test List console proxy VMs
"""
# Validate the following:
# 1. listSystemVM (systemvmtype=consoleproxy) should return
# at least ONE CPVM per zone
# 2. The returned ConsoleProxyVM should be in Running state
# 3. listSystemVM for console proxy should list publicip, privateip
# and link-localip
# 4. The gateway programmed on the console proxy should be the same
# as the gateway returned by listZones
# 5. DNS entries must match those given for the zone
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy'
)
self.assertEqual(
isinstance(list_cpvm_response, list),
True,
"Check list response returns a valid list"
)
#Verify CPVM response
self.assertNotEqual(
len(list_cpvm_response),
0,
"Check list System VMs response"
)
list_zones_response = list_zones(self.apiclient)
# Number of Console Proxy VMs = No of Zones
self.assertEqual(
isinstance(list_zones_response, list),
True,
"Check list response returns a valid list"
)
self.assertEqual(
len(list_cpvm_response),
len(list_zones_response),
"Check number of CPVMs with number of zones"
)
#For each CPVM check private IP, public IP, link local IP and DNS
for cpvm in list_cpvm_response:
self.assertEqual(
cpvm.state,
'Running',
"Check whether state of CPVM is running"
)
self.assertEqual(
hasattr(cpvm, 'privateip'),
True,
"Check whether CPVM has private IP field"
)
self.assertEqual(
hasattr(cpvm, 'linklocalip'),
True,
"Check whether CPVM has link local IP field"
)
self.assertEqual(
hasattr(cpvm, 'publicip'),
True,
"Check whether CPVM has public IP field"
)
#Fetch corresponding ip ranges information from listVlanIpRanges
ipranges_response = list_vlan_ipranges(
self.apiclient,
zoneid=cpvm.zoneid
)
self.assertEqual(
isinstance(ipranges_response, list),
True,
"Check list response returns a valid list"
)
iprange = ipranges_response[0]
self.assertEqual(
cpvm.gateway,
iprange.gateway,
"Check gateway with that of corresponding ip range"
)
#Fetch corresponding zone information from listZones
zone_response = list_zones(
self.apiclient,
id=cpvm.zoneid
)
self.assertEqual(
cpvm.dns1,
zone_response[0].dns1,
"Check DNS1 with that of corresponding zone"
)
self.assertEqual(
cpvm.dns2,
zone_response[0].dns2,
"Check DNS2 with that of corresponding zone"
)
return
def test_03_ssvm_internals(self):
"""Test SSVM Internals"""
# Validate the following
# 1. The SSVM check script should not return any
# WARN|ERROR|FAIL messages
# 2. If you are unable to login to the SSVM with the signed key
# then test is deemed a failure
# 3. There should be only one ""cloud"" process running within the SSVM
# 4. If no process is running/multiple process are running
# then the test is a failure
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
hostid=host.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
ssvm = list_ssvm_response[0]
self.debug("Checking cloud process status")
result = get_process_status(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"],
ssvm.linklocalip,
"/usr/local/cloud/systemvm/ssvm-check.sh |grep -e ERROR -e WARNING -e FAIL"
)
res = str(result)
self.assertEqual(
res.count("ERROR"),
1,
"Check for Errors in tests"
)
self.assertEqual(
res.count("WARNING"),
1,
"Check for warnings in tests"
)
#Check status of cloud service
result = get_process_status(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"],
ssvm.linklocalip,
"service cloud status"
)
res = str(result)
# cloud.com service (type=secstorage) is running: process id: 2346
self.assertEqual(
res.count("is running"),
1,
"Check cloud service is running or not"
)
return
def test_04_cpvm_internals(self):
"""Test CPVM Internals"""
# Validate the following
# 1. test that telnet access on 8250 is available to
# the management server for the CPVM
# 2. No telnet access, test FAIL
# 3. Service cloud status should report cloud agent status to be
# running
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
hostid=host.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
True,
"Check list response returns a valid list"
)
cpvm = list_cpvm_response[0]
try:
telnet = telnetlib.Telnet(
str(self.apiclient.connection.mgtSvr),
'8250'
)
except Exception as e:
self.fail(
"Telnet Access failed for %s: %s" % \
(self.apiclient.connection.mgtSvr, e)
)
self.debug("Checking cloud process status")
result = get_process_status(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"],
cpvm.linklocalip,
"service cloud status"
)
res = str(result)
self.assertEqual(
res.count("is running"),
1,
"Check cloud service is running or not"
)
return
def test_05_stop_ssvm(self):
"""Test stop SSVM
"""
# Validate the following
# 1. The SSVM should go to stop state
# 2. After a brief delay of say one minute, the SSVM should be
# restarted once again and return to Running state with previous two
# test cases still passing
# 3. If either of the two above steps fail the test is a failure
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
hostid=host.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
ssvm = list_ssvm_response[0]
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = ssvm.id
self.apiclient.stopSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
self.apiclient,
id=ssvm.id
)
if isinstance(list_ssvm_response, list):
if list_ssvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List SSVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
ssvm_response = list_ssvm_response[0]
self.assertEqual(
ssvm_response.state,
'Running',
"Check whether SSVM is running or not"
)
# Call above tests to ensure SSVM is properly running
self.test_01_list_sec_storage_vm()
self.test_03_ssvm_internals()
return
def test_06_stop_cpvm(self):
"""Test stop CPVM
"""
# Validate the following
# 1. The CPVM should go to stop state
# 2. After a brief delay of say one minute, the SSVM should be
# restarted once again and return to Running state with previous
# two test cases still passing
# 3. If either of the two above steps fail the test is a failure
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
hostid=host.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
True,
"Check list response returns a valid list"
)
cpvm = list_cpvm_response[0]
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = cpvm.id
self.apiclient.stopSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_cpvm_response = list_ssvms(
self.apiclient,
id=cpvm.id
)
if isinstance(list_cpvm_response, list):
if list_cpvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List CPVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
cpvm_response = list_cpvm_response[0]
self.assertEqual(
cpvm_response.state,
'Running',
"Check whether CPVM is running or not"
)
# Call above tests to ensure CPVM is properly running
self.test_02_list_cpvm_vm()
self.test_04_cpvm_internals()
return
def test_07_reboot_ssvm(self):
"""Test reboot SSVM
"""
# Validate the following
# 1. The SSVM should go to stop and return to Running state
# 2. SSVM's public-ip and private-ip must remain the same
# before and after reboot
# 3. The cloud process should still be running within the SSVM
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
hostid=host.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
ssvm_response = list_ssvm_response[0]
#Store the public & private IP values before reboot
old_public_ip = ssvm_response.publicip
old_private_ip = ssvm_response.privateip
cmd = rebootSystemVm.rebootSystemVmCmd()
cmd.id = ssvm_response.id
self.apiclient.rebootSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
self.apiclient,
id=ssvm_response.id
)
if isinstance(list_ssvm_response, list):
if list_ssvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List SSVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
ssvm_response = list_ssvm_response[0]
self.assertEqual(
'Running',
str(ssvm_response.state),
"Check whether CPVM is running or not"
)
self.assertEqual(
ssvm_response.publicip,
old_public_ip,
"Check Public IP after reboot with that of before reboot"
)
self.assertEqual(
ssvm_response.privateip,
old_private_ip,
"Check Private IP after reboot with that of before reboot"
)
#Call to verify cloud process is running
self.test_03_ssvm_internals()
return
def test_08_reboot_cpvm(self):
"""Test reboot CPVM
"""
# Validate the following
# 1. The CPVM should go to stop and return to Running state
# 2. CPVM's public-ip and private-ip must remain
# the same before and after reboot
# 3. the cloud process should still be running within the CPVM
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
hostid=host.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
True,
"Check list response returns a valid list"
)
cpvm_response = list_cpvm_response[0]
#Store the public & private IP values before reboot
old_public_ip = cpvm_response.publicip
old_private_ip = cpvm_response.privateip
cmd = rebootSystemVm.rebootSystemVmCmd()
cmd.id = cpvm_response.id
self.apiclient.rebootSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_cpvm_response = list_ssvms(
self.apiclient,
id=cpvm_response.id
)
if isinstance(list_cpvm_response, list):
if list_cpvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List CPVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
cpvm_response = list_cpvm_response[0]
self.assertEqual(
'Running',
str(cpvm_response.state),
"Check whether CPVM is running or not"
)
self.assertEqual(
cpvm_response.publicip,
old_public_ip,
"Check Public IP after reboot with that of before reboot"
)
self.assertEqual(
cpvm_response.privateip,
old_private_ip,
"Check Private IP after reboot with that of before reboot"
)
#Call to verify cloud process is running
self.test_04_cpvm_internals()
return
def test_09_destroy_ssvm(self):
"""Test destroy SSVM
"""
# Validate the following
# 1. SSVM should be completely destroyed and a new one will spin up
# 2. listSystemVMs will show a different name for the
# systemVM from what it was before
# 3. new SSVM will have a public/private and link-local-ip
# 4. cloud process within SSVM must be up and running
list_ssvm_response = list_ssvms(
self.apiclient,
zoneid=self.zone.id,
systemvmtype='secondarystoragevm'
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
ssvm_response = list_ssvm_response[0]
old_name = ssvm_response.name
cmd = destroySystemVm.destroySystemVmCmd()
cmd.id = ssvm_response.id
self.apiclient.destroySystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
self.apiclient,
zoneid=self.zone.id,
systemvmtype='secondarystoragevm'
)
if isinstance(list_ssvm_response, list):
if list_ssvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List SSVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
ssvm_response = list_ssvm_response[0]
# Verify Name, Public IP, Private IP and Link local IP
# for newly created SSVM
self.assertNotEqual(
ssvm_response.name,
old_name,
"Check SSVM new name with name of destroyed SSVM"
)
self.assertEqual(
hasattr(ssvm_response, 'privateip'),
True,
"Check whether SSVM has private IP field"
)
self.assertEqual(
hasattr(ssvm_response, 'linklocalip'),
True,
"Check whether SSVM has link local IP field"
)
self.assertEqual(
hasattr(ssvm_response, 'publicip'),
True,
"Check whether SSVM has public IP field"
)
#Call to verify cloud process is running
self.test_03_ssvm_internals()
return
def test_10_destroy_cpvm(self):
"""Test destroy CPVM
"""
# Validate the following
# 1. CPVM should be completely destroyed and a new one will spin up
# 2. listSystemVMs will show a different name for the systemVM from
# what it was before
# 3. new CPVM will have a public/private and link-local-ip
# 4. cloud process within CPVM must be up and running
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
True,
"Check list response returns a valid list"
)
cpvm_response = list_cpvm_response[0]
old_name = cpvm_response.name
cmd = destroySystemVm.destroySystemVmCmd()
cmd.id = cpvm_response.id
self.apiclient.destroySystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
zoneid=self.zone.id
)
if isinstance(list_cpvm_response, list):
if list_cpvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List CPVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
cpvm_response = list_cpvm_response[0]
# Verify Name, Public IP, Private IP and Link local IP
# for newly created CPVM
self.assertNotEqual(
cpvm_response.name,
old_name,
"Check SSVM new name with name of destroyed CPVM"
)
self.assertEqual(
hasattr(cpvm_response, 'privateip'),
True,
"Check whether CPVM has private IP field"
)
self.assertEqual(
hasattr(cpvm_response, 'linklocalip'),
True,
"Check whether CPVM has link local IP field"
)
self.assertEqual(
hasattr(cpvm_response, 'publicip'),
True,
"Check whether CPVM has public IP field"
)
#Call to verify cloud process is running
self.test_04_cpvm_internals()
return

View File

@ -0,0 +1,740 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Templates ISO
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
import urllib
from random import random
#Import System modules
import datetime
class Services:
"""Test Templates Services
"""
def __init__(self):
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "fr3sca",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"disk_offering": {
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"virtual_machine": {
"displayname": "testVM",
"hypervisor": 'XenServer',
"domainid": '9ee36d2e-8b8f-432e-a927-a678ebec1d6b',
"protocol": 'TCP',
"ssh_port": 22,
"username": "root",
"password": "password",
"privateport": 22,
"publicport": 22,
},
"volume": {
"diskname": "Test Volume",
},
"template_1": {
"displaytext": "Cent OS Template",
"name": "Cent OS Template",
"ostypeid": '0c2c5d19-525b-41be-a8c3-c6607412f82b',
},
"template_2": {
"displaytext": "Public Template",
"name": "Public template",
"ostypeid": '0c2c5d19-525b-41be-a8c3-c6607412f82b',
"isfeatured": True,
"ispublic": True,
"isextractable": True,
"mode": "HTTP_DOWNLOAD",
},
"templatefilter": 'self',
"destzoneid": 5,
# For Copy template (Destination zone)
"isfeatured": True,
"ispublic": True,
"isextractable": False,
"bootable": True,
"passwordenabled": True,
"ostypeid": '0c2c5d19-525b-41be-a8c3-c6607412f82b',
"zoneid": '4a6c0290-e64d-40fc-afbb-4a05cab6fa4b',
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced',
# Networking mode: Advanced, basic
"sleep": 30,
"timeout": 10,
}
class TestCreateTemplate(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["volume"]["diskoffering"] = cls.disk_offering.id
cls.services["volume"]["zoneid"] = cls.zone.id
cls.services["sourcezoneid"] = cls.zone.id
cls.account = Account.create(
cls.api_client,
cls.services["account"]
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
#create virtual machine
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
#Stop virtual machine
cls.virtual_machine.stop(cls.api_client)
# Poll listVM to ensure VM is stopped properly
timeout = cls.services["timeout"]
while True:
time.sleep(cls.services["sleep"])
# Ensure that VM is in stopped state
list_vm_response = list_virtual_machines(
cls.api_client,
id=cls.virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Stopped':
break
if timeout == 0:
raise Exception(
"Failed to stop VM (ID: %s) in change service offering" %
vm.id)
timeout = timeout - 1
list_volume = list_volumes(
cls.api_client,
virtualmachineid=cls.virtual_machine.id,
type='ROOT',
listall=True
)
cls.volume = list_volume[0]
cls._cleanup = [
cls.account,
cls.service_offering,
cls.disk_offering,
]
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_create_template(self):
"""Test create public & private template
"""
# Validate the following:
# 1. database (vm_template table) should be updated
# with newly created template
# 2. UI should show the newly added template
# 3. ListTemplates API should show the newly added template
#Create template from Virtual machine and Volume ID
template = Template.create(
self.apiclient,
self.services["template_1"],
self.volume.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.cleanup.append(template)
self.debug("Created template with ID: %s" % template.id)
list_template_response = list_templates(
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=template.id
)
self.assertEqual(
isinstance(list_template_response, list),
True,
"Check list response returns a valid list"
)
#Verify template response to check whether template added successfully
self.assertNotEqual(
len(list_template_response),
0,
"Check template available in List Templates"
)
template_response = list_template_response[0]
self.assertEqual(
template_response.displaytext,
self.services["template_1"]["displaytext"],
"Check display text of newly created template"
)
name = template_response.name
self.assertEqual(
name.count(self.services["template_1"]["name"]),
1,
"Check name of newly created template"
)
self.assertEqual(
template_response.ostypeid,
self.services["template_1"]["ostypeid"],
"Check osTypeID of newly created template"
)
return
class TestTemplates(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["volume"]["diskoffering"] = cls.disk_offering.id
cls.services["volume"]["zoneid"] = cls.zone.id
cls.services["template_2"]["zoneid"] = cls.zone.id
cls.services["sourcezoneid"] = cls.zone.id
cls.account = Account.create(
cls.api_client,
cls.services["account"]
)
cls.user = Account.create(
cls.api_client,
cls.services["account"]
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
#create virtual machine
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
#Stop virtual machine
cls.virtual_machine.stop(cls.api_client)
# Poll listVM to ensure VM is stopped properly
timeout = cls.services["timeout"]
while True:
time.sleep(cls.services["sleep"])
# Ensure that VM is in stopped state
list_vm_response = list_virtual_machines(
cls.api_client,
id=cls.virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Stopped':
break
if timeout == 0:
raise Exception(
"Failed to stop VM (ID: %s) in change service offering" %
vm.id)
timeout = timeout - 1
list_volume = list_volumes(
cls.api_client,
virtualmachineid=cls.virtual_machine.id,
type='ROOT',
listall=True
)
try:
cls.volume = list_volume[0]
except Exception as e:
raise Exception(
"Exception: Unable to find root volume foe VM: %s" %
cls.virtual_machine.id)
#Create templates for Edit, Delete & update permissions testcases
cls.template_1 = Template.create(
cls.api_client,
cls.services["template_1"],
cls.volume.id,
account=cls.account.account.name,
domainid=cls.account.account.domainid
)
cls.template_2 = Template.create(
cls.api_client,
cls.services["template_2"],
cls.volume.id,
account=cls.account.account.name,
domainid=cls.account.account.domainid
)
cls._cleanup = [
cls.service_offering,
cls.disk_offering,
cls.account,
cls.user
]
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
#Cleanup created resources such as templates and VMs
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_02_edit_template(self):
"""Test Edit template
"""
# Validate the following:
# 1. UI should show the edited values for template
# 2. database (vm_template table) should have updated values
new_displayText = random_gen()
new_name = random_gen()
cmd = updateTemplate.updateTemplateCmd()
# Update template attributes
cmd.id = self.template_1.id
cmd.displaytext = new_displayText
cmd.name = new_name
cmd.bootable = self.services["bootable"]
cmd.passwordenabled = self.services["passwordenabled"]
cmd.ostypeid = self.services["ostypeid"]
self.apiclient.updateTemplate(cmd)
self.debug("Edited template with new name: %s" % new_name)
# Sleep to ensure update reflected across all the calls
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
# Verify template response for updated attributes
list_template_response = list_templates(
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=self.template_1.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
if isinstance(list_template_response, list):
break
elif timeout == 0:
raise Exception("List Template failed!")
time.sleep(10)
timeout = timeout -1
self.assertEqual(
isinstance(list_template_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_template_response),
0,
"Check template available in List Templates"
)
template_response = list_template_response[0]
self.debug("New Name: %s" % new_displayText)
self.debug("Name in Template response: %s"
% template_response.displaytext)
self.assertEqual(
template_response.displaytext,
new_displayText,
"Check display text of updated template"
)
self.assertEqual(
template_response.name,
new_name,
"Check name of updated template"
)
self.assertEqual(
str(template_response.passwordenabled).lower(),
str(self.services["passwordenabled"]).lower(),
"Check passwordenabled field of updated template"
)
self.assertEqual(
template_response.ostypeid,
self.services["ostypeid"],
"Check OSTypeID of updated template"
)
return
def test_03_delete_template(self):
"""Test delete template
"""
# Validate the following:
# 1. UI should not show the deleted template
# 2. database (vm_template table) should not contain deleted template
self.debug("Deleting Template ID: %s" % self.template_1.id)
self.template_1.delete(self.apiclient)
list_template_response = list_templates(
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=self.template_1.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
# Verify template is deleted properly using ListTemplates
self.assertEqual(
list_template_response,
None,
"Check if template exists in List Templates"
)
return
def test_04_extract_template(self):
"Test for extract template"
# Validate the following
# 1. Admin should able extract and download the templates
# 2. ListTemplates should display all the public templates
# for all kind of users
# 3 .ListTemplates should not display the system templates
self.debug("Extracting template with ID: %s" % self.template_2.id)
cmd = extractTemplate.extractTemplateCmd()
cmd.id = self.template_2.id
cmd.mode = self.services["template_2"]["mode"]
cmd.zoneid = self.zone.id
list_extract_response = self.apiclient.extractTemplate(cmd)
try:
# Format URL to ASCII to retrieve response code
formatted_url = urllib.unquote_plus(list_extract_response.url)
url_response = urllib.urlopen(formatted_url)
response_code = url_response.getcode()
except Exception:
self.fail(
"Extract Template Failed with invalid URL %s (template id: %s)" \
% (formatted_url, self.template_2.id)
)
self.assertEqual(
list_extract_response.id,
self.template_2.id,
"Check ID of the extracted template"
)
self.assertEqual(
list_extract_response.extractMode,
self.services["template_2"]["mode"],
"Check mode of extraction"
)
self.assertEqual(
list_extract_response.zoneid,
self.services["template_2"]["zoneid"],
"Check zone ID of extraction"
)
self.assertEqual(
response_code,
200,
"Check for a valid response download URL"
)
return
def test_05_template_permissions(self):
"""Update & Test for template permissions"""
# Validate the following
# 1. listTemplatePermissions returns valid
# permissions set for template
# 2. permission changes should be reflected in vm_template
# table in database
self.debug("Updating Template permissions ID:%s" % self.template_2.id)
cmd = updateTemplatePermissions.updateTemplatePermissionsCmd()
# Update template permissions
cmd.id = self.template_2.id
cmd.isfeatured = self.services["isfeatured"]
cmd.ispublic = self.services["ispublic"]
cmd.isextractable = self.services["isextractable"]
self.apiclient.updateTemplatePermissions(cmd)
list_template_response = list_templates(
self.apiclient,
templatefilter='featured',
id=self.template_2.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_template_response, list),
True,
"Check list response returns a valid list"
)
# Verify template response for updated permissions for normal user
template_response = list_template_response[0]
self.assertEqual(
template_response.id,
self.template_2.id,
"Check template ID"
)
self.assertEqual(
template_response.ispublic,
int(True),
"Check ispublic permission of template"
)
self.assertNotEqual(
template_response.templatetype,
'SYSTEM',
"ListTemplates should not list any system templates"
)
return
def test_06_copy_template(self):
"""Test for copy template from one zone to another"""
# Validate the following
# 1. copy template should be successful and
# secondary storage should contain new copied template.
self.debug("Copy template from Zone: %s to %s" % (
self.services["sourcezoneid"],
self.services["destzoneid"]
))
cmd = copyTemplate.copyTemplateCmd()
cmd.id = self.template_2.id
cmd.destzoneid = self.services["destzoneid"]
cmd.sourcezoneid = self.services["sourcezoneid"]
self.apiclient.copyTemplate(cmd)
# Verify template is copied to another zone using ListTemplates
list_template_response = list_templates(
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=self.template_2.id,
zoneid=self.services["destzoneid"]
)
self.assertEqual(
isinstance(list_template_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_template_response),
0,
"Check template extracted in List Templates"
)
template_response = list_template_response[0]
self.assertEqual(
template_response.id,
self.template_2.id,
"Check ID of the downloaded template"
)
self.assertEqual(
template_response.zoneid,
self.services["destzoneid"],
"Check zone ID of the copied template"
)
# Cleanup- Delete the copied template
cmd = deleteTemplate.deleteTemplateCmd()
cmd.id = template_response.id
cmd.zoneid = self.services["destzoneid"]
self.apiclient.deleteTemplate(cmd)
return
def test_07_list_public_templates(self):
"""Test only public templates are visible to normal user"""
# Validate the following
# 1. ListTemplates should show only 'public' templates for normal user
list_template_response = list_templates(
self.apiclient,
templatefilter='featured',
account=self.user.account.name,
domainid=self.user.account.domainid
)
self.assertEqual(
isinstance(list_template_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_template_response),
0,
"Check template available in List Templates"
)
#Template response should list all 'public' templates
for template in list_template_response:
self.assertEqual(
template.ispublic,
True,
"ListTemplates should list only public templates"
)
return
def test_08_list_system_templates(self):
"""Test System templates are not visible to normal user"""
# Validate the following
# 1. ListTemplates should not show 'SYSTEM' templates for normal user
list_template_response = list_templates(
self.apiclient,
templatefilter='featured',
account=self.user.account.name,
domainid=self.user.account.domainid
)
self.assertEqual(
isinstance(list_template_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_template_response),
0,
"Check template available in List Templates"
)
for template in list_template_response:
self.assertNotEqual(
template.templatetype,
'SYSTEM',
"ListTemplates should not list any system templates"
)
return

View File

@ -0,0 +1,915 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Virtual Machine Life Cycle
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
import remoteSSHClient
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
class Services:
"""Test VM Life Cycle Services
"""
def __init__(self):
self.services = {
"disk_offering":{
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended in create account to
# ensure unique username generated each time
"password": "fr3sca",
},
"small":
# Create a small virtual machine instance with disk offering
{
"displayname": "testserver",
"username": "root", # VM creds for SSH
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"domainid": '9ee36d2e-8b8f-432e-a927-a678ebec1d6b',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"medium": # Create a medium virtual machine instance
{
"displayname": "testserver",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"domainid": '9ee36d2e-8b8f-432e-a927-a678ebec1d6b',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"service_offerings":
{
"tiny":
{
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"small":
{
# Small service offering ID to for change VM
# service offering from medium to small
"name": "Small Instance",
"displaytext": "Small Instance",
"cpunumber": 1,
"cpuspeed": 500,
"memory": 256
},
"medium":
{
# Medium service offering ID to for
# change VM service offering from small to medium
"name": "Medium Instance",
"displaytext": "Medium Instance",
"cpunumber": 1,
"cpuspeed": 1000,
"memory": 1024
}
},
"iso": # ISO settings for Attach/Detach ISO tests
{
"displaytext": "Test ISO",
"name": "testISO",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"ostypeid": '0c2c5d19-525b-41be-a8c3-c6607412f82b',
"mode": 'HTTP_DOWNLOAD', # Downloading existing ISO
},
"diskdevice": '/dev/xvdd',
# Disk device where ISO is attached to instance
"mount_dir": "/mnt/tmp",
"sleep": 60,
"timeout": 10,
"hostid": 5,
#Migrate VM to hostid
"ostypeid": '0c2c5d19-525b-41be-a8c3-c6607412f82b',
# CentOS 5.3 (64-bit)
"zoneid": '4a6c0290-e64d-40fc-afbb-4a05cab6fa4b',
# Optional, if specified the mentioned zone will be
# used for tests
"mode":'advanced',
# Networking mode: Basic or Advanced
}
class TestDeployVM(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.services = Services().services
# Get Zone, Domain and templates
zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
zone.id,
self.services["ostypeid"]
)
# Set Zones and disk offerings
self.services["small"]["zoneid"] = zone.id
self.services["small"]["template"] = template.id
self.services["medium"]["zoneid"] = zone.id
self.services["medium"]["template"] = template.id
self.services["iso"]["zoneid"] = zone.id
# Create Account, VMs, NAT Rules etc
self.account = Account.create(
self.apiclient,
self.services["account"]
)
self.service_offering = ServiceOffering.create(
self.apiclient,
self.services["service_offerings"]["tiny"]
)
# Cleanup
self.cleanup = [
self.service_offering,
self.account
]
def test_deploy_vm(self):
"""Test Deploy Virtual Machine
"""
# Validate the following:
# 1. Virtual Machine is accessible via SSH
# 2. listVirtualMachines returns accurate information
# 3. The Cloud Database contains the valid information
self.virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["small"],
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.virtual_machine.id
)
self.debug(
"Verify listVirtualMachines response for virtual machine: %s" \
% self.virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM available in List Virtual Machines"
)
vm_response = list_vm_response[0]
self.assertEqual(
vm_response.id,
self.virtual_machine.id,
"Check virtual machine id in listVirtualMachines"
)
self.assertEqual(
vm_response.displayname,
self.virtual_machine.displayname,
"Check virtual machine displayname in listVirtualMachines"
)
return
def tearDown(self):
try:
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
self.debug("Warning! Exception in tearDown: %s" % e)
class TestVMLifeCycle(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
zone.id,
cls.services["ostypeid"]
)
# Set Zones and disk offerings
cls.services["small"]["zoneid"] = zone.id
cls.services["small"]["template"] = template.id
cls.services["medium"]["zoneid"] = zone.id
cls.services["medium"]["template"] = template.id
cls.services["iso"]["zoneid"] = zone.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"]
)
cls.small_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offerings"]["small"]
)
cls.medium_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offerings"]["medium"]
)
#create small and large virtual machines
cls.small_virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["small"],
accountid=cls.account.account.name,
serviceofferingid=cls.small_offering.id,
mode=cls.services["mode"]
)
cls.medium_virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["medium"],
accountid=cls.account.account.name,
serviceofferingid=cls.medium_offering.id,
mode=cls.services["mode"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["small"],
accountid=cls.account.account.name,
serviceofferingid=cls.small_offering.id,
mode=cls.services["mode"]
)
cls._cleanup = [
cls.small_offering,
cls.medium_offering,
cls.account
]
@classmethod
def tearDownClass(cls):
cls.api_client = fetch_api_client()
cleanup_resources(cls.api_client, cls._cleanup)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
def tearDown(self):
#Clean up, terminate the created ISOs
cleanup_resources(self.apiclient, self.cleanup)
return
def test_01_stop_vm(self):
"""Test Stop Virtual Machine
"""
# Validate the following
# 1. Should Not be able to login to the VM.
# 2. listVM command should return
# this VM.State of this VM should be ""Stopped"".
self.debug("Stopping VM - ID: %s" % self.virtual_machine.id)
self.small_virtual_machine.stop(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check virtual machine is in stopped state"
)
return
def test_02_start_vm(self):
"""Test Start Virtual Machine
"""
# Validate the following
# 1. listVM command should return this VM.State
# of this VM should be Running".
self.debug("Starting VM - ID: %s" % self.virtual_machine.id)
self.small_virtual_machine.start(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.debug(
"Verify listVirtualMachines response for virtual machine: %s" \
% self.small_virtual_machine.id
)
self.assertEqual(
list_vm_response[0].state,
"Running",
"Check virtual machine is in running state"
)
return
def test_03_reboot_vm(self):
"""Test Reboot Virtual Machine
"""
# Validate the following
# 1. Should be able to login to the VM.
# 2. listVM command should return the deployed VM.
# State of this VM should be "Running"
self.debug("Rebooting VM - ID: %s" % self.virtual_machine.id)
self.small_virtual_machine.reboot(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Running",
"Check virtual machine is in running state"
)
return
def test_04_change_offering_small(self):
"""Change Offering to a small capacity
"""
# Validate the following
# 1. Log in to the Vm .We should see that the CPU and memory Info of
# this Vm matches the one specified for "Small" service offering.
# 2. Using listVM command verify that this Vm
# has Small service offering Id.
self.debug("Stopping VM - ID: %s" % self.medium_virtual_machine.id)
self.medium_virtual_machine.stop(self.apiclient)
# Poll listVM to ensure VM is stopped properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in stopped state
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.medium_virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Stopped':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to stop VM (ID: %s) in change service offering" % vm.id)
timeout = timeout - 1
self.debug("Change Service offering VM - ID: %s" %
self.medium_virtual_machine.id)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.medium_virtual_machine.id
cmd.serviceofferingid = self.small_offering.id
self.apiclient.changeServiceForVirtualMachine(cmd)
self.debug("Starting VM - ID: %s" % self.medium_virtual_machine.id)
self.medium_virtual_machine.start(self.apiclient)
# Poll listVM to ensure VM is started properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in running state
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.medium_virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Running':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to start VM (ID: %s) after changing service offering" % vm.id)
timeout = timeout - 1
try:
ssh = self.medium_virtual_machine.get_ssh_client()
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.medium_virtual_machine.ipaddress, e)
)
cpuinfo = ssh.execute("cat /proc/cpuinfo")
cpu_cnt = len([i for i in cpuinfo if "processor" in i])
#'cpu MHz\t\t: 2660.499'
cpu_speed = [i for i in cpuinfo if "cpu MHz" in i ][0].split()[3]
meminfo = ssh.execute("cat /proc/meminfo")
#MemTotal: 1017464 kB
total_mem = [i for i in meminfo if "MemTotal" in i][0].split()[1]
self.debug(
"CPU count: %s, CPU Speed: %s, Mem Info: %s" % (
cpu_cnt,
cpu_speed,
total_mem
))
self.assertAlmostEqual(
int(cpu_cnt),
self.small_offering.cpunumber,
"Check CPU Count for small offering"
)
self.assertAlmostEqual(
list_vm_response[0].cpuspeed,
self.small_offering.cpuspeed,
"Check CPU Speed for small offering"
)
self.assertAlmostEqual(
int(total_mem) / 1024, # In MBs
self.small_offering.memory,
"Check Memory(kb) for small offering"
)
return
def test_05_change_offering_medium(self):
"""Change Offering to a medium capacity
"""
# Validate the following
# 1. Log in to the Vm .We should see that the CPU and memory Info of
# this Vm matches the one specified for "Medium" service offering.
# 2. Using listVM command verify that this Vm
# has Medium service offering Id.
self.debug("Stopping VM - ID: %s" % self.small_virtual_machine.id)
self.small_virtual_machine.stop(self.apiclient)
# Poll listVM to ensure VM is stopped properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in stopped state
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Stopped':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to stop VM (ID: %s) in change service offering" % vm.id)
timeout = timeout - 1
self.debug("Change service offering VM - ID: %s" %
self.small_virtual_machine.id)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
cmd.serviceofferingid = self.medium_offering.id
self.apiclient.changeServiceForVirtualMachine(cmd)
self.debug("Starting VM - ID: %s" % self.small_virtual_machine.id)
self.small_virtual_machine.start(self.apiclient)
# Poll listVM to ensure VM is started properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in running state
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Running':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to start VM (ID: %s) after changing service offering" % vm.id)
timeout = timeout - 1
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
try:
ssh_client = self.small_virtual_machine.get_ssh_client()
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.small_virtual_machine.ipaddress, e)
)
cpuinfo = ssh_client.execute("cat /proc/cpuinfo")
cpu_cnt = len([i for i in cpuinfo if "processor" in i])
#'cpu MHz\t\t: 2660.499'
cpu_speed = [i for i in cpuinfo if "cpu MHz" in i][0].split()[3]
meminfo = ssh_client.execute("cat /proc/meminfo")
#MemTotal: 1017464 kB
total_mem = [i for i in meminfo if "MemTotal" in i][0].split()[1]
self.debug(
"CPU count: %s, CPU Speed: %s, Mem Info: %s" % (
cpu_cnt,
cpu_speed,
total_mem
))
self.assertAlmostEqual(
int(cpu_cnt),
self.medium_offering.cpunumber,
"Check CPU Count for medium offering"
)
self.assertAlmostEqual(
list_vm_response[0].cpuspeed,
self.medium_offering.cpuspeed,
"Check CPU Speed for medium offering"
)
self.assertAlmostEqual(
int(total_mem) / 1024, # In MBs
self.medium_offering.memory,
"Check Memory(kb) for medium offering"
)
return
def test_06_destroy_vm(self):
"""Test destroy Virtual Machine
"""
# Validate the following
# 1. Should not be able to login to the VM.
# 2. listVM command should return this VM.State
# of this VM should be "Destroyed".
self.debug("Destroy VM - ID: %s" % self.small_virtual_machine.id)
self.small_virtual_machine.delete(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Destroyed",
"Check virtual machine is in destroyed state"
)
return
def test_07_restore_vm(self):
"""Test recover Virtual Machine
"""
# Validate the following
# 1. listVM command should return this VM.
# State of this VM should be "Stopped".
# 2. We should be able to Start this VM successfully.
self.debug("Recovering VM - ID: %s" % self.small_virtual_machine.id)
cmd = recoverVirtualMachine.recoverVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.recoverVirtualMachine(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check virtual machine is in Stopped state"
)
return
def test_08_migrate_vm(self):
"""Test migrate VM
"""
# Validate the following
# 1. Should be able to login to the VM.
# 2. listVM command should return this VM.State of this VM
# should be "Running" and the host should be the host
# to which the VM was migrated to
self.debug("Migrating VM-ID: %s to Host: %s" % (
self.medium_virtual_machine.id,
self.services["hostid"]
))
cmd = migrateVirtualMachine.migrateVirtualMachineCmd()
cmd.hostid = self.services["hostid"]
cmd.virtualmachineid = self.medium_virtual_machine.id
self.apiclient.migrateVirtualMachine(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.medium_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_vm_response,
None,
"Check virtual machine is listVirtualMachines"
)
vm_response = list_vm_response[0]
self.assertEqual(
vm_response.id,
self.medium_virtual_machine.id,
"Check virtual machine ID of migrated VM"
)
self.assertEqual(
vm_response.hostid,
self.services["hostid"],
"Check destination hostID of migrated VM"
)
return
def test_09_expunge_vm(self):
"""Test destroy(expunge) Virtual Machine
"""
# Validate the following
# 1. listVM command should NOT return this VM any more.
self.debug("Expunge VM-ID: %s" % self.small_virtual_machine.id)
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.destroyVirtualMachine(cmd)
config = list_configurations(
self.apiclient,
name='expunge.delay'
)
response = config[0]
# Wait for some time more than expunge.delay
time.sleep(int(response.value) * 2)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
list_vm_response,
None,
"Check Expunged virtual machine is listVirtualMachines"
)
return
def test_10_attachAndDetach_iso(self):
"""Test for detach ISO to virtual machine"""
# Validate the following
# 1. Create ISO
# 2. Attach ISO to VM
# 3. Log in to the VM.
# 4. The device should be available for use
# 5. Detach ISO
# 6. Check the device is properly detached by logging into VM
iso = Iso.create(
self.apiclient,
self.services["iso"],
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("Successfully created ISO with ID: %s" % iso.id)
try:
iso.download(self.apiclient)
except Exception as e:
self.fail("Exception while downloading ISO %s: %s"\
% (iso.id, e))
self.debug("Attach ISO with ID: %s to VM ID: %s" % (
iso.id,
self.virtual_machine.id
))
#Attach ISO to virtual machine
cmd = attachIso.attachIsoCmd()
cmd.id = iso.id
cmd.virtualmachineid = self.virtual_machine.id
self.apiclient.attachIso(cmd)
try:
ssh_client = self.virtual_machine.get_ssh_client()
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount -rt iso9660 %s %s" \
% (
self.services["diskdevice"],
self.services["mount_dir"]
),
]
for c in cmds:
res = ssh_client.execute(c)
self.assertEqual(res, [], "Check mount is successful or not")
c = "fdisk -l|grep %s|head -1" % self.services["diskdevice"]
res = ssh_client.execute(c)
#Disk /dev/xvdd: 4393 MB, 4393723904 bytes
except Exception as e:
self.fail("SSH failed for virtual machine: %s - %s" %
(self.virtual_machine.ipaddress, e))
# Res may contain more than one strings depending on environment
# Split strings to form new list which is used for assertion on ISO size
result = []
for i in res:
for k in i.split():
result.append(k)
# Get ISO size
iso_response = list_isos(
self.apiclient,
id=iso.id
)
self.assertEqual(
isinstance(iso_response, list),
True,
"Check list response returns a valid list"
)
iso_size = iso_response[0].size
self.assertEqual(
str(iso_size) in result,
True,
"Check size of the attached ISO"
)
try:
#Unmount ISO
command = "umount %s" % self.services["mount_dir"]
ssh_client.execute(command)
except Exception as e:
self.fail("SSH failed for virtual machine: %s - %s" %
(self.virtual_machine.ipaddress, e))
#Detach from VM
cmd = detachIso.detachIsoCmd()
cmd.virtualmachineid = self.virtual_machine.id
self.apiclient.detachIso(cmd)
try:
res = ssh_client.execute(c)
except Exception as e:
self.fail("SSH failed for virtual machine: %s - %s" %
(self.virtual_machine.ipaddress, e))
# Check if ISO is properly detached from VM (using fdisk)
result = self.services["diskdevice"] in str(res)
self.assertEqual(
result,
False,
"Check if ISO is detached from virtual machine"
)
return

View File

@ -0,0 +1,505 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" BVT tests for Volumes
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
import remoteSSHClient
#Import System modules
import os
import urllib
import time
import tempfile
class Services:
"""Test Volume Services
"""
def __init__(self):
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "fr3sca",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"disk_offering": {
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"volume_offerings": {
0: {
"diskname": "TestDiskServ",
"domainid": '9ee36d2e-8b8f-432e-a927-a678ebec1d6b',
},
},
"customdisksize": 1, # GBs
"username": "root", # Creds for SSH to VM
"password": "password",
"ssh_port": 22,
"diskname": "TestDiskServ",
"hypervisor": 'XenServer',
"domainid": '9ee36d2e-8b8f-432e-a927-a678ebec1d6b',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
"diskdevice": "/dev/xvdb",
"ostypeid": '0c2c5d19-525b-41be-a8c3-c6607412f82b',
"zoneid": '4a6c0290-e64d-40fc-afbb-4a05cab6fa4b',
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced',
"sleep": 60,
"timeout": 10,
}
class TestCreateVolume(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
cls.custom_disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"],
custom=True
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["customdiskofferingid"] = cls.custom_disk_offering.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"]
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls._cleanup = [
cls.service_offering,
cls.disk_offering,
cls.custom_disk_offering,
cls.account
]
def setUp(self):
self.apiClient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
def test_01_create_volume(self):
"""Test Volume creation for all Disk Offerings (incl. custom)
"""
# Validate the following
# 1. Create volumes from the different sizes
# 2. Verify the size of volume with actual size allocated
self.volumes = []
for k, v in self.services["volume_offerings"].items():
volume = Volume.create(
self.apiClient,
v,
zoneid=self.zone.id,
account=self.account.account.name,
domainid=self.account.account.domainid,
diskofferingid=self.disk_offering.id
)
self.debug("Created a volume with ID: %s" % volume.id)
self.volumes.append(volume)
volume = Volume.create_custom_disk(
self.apiClient,
self.services,
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.debug("Created a volume with custom offering: %s" % volume.id)
self.volumes.append(volume)
#Attach a volume with different disk offerings
#and check the memory allocated to each of them
for volume in self.volumes:
list_volume_response = list_volumes(
self.apiClient,
id=volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
self.debug(
"Attaching volume (ID: %s) to VM (ID: %s)" % (
volume.id,
self.virtual_machine.id
))
self.virtual_machine.attach_volume(
self.apiClient,
volume
)
try:
ssh = self.virtual_machine.get_ssh_client()
ssh.execute("reboot")
except Exception as e:
self.fail("SSH access failed for VM %s - %s" %
(self.virtual_machine.ipaddress, e))
# Poll listVM to ensure VM is started properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in running state
list_vm_response = list_virtual_machines(
self.apiClient,
id=self.virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Running':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to start VM (ID: %s) " % vm.id)
timeout = timeout - 1
try:
ssh = self.virtual_machine.get_ssh_client(
reconnect=True
)
c = "fdisk -l"
res = ssh.execute(c)
except Exception as e:
self.fail("SSH access failed for VM: %s - %s" %
(self.virtual_machine.ipaddress, e))
# Disk /dev/sda doesn't contain a valid partition table
# Disk /dev/sda: 21.5 GB, 21474836480 bytes
result = str(res)
self.debug("fdisk result: %s" % result)
self.assertEqual(
str(list_volume_response[0].size) in result,
True,
"Check if promised disk size actually available"
)
self.virtual_machine.detach_volume(self.apiClient, volume)
def tearDown(self):
#Clean up, terminate the created volumes
cleanup_resources(self.apiClient, self.cleanup)
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
class TestVolumes(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["diskofferingid"] = cls.disk_offering.id
# Create VMs, VMs etc
cls.account = Account.create(
cls.api_client,
cls.services["account"]
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls.volume = Volume.create(
cls.api_client,
cls.services,
account=cls.account.account.name,
domainid=cls.account.account.domainid
)
cls._cleanup = [
cls.service_offering,
cls.disk_offering,
cls.account
]
@classmethod
def tearDownClass(cls):
try:
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
def setUp(self):
self.apiClient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
def test_02_attach_volume(self):
"""Attach a created Volume to a Running VM
"""
# Validate the following
# 1. shows list of volumes
# 2. "Attach Disk" pop-up box will display with list of instances
# 3. disk should be attached to instance successfully
self.debug(
"Attaching volume (ID: %s) to VM (ID: %s)" % (
self.volume.id,
self.virtual_machine.id
))
self.virtual_machine.attach_volume(self.apiClient, self.volume)
list_volume_response = list_volumes(
self.apiClient,
id=self.volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
volume = list_volume_response[0]
self.assertNotEqual(
volume.virtualmachineid,
None,
"Check if volume state (attached) is reflected"
)
try:
#Format the attached volume to a known fs
format_volume_to_ext3(self.virtual_machine.get_ssh_client())
except Exception as e:
self.fail("SSH failed for VM: %s - %s" %
(self.virtual_machine.ipaddress, e))
return
def test_03_download_attached_volume(self):
"""Download a Volume attached to a VM
"""
# Validate the following
# 1. download volume will fail with proper error message
# "Failed - Invalid state of the volume with ID:
# It should be either detached or the VM should be in stopped state
self.debug("Extract attached Volume ID: %s" % self.volume.id)
cmd = extractVolume.extractVolumeCmd()
cmd.id = self.volume.id
cmd.mode = "HTTP_DOWNLOAD"
cmd.zoneid = self.services["zoneid"]
# A proper exception should be raised;
# downloading attach VM is not allowed
with self.assertRaises(Exception):
self.apiClient.extractVolume(cmd)
def test_04_delete_attached_volume(self):
"""Delete a Volume attached to a VM
"""
# Validate the following
# 1. delete volume will fail with proper error message
# "Failed - Invalid state of the volume with ID:
# It should be either detached or the VM should be in stopped state
self.debug("Trying to delete attached Volume ID: %s" %
self.volume.id)
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
#Proper exception should be raised; deleting attach VM is not allowed
#with self.assertRaises(Exception):
result = self.apiClient.deleteVolume(cmd)
self.assertEqual(
result,
None,
"Check for delete download error while volume is attached"
)
def test_05_detach_volume(self):
"""Detach a Volume attached to a VM
"""
# Validate the following
# Data disk should be detached from instance and detached data disk
# details should be updated properly
self.debug(
"Detaching volume (ID: %s) from VM (ID: %s)" % (
self.volume.id,
self.virtual_machine.id
))
self.virtual_machine.detach_volume(self.apiClient, self.volume)
#Sleep to ensure the current state will reflected in other calls
time.sleep(self.services["sleep"])
list_volume_response = list_volumes(
self.apiClient,
id=self.volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
volume = list_volume_response[0]
self.assertEqual(
volume.virtualmachineid,
None,
"Check if volume state (detached) is reflected"
)
return
def test_06_download_detached_volume(self):
"""Download a Volume unattached to an VM
"""
# Validate the following
# 1. able to download the volume when its not attached to instance
self.debug("Extract detached Volume ID: %s" % self.volume.id)
cmd = extractVolume.extractVolumeCmd()
cmd.id = self.volume.id
cmd.mode = "HTTP_DOWNLOAD"
cmd.zoneid = self.services["zoneid"]
extract_vol = self.apiClient.extractVolume(cmd)
#Attempt to download the volume and save contents locally
try:
formatted_url = urllib.unquote_plus(extract_vol.url)
response = urllib.urlopen(formatted_url)
fd, path = tempfile.mkstemp()
os.close(fd)
fd = open(path, 'wb')
fd.write(response.read())
fd.close()
except Exception:
self.fail(
"Extract Volume Failed with invalid URL %s (vol id: %s)" \
% (extract_vol.url, self.volume.id)
)
def test_07_delete_detached_volume(self):
"""Delete a Volume unattached to an VM
"""
# Validate the following
# 1. volume should be deleted successfully and listVolume should not
# contain the deleted volume details.
# 2. "Delete Volume" menu item not shown under "Actions" menu.
# (UI should not allow to delete the volume when it is attached
# to instance by hiding the menu Item)
self.debug("Delete Volume ID: %s" % self.volume.id)
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
self.apiClient.deleteVolume(cmd)
list_volume_response = list_volumes(
self.apiClient,
id=self.volume.id,
type='DATADISK'
)
self.assertEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
return

View File

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,415 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
"""Common functions
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
import remoteSSHClient
from utils import *
from base import *
#Import System modules
import time
def get_zone(apiclient, services=None):
"Returns a default zone"
cmd = listZones.listZonesCmd()
if services:
if "zoneid" in services:
cmd.id = services["zoneid"]
zones = apiclient.listZones(cmd)
if isinstance(zones, list):
return zones[0]
else:
raise Exception("Failed to find specified zone.")
def get_pod(apiclient, zoneid, services=None):
"Returns a default pod for specified zone"
cmd = listPods.listPodsCmd()
cmd.zoneid = zoneid
if services:
if "podid" in services:
cmd.id = services["podid"]
pods = apiclient.listPods(cmd)
if isinstance(pods, list):
return pods[0]
else:
raise Exception("Exception: Failed to find specified pod.")
def get_template(apiclient, zoneid, ostypeid=12, services=None):
"Returns a template"
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = 'featured'
cmd.zoneid = zoneid
if services:
if "template" in services:
cmd.id = services["template"]
list_templates = apiclient.listTemplates(cmd)
for template in list_templates:
if template.ostypeid == ostypeid:
return template
raise Exception("Exception: Failed to find template with OSTypeID: %s" %
ostypeid)
return
def download_systemplates_sec_storage(server, services):
"""Download System templates on sec storage"""
try:
# Login to management server
ssh = remoteSSHClient.remoteSSHClient(
server["ipaddress"],
server["port"],
server["username"],
server["password"]
)
except Exception as e:
raise Exception("SSH access failted for server with IP address: %s" %
server["ipaddess"])
# Mount Secondary Storage on Management Server
cmds = [
"mkdir -p %s" % services["mnt_dir"],
"mount -t nfs %s:/%s %s" % (
services["sec_storage"],
services["path"],
services["mnt_dir"]
),
"%s -m %s -u %s -h %s -F" % (
services["command"],
services["mnt_dir"],
services["download_url"],
services["hypervisor"]
)
]
for c in cmds:
result = ssh.execute(c)
res = str(result)
# Unmount the Secondary storage
ssh.execute("umount %s" % (services["mnt_dir"]))
if res.count("Successfully installed system VM template") == 1:
return
else:
raise Exception("Failed to download System Templates on Sec Storage")
return
def wait_for_ssvms(apiclient, zoneid, podid):
"""After setup wait for SSVMs to come Up"""
time.sleep(30)
timeout = 40
while True:
list_ssvm_response = list_ssvms(
apiclient,
systemvmtype='secondarystoragevm',
zoneid=zoneid,
podid=podid
)
ssvm = list_ssvm_response[0]
if ssvm.state != 'Running':
# Sleep to ensure SSVMs are Up and Running
time.sleep(30)
timeout = timeout - 1
elif ssvm.state == 'Running':
break
elif timeout == 0:
raise Exception("SSVM failled to come up")
break
timeout = 20
while True:
list_ssvm_response = list_ssvms(
apiclient,
systemvmtype='consoleproxy',
zoneid=zoneid,
podid=podid
)
cpvm = list_ssvm_response[0]
if cpvm.state != 'Running':
# Sleep to ensure SSVMs are Up and Running
time.sleep(30)
timeout = timeout - 1
elif cpvm.state == 'Running':
break
elif timeout == 0:
raise Exception("SSVM failled to come up")
break
return
def download_builtin_templates(apiclient, zoneid, hypervisor, host, linklocalip):
"""After setup wait till builtin templates are downloaded"""
# Change IPTABLES Rules
result = get_process_status(
host["ipaddress"],
host["port"],
host["username"],
host["password"],
linklocalip,
"iptables -P INPUT ACCEPT"
)
# Find the BUILTIN Templates for given Zone, Hypervisor
list_template_response = list_templates(
apiclient,
hypervisor=hypervisor,
zoneid=zoneid,
templatefilter='self'
)
if not isinstance(list_template_response, list):
raise Exception("Failed to download BUILTIN templates")
# Ensure all BUILTIN templates are downloaded
templateid = None
for template in list_template_response:
if template.templatetype == "BUILTIN":
templateid = template.id
# Sleep to ensure that template is in downloading state after adding
# Sec storage
time.sleep(30)
while True:
template_response = list_templates(
apiclient,
id=templateid,
zoneid=zoneid,
templatefilter='self'
)
template = template_response[0]
# If template is ready,
# template.status = Download Complete
# Downloading - x% Downloaded
# Error - Any other string
if template.status == 'Download Complete' :
break
elif 'Downloaded' not in template.status.split():
raise Exception("ErrorInDownload")
elif 'Downloaded' in template.status.split():
time.sleep(30)
return
def update_resource_limit(apiclient, resourcetype, account=None, domainid=None,
max=None):
"""Updates the resource limit to 'max' for given account"""
cmd = updateResourceLimit.updateResourceLimitCmd()
cmd.resourcetype = resourcetype
if account:
cmd.account = account
if domainid:
cmd.domainid = domainid
if max:
cmd.max = max
apiclient.updateResourceLimit(cmd)
return
def list_routers(apiclient, **kwargs):
"""List all Routers matching criteria"""
cmd = listRouters.listRoutersCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listRouters(cmd))
def list_zones(apiclient, **kwargs):
"""List all Zones matching criteria"""
cmd = listZones.listZonesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listZones(cmd))
def list_networks(apiclient, **kwargs):
"""List all Networks matching criteria"""
cmd = listNetworks.listNetworksCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listNetworks(cmd))
def list_clusters(apiclient, **kwargs):
"""List all Clusters matching criteria"""
cmd = listClusters.listClustersCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listClusters(cmd))
def list_ssvms(apiclient, **kwargs):
"""List all SSVMs matching criteria"""
cmd = listSystemVms.listSystemVmsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listSystemVms(cmd))
def list_storage_pools(apiclient, **kwargs):
"""List all storage pools matching criteria"""
cmd = listStoragePools.listStoragePoolsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listStoragePools(cmd))
def list_virtual_machines(apiclient, **kwargs):
"""List all VMs matching criteria"""
cmd = listVirtualMachines.listVirtualMachinesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listVirtualMachines(cmd))
def list_hosts(apiclient, **kwargs):
"""List all Hosts matching criteria"""
cmd = listHosts.listHostsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listHosts(cmd))
def list_configurations(apiclient, **kwargs):
"""List configuration with specified name"""
cmd = listConfigurations.listConfigurationsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listConfigurations(cmd))
def list_publicIP(apiclient, **kwargs):
"""List all Public IPs matching criteria"""
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listPublicIpAddresses(cmd))
def list_nat_rules(apiclient, **kwargs):
"""List all NAT rules matching criteria"""
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listPortForwardingRules(cmd))
def list_lb_rules(apiclient, **kwargs):
"""List all Load balancing rules matching criteria"""
cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listLoadBalancerRules(cmd))
def list_lb_instances(apiclient, **kwargs):
"""List all Load balancing instances matching criteria"""
cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listLoadBalancerRuleInstances(cmd))
def list_firewall_rules(apiclient, **kwargs):
"""List all Firewall Rules matching criteria"""
cmd = listFirewallRules.listFirewallRulesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listFirewallRules(cmd))
def list_volumes(apiclient, **kwargs):
"""List all volumes matching criteria"""
cmd = listVolumes.listVolumesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listVolumes(cmd))
def list_isos(apiclient, **kwargs):
"""Lists all available ISO files."""
cmd = listIsos.listIsosCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listIsos(cmd))
def list_snapshots(apiclient, **kwargs):
"""List all snapshots matching criteria"""
cmd = listSnapshots.listSnapshotsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listSnapshots(cmd))
def list_templates(apiclient, **kwargs):
"""List all templates matching criteria"""
cmd = listTemplates.listTemplatesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listTemplates(cmd))
def list_domains(apiclient, **kwargs):
"""Lists domains"""
cmd = listDomains.listDomainsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listDomains(cmd))
def list_accounts(apiclient, **kwargs):
"""Lists accounts and provides detailed account information for
listed accounts"""
cmd = listAccounts.listAccountsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listAccounts(cmd))
def list_users(apiclient, **kwargs):
"""Lists users and provides detailed account information for
listed users"""
cmd = listUsers.listUsersCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listUsers(cmd))
def list_snapshot_policy(apiclient, **kwargs):
"""Lists snapshot policies."""
cmd = listSnapshotPolicies.listSnapshotPoliciesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listSnapshotPolicies(cmd))
def list_events(apiclient, **kwargs):
"""Lists events"""
cmd = listEvents.listEventsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listEvents(cmd))
def list_disk_offering(apiclient, **kwargs):
"""Lists all available disk offerings."""
cmd = listDiskOfferings.listDiskOfferingsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listDiskOfferings(cmd))
def list_service_offering(apiclient, **kwargs):
"""Lists all available service offerings."""
cmd = listServiceOfferings.listServiceOfferingsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listServiceOfferings(cmd))
def list_vlan_ipranges(apiclient, **kwargs):
"""Lists all VLAN IP ranges."""
cmd = listVlanIpRanges.listVlanIpRangesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listVlanIpRanges(cmd))
def list_usage_records(apiclient, **kwargs):
"""Lists usage records for accounts"""
cmd = listUsageRecords.listUsageRecordsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listUsageRecords(cmd))

View File

@ -0,0 +1,101 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
"""Utilities functions
"""
import time
import remoteSSHClient
from cloudstackAPI import *
import cloudstackConnection
#from cloudstackConnection import cloudConnection
import configGenerator
import logging
import string
import random
def random_gen(size=6, chars=string.ascii_uppercase + string.digits):
"""Generate Random Strings of variable length"""
return ''.join(random.choice(chars) for x in range(size))
def cleanup_resources(api_client, resources):
"""Delete resources"""
for obj in resources:
obj.delete(api_client)
def is_server_ssh_ready(ipaddress, port, username, password, retries=50):
"""Return ssh handle else wait till sshd is running"""
loop_cnt = retries
while True:
try:
ssh = remoteSSHClient.remoteSSHClient(
ipaddress,
port,
username,
password
)
except Exception as e:
if loop_cnt == 0:
raise e
loop_cnt = loop_cnt - 1
time.sleep(30)
else:
return ssh
def format_volume_to_ext3(ssh_client, device="/dev/sda"):
"""Format attached storage to ext3 fs"""
cmds = [
"echo -e 'n\np\n1\n\n\nw' | fdisk %s" % device,
"mkfs.ext3 %s1" % device,
]
for c in cmds:
ssh_client.execute(c)
def fetch_api_client(config_file='datacenterCfg'):
"""Fetch the Cloudstack API Client"""
config = configGenerator.get_setup_config(config_file)
mgt = config.mgtSvr[0]
testClientLogger = logging.getLogger("testClient")
asyncTimeout = 3600
return cloudstackAPIClient.CloudStackAPIClient(
cloudstackConnection.cloudConnection(
mgt.mgtSvrIp,
mgt.port,
mgt.apiKey,
mgt.securityKey,
asyncTimeout,
testClientLogger
)
)
def get_process_status(hostip, port, username, password, linklocalip, process):
"""Double hop and returns a process status"""
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
hostip,
port,
username,
password
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -ostricthostkeychecking=no "
ssh_command = ssh_command + "-oUserKnownHostsFile=/dev/null -p 3922 %s %s" \
% (linklocalip, process)
# Double hop into router
timeout = 5
# Ensure the SSH login is successful
while True:
res = ssh.execute(ssh_command)
if res[0] != "Host key verification failed.":
break
elif timeout == 0:
break
time.sleep(5)
timeout = timeout - 1
return res