1. Remove complete dependency from settings/environment

2. Add account creation/deletion to all relevant tests.
This commit is contained in:
Chirag Jog 2012-01-25 07:57:05 -08:00
parent b91d919e55
commit 4c447d92bc
12 changed files with 1730 additions and 1164 deletions

View File

@ -284,7 +284,7 @@ class Iso:
if "ispublic" in services:
cmd.ispublic = services["ispublic"]
return Iso(apiclient.createTemplate(cmd)[0].__dict__)
return Iso(apiclient.registerIso(cmd)[0].__dict__)
def delete(self, apiclient):
cmd = deleteIso.deleteIsoCmd()
@ -608,7 +608,14 @@ def get_zone(apiclient):
"Returns a default zone"
cmd = listZones.listZonesCmd()
return apiclient.listZones(cmd)[1]
return apiclient.listZones(cmd)[0]
def get_pod(apiclient, zoneid):
"Returns a default pod for specified zone"
cmd = listPods.listPodsCmd()
cmd.zoneid = zoneid
return apiclient.listPods(cmd)[0]
def get_template(apiclient, zoneid, ostypeid=12):
"Returns a template"

View File

@ -22,58 +22,60 @@ class Services:
"clusters": {
0: {
"clustername": "Xen Cluster",
"clustertype": "ExternalManaged", # CloudManaged or ExternalManaged"
"hypervisor": "XenServer", # Hypervisor type
"zoneid": 3,
"podid": 3,
"clustertype": "ExternalManaged",
# CloudManaged or ExternalManaged"
"hypervisor": "XenServer",
# Hypervisor type
},
1: {
"clustername": "KVM Cluster",
"clustertype": "CloudManaged", # CloudManaged or ExternalManaged"
"hypervisor": "KVM", # Hypervisor type
"zoneid": 3,
"podid": 3,
"clustertype": "CloudManaged",
# CloudManaged or ExternalManaged"
"hypervisor": "KVM",
# Hypervisor type
},
2: {
"hypervisor": 'VMware', # Hypervisor type
"clustertype": 'ExternalManaged', # CloudManaged or ExternalManaged"
"zoneid": 3,
"podid": 3,
"hypervisor": 'VMware',
# Hypervisor type
"clustertype": 'ExternalManaged',
# CloudManaged or ExternalManaged"
"username": 'administrator',
"password": 'fr3sca',
"url": 'http://192.168.100.17/CloudStack-Clogeny-Pune/Pune-1',
# Format: http:// vCenter Host / Datacenter / Cluster
# Format:http://vCenter Host/Datacenter/Cluster
"clustername": '192.168.100.17/CloudStack-Clogeny-Pune/Pune-1',
# Format: http:// IP_Address / Datacenter / Cluster
# Format: http://IP_Address/Datacenter/Cluster
},
},
"hosts": {
"xenserver": { #Must be name of corresponding Hypervisor type in cluster in small letters
"zoneid": 3,
"podid": 3,
"xenserver": {
# Must be name of corresponding Hypervisor type
# in cluster in small letters
"clusterid": 16,
"hypervisor": 'XenServer', # Hypervisor type
"clustertype": 'ExternalManaged', # CloudManaged or ExternalManaged"
"hypervisor": 'XenServer',
# Hypervisor type
"clustertype": 'ExternalManaged',
# CloudManaged or ExternalManaged"
"url": 'http://192.168.100.210',
"username": "administrator",
"password": "fr3sca",
},
"kvm": {
"zoneid": 3,
"podid": 3,
"clusterid": 35,
"hypervisor": 'KVM', # Hypervisor type
"clustertype": 'CloudManaged', # CloudManaged or ExternalManaged"
"hypervisor": 'KVM',
# Hypervisor type
"clustertype": 'CloudManaged',
# CloudManaged or ExternalManaged"
"url": 'http://192.168.100.212',
"username": "root",
"password": "fr3sca",
},
"vmware": {
"zoneid": 3,
"podid": 3,
"clusterid": 16,
"hypervisor": 'VMware', # Hypervisor type
"clustertype": 'ExternalManaged', # CloudManaged or ExternalManaged"
"hypervisor": 'VMware',
# Hypervisor type
"clustertype": 'ExternalManaged',
# CloudManaged or ExternalManaged"
"url": 'http://192.168.100.203',
"username": "administrator",
"password": "fr3sca",
@ -89,6 +91,27 @@ class TestHosts(cloudstackTestCase):
self.dbclient = self.testClient.getDbConnection()
self.services = Services().services
self.cleanup = []
# Get Zone and pod
self.zone = get_zone(self.apiclient)
self.pod = get_pod(self.apiclient, self.zone.id)
self.services["clusters"][0]["zoneid"] = self.zone.id
self.services["clusters"][1]["zoneid"] = self.zone.id
self.services["clusters"][2]["zoneid"] = self.zone.id
self.services["clusters"][0]["podid"] = self.pod.id
self.services["clusters"][1]["podid"] = self.pod.id
self.services["clusters"][2]["podid"] = self.pod.id
self.services["hosts"]["xenserver"]["zoneid"] = self.zone.id
self.services["hosts"]["kvm"]["zoneid"] = self.zone.id
self.services["hosts"]["vmware"]["zoneid"] = self.zone.id
self.services["hosts"]["xenserver"]["podid"] = self.pod.id
self.services["hosts"]["kvm"]["podid"] = self.pod.id
self.services["hosts"]["vmware"]["podid"] = self.pod.id
return
def tearDown(self):
@ -108,22 +131,23 @@ class TestHosts(cloudstackTestCase):
# Validate the following:
# 1. Verify hypervisortype returned by API is Xen/KVM/VWare
# 2. Verify that the cluster is in 'Enabled' allocation state
# 3. Verify that the host is added successfully and in Up state with listHosts API response
# 3. Verify that the host is added successfully and in Up state
# with listHosts API response
#Create clusters with Hypervisor type XEN/KVM/VWare
for k, v in self.services["clusters"].items():
cluster = Cluster.create(self.apiclient, v)
self.assertEqual(
cluster.hypervisortype,
v["hypervisor"],
"Check hypervisor type of created cluster is " + v["hypervisor"] + " or not"
)
cluster.hypervisortype,
v["hypervisor"],
"Check hypervisor type is " + v["hypervisor"] + " or not"
)
self.assertEqual(
cluster.allocationstate,
'Enabled',
"Check whether allocation state of cluster is enabled"
)
cluster.allocationstate,
'Enabled',
"Check whether allocation state of cluster is enabled"
)
#If host is externally managed host is already added with cluster
cmd = listHosts.listHostsCmd()
@ -177,8 +201,8 @@ class TestHosts(cloudstackTestCase):
"Check cluster ID with list clusters response"
)
self.assertEqual(
cluster_response.hypervisortype,
cluster.hypervisortype,
"Check hypervisor type with list clusters response is " + v["hypervisor"] + " or not"
)
cluster_response.hypervisortype,
cluster.hypervisortype,
"Check hypervisor type with is " + v["hypervisor"] + " or not"
)
return

View File

@ -14,50 +14,55 @@ from random import random
#Import System modules
import time
class Services:
"""Test ISO Services
"""
def __init__(self):
self.services = {
"iso_1":
{
"displaytext": "Test ISO type 1",
"name": "testISOType_1",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"zoneid": 1,
"isextractable": True,
"isfeatured": True,
"ispublic": True,
"ostypeid": 12,
},
"iso_2":
{
"displaytext": "Test ISO type 2",
"name": "testISOType_2",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"zoneid": 1,
"isextractable": True,
"isfeatured": True,
"ispublic": True,
"ostypeid": 12,
"mode": 'HTTP_DOWNLOAD',
# Used in Extract template, value must be HTTP_DOWNLOAD
"ostypeid": 12,
},
"destzoneid": 2, # Copy ISO from one zone to another (Destination Zone)
"sourcezoneid": 1, # Copy ISO from one zone to another (Source Zone)
"isfeatured": True,
"ispublic": True,
"isextractable": True,
"bootable": True, # For edit template
"passwordenabled": True,
"ostypeid": 15,
"account": 'bhavin333', # Normal user, no admin rights
"domainid": 1,
}
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
"password": "password",
},
"iso_1":
{
"displaytext": "Test ISO type 1",
"name": "testISOType_1",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"isextractable": True,
"isfeatured": True,
"ispublic": True,
"ostypeid": 12,
},
"iso_2":
{
"displaytext": "Test ISO type 2",
"name": "testISOType_2",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"isextractable": True,
"isfeatured": True,
"ispublic": True,
"ostypeid": 12,
"mode": 'HTTP_DOWNLOAD',
# Used in Extract template, value must be HTTP_DOWNLOAD
},
"destzoneid": 2,
# Copy ISO from one zone to another (Destination Zone)
"isfeatured": True,
"ispublic": True,
"isextractable": True,
"bootable": True, # For edit template
"passwordenabled": True,
"ostypeid": 12,
"domainid": 1,
}
class TestCreateIso(cloudstackTestCase):
@ -65,6 +70,9 @@ class TestCreateIso(cloudstackTestCase):
self.services = Services().services
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
# Get Zone, Domain and templates
self.zone = get_zone(self.apiclient)
self.services["iso_2"]["zoneid"] = self.zone.id
self.cleanup = []
return
@ -85,7 +93,8 @@ class TestCreateIso(cloudstackTestCase):
"""
# Validate the following:
# 1. database (vm_template table) should be updated with newly created ISO
# 1. database (vm_template table) should be
# updated with newly created ISO
# 2. UI should show the newly added ISO
# 3. listIsos API should show the newly added ISO
@ -120,55 +129,40 @@ class TestCreateIso(cloudstackTestCase):
self.services["iso_2"]["zoneid"],
"Check zone ID of newly created ISO"
)
#Verify the database entry for ISO
self.debug(
"select name, display_text from vm_template where id = %s and format='ISO';"
% iso.id
)
qresultset = self.dbclient.execute(
"select name, display_text from vm_template where id = %s and format='ISO';"
% iso.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
self.services["iso_2"]["name"],
"Compare ISO name with database record"
)
self.assertEqual(
qresult[1],
self.services["iso_2"]["displaytext"],
"Compare ISO display text with database record"
)
return
class TestISO(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.services["iso_1"]["zoneid"] = cls.zone.id
cls.services["iso_2"]["zoneid"] = cls.zone.id
cls.services["sourcezoneid"] = cls.zone.id
#Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.api_client,
cls.services["account"],
)
cls.services["account"] = cls.account.account.name
cls.iso_1 = Iso.create(cls.api_client, cls.services["iso_1"])
cls.iso_1.download(cls.api_client)
cls.iso_2 = Iso.create(cls.api_client, cls.services["iso_2"])
cls.iso_2.download(cls.api_client)
cls._cleanup = [cls.iso_2, cls.account]
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.iso_2.delete(cls.api_client)
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
@ -247,47 +241,6 @@ class TestISO(cloudstackTestCase):
self.services["ostypeid"],
"Check OSTypeID of updated ISO"
)
#Verify database entry for updateIso
self.debug(
"select name, display_text, bootable, guest_os_id from vm_template where id = %s and format='ISO';"
% self.iso_1.id
)
qresultset = self.dbclient.execute(
"select name, display_text, bootable, guest_os_id from vm_template where id = %s and format='ISO';"
% self.iso_1.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
new_name,
"Compare ISO name with database record"
)
self.assertEqual(
qresult[1],
new_displayText,
"Compare ISO display text with database record"
)
self.assertEqual(
qresult[2],
int(self.services["bootable"]),
"Compare template enable_password field with database record"
)
self.assertEqual(
qresult[3],
self.services["ostypeid"],
"Compare template guest OS ID with database record"
)
return
def test_03_delete_iso(self):
@ -305,23 +258,11 @@ class TestISO(cloudstackTestCase):
cmd.id = self.iso_1.id
list_iso_response = self.apiclient.listIsos(cmd)
self.assertEqual(list_iso_response, None, "Check if ISO exists in ListIsos")
#Verify whether database entry is deleted or not
self.debug(
"select name, display_text from vm_template where id = %s and format='ISO';"
% self.iso_1.id
)
qresultset = self.dbclient.execute(
"select name, display_text from vm_template where id = %s and format='ISO';"
% self.iso_1.id
)
self.assertEqual(
len(qresultset),
1,
"Check DB Query result set"
)
list_iso_response,
None,
"Check if ISO exists in ListIsos"
)
return
def test_04_extract_Iso(self):
@ -329,7 +270,8 @@ class TestISO(cloudstackTestCase):
# Validate the following
# 1. Admin should able extract and download the ISO
# 2. ListIsos should display all the public templates for all kind of users
# 2. ListIsos should display all the public templates
# for all kind of users
# 3 .ListIsos should not display the system templates
cmd = extractIso.extractIsoCmd()
@ -370,7 +312,8 @@ class TestISO(cloudstackTestCase):
# validate the following
# 1. listIsos returns valid permissions set for ISO
# 2. permission changes should be reflected in vm_template table in database
# 2. permission changes should be reflected in vm_template
# table in database
cmd = updateIsoPermissions.updateIsoPermissionsCmd()
cmd.id = self.iso_2.id
@ -383,8 +326,8 @@ class TestISO(cloudstackTestCase):
#Verify ListIsos have updated permissions for the ISO for normal user
cmd = listIsos.listIsosCmd()
cmd.id = self.iso_2.id
cmd.account = self.services["account"]
cmd.domainid = self.services["domainid"]
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
list_iso_response = self.apiclient.listIsos(cmd)
iso_response = list_iso_response[0]
@ -405,48 +348,14 @@ class TestISO(cloudstackTestCase):
self.services["isfeatured"],
"Check isfeatured permission of ISO"
)
#Verify database entry for updated ISO permissions
self.debug(
"select public, featured, extractable from vm_template where id = %s and format='ISO';"
% self.iso_2.id
)
qresultset = self.dbclient.execute(
"select public, featured, extractable from vm_template where id = %s and format='ISO';"
% self.iso_2.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
int(self.services["ispublic"]),
"Compare ispublic permission with database record"
)
self.assertEqual(
qresult[1],
int(self.services["isfeatured"]),
"Compare isfeatured permission with database record"
)
self.assertEqual(
qresult[2],
int(self.services["isextractable"]),
"Compare extractable permission with database record"
)
return
def test_06_copy_iso(self):
"""Test for copy ISO from one zone to another"""
#Validate the following
#1. copy ISO should be successful and secondary storage should contain new copied ISO.
#1. copy ISO should be successful and secondary storage
# should contain new copied ISO.
cmd = copyIso.copyIsoCmd()
cmd.id = self.iso_2.id

File diff suppressed because it is too large Load Diff

View File

@ -19,53 +19,43 @@ class Services:
def __init__(self):
self.services = {
"nfs": {
0: {
"url": "nfs://192.168.100.131/Primary",
# Format: File_System_Type/Location/Path
"name": "Primary XEN",
"podid": 3,
"clusterid": 1, # XEN Cluster
"zoneid": 3,
"hypervisor": 'XEN',
},
1: {
"url": "nfs://192.168.100.131/Primary",
"name": "Primary KVM",
"podid": 3,
"clusterid": 40, # KVM Cluster
"zoneid": 3,
"hypervisor": 'KVM',
},
2: {
"url": "nfs://192.168.100.131/Primary",
"name": "Primary VMWare",
"podid": 3,
"clusterid": 33, # VMWare Cluster
"zoneid": 3,
"hypervisor": 'VMWare',
},
},
"iscsi": {
0: {
"url": "iscsi://192.168.100.21/iqn.2012-01.localdomain.clo-cstack-cos6:iser/1",
# Format : iscsi : // IP Address/ IQN number/ LUN #
"name": "Primary iSCSI",
"podid": 1,
"clusterid": 1, # XEN Cluster
"zoneid": 1,
"hypervisor": 'XEN',
},
1: {
"url": "iscsi://192.168.100.21/export",
"name": "Primary KVM",
"podid": 3,
"clusterid": 1, # KVM Cluster
"zoneid": 3,
"hypervisor": 'KVM',
},
},
}
"nfs": {
0: {
"url": "nfs://192.168.100.131/Primary",
# Format: File_System_Type/Location/Path
"name": "Primary XEN",
"clusterid": 1, # XEN Cluster
"hypervisor": 'XEN',
},
1: {
"url": "nfs://192.168.100.131/Primary",
"name": "Primary KVM",
"clusterid": 40, # KVM Cluster
"hypervisor": 'KVM',
},
2: {
"url": "nfs://192.168.100.131/Primary",
"name": "Primary VMWare",
"clusterid": 33, # VMWare Cluster
"hypervisor": 'VMWare',
},
},
"iscsi": {
0: {
"url": "iscsi://192.168.100.21/iqn.2012-01.localdomain.clo-cstack-cos6:iser/1",
# Format : iscsi://IP Address/IQN number/LUN#
"name": "Primary iSCSI",
"clusterid": 1, # XEN Cluster
"hypervisor": 'XEN',
},
1: {
"url": "iscsi://192.168.100.21/export",
"name": "Primary KVM",
"clusterid": 1, # KVM Cluster
"hypervisor": 'KVM',
},
},
}
class TestPrimaryStorageServices(cloudstackTestCase):
@ -74,6 +64,24 @@ class TestPrimaryStorageServices(cloudstackTestCase):
self.apiclient = self.testClient.getApiClient()
self.services = Services().services
self.cleanup = []
# Get Zone and pod
self.zone = get_zone(self.apiclient)
self.pod = get_pod(self.apiclient, self.zone.id)
self.services["nfs"][0]["zoneid"] = self.zone.id
self.services["nfs"][1]["zoneid"] = self.zone.id
self.services["nfs"][2]["zoneid"] = self.zone.id
self.services["nfs"][0]["podid"] = self.pod.id
self.services["nfs"][1]["podid"] = self.pod.id
self.services["nfs"][2]["podid"] = self.pod.id
self.services["iscsi"][0]["zoneid"] = self.zone.id
self.services["iscsi"][1]["zoneid"] = self.zone.id
self.services["iscsi"][0]["podid"] = self.pod.id
self.services["iscsi"][1]["podid"] = self.pod.id
return
def tearDown(self):
@ -92,7 +100,8 @@ class TestPrimaryStorageServices(cloudstackTestCase):
# Validate the following:
# 1. verify hypervisortype returned by api is Xen/KVM/VMWare
# 2. verify that the cluster is in 'Enabled' allocation state
# 3. verify that the host is added successfully and in Up state with listHosts api response
# 3. verify that the host is added successfully and
# in Up state with listHosts api response
#Create NFS storage pools with on XEN/KVM/VMWare clusters
for k, v in self.services["nfs"].items():
@ -103,25 +112,25 @@ class TestPrimaryStorageServices(cloudstackTestCase):
list_hosts_response = self.apiclient.listHosts(cmd)
self.assertNotEqual(
len(list_hosts_response),
0,
"Check list Hosts response for hypervisor type : " + v["hypervisor"]
)
len(list_hosts_response),
0,
"Check list Hosts for hypervisor: " + v["hypervisor"]
)
storage = StoragePool.create(self.apiclient, v)
self.cleanup.append(storage)
self.assertEqual(
storage.state,
'Up',
"Check state of primary storage is Up or not for hypervisor type : " + v["hypervisor"]
)
storage.state,
'Up',
"Check primary storage state for hypervisor: " + v["hypervisor"]
)
self.assertEqual(
storage.type,
'NetworkFilesystem',
"Check type of the storage pool created for hypervisor type : " + v["hypervisor"]
)
storage.type,
'NetworkFilesystem',
"Check storage pool type for hypervisor : " + v["hypervisor"]
)
#Verify List Storage pool Response has newly added storage pool
cmd = listStoragePools.listStoragePoolsCmd()
@ -136,15 +145,15 @@ class TestPrimaryStorageServices(cloudstackTestCase):
storage_response = storage_pools_response[0]
self.assertEqual(
storage_response.id,
storage.id,
"Check storage pool ID with list storage pools response for hypervisor type : " + v["hypervisor"]
)
storage_response.id,
storage.id,
"Check storage pool ID for hypervisor: " + v["hypervisor"]
)
self.assertEqual(
storage.type,
storage_response.type,
"Check type of the storage pool for hypervisor type : " + v["hypervisor"]
)
storage.type,
storage_response.type,
"Check storage pool type for hypervisor: " + v["hypervisor"]
)
# Call cleanup for reusing primary storage
cleanup_resources(self.apiclient, self.cleanup)
self.cleanup = []
@ -155,10 +164,10 @@ class TestPrimaryStorageServices(cloudstackTestCase):
self.cleanup.append(storage)
self.assertEqual(
storage.state,
'Up',
"Check state of primary storage is Up or not for hypervisor type : " + v["hypervisor"]
)
storage.state,
'Up',
"Check primary storage state for hypervisor: " + v["hypervisor"]
)
#Verify List Storage pool Response has newly added storage pool
cmd = listStoragePools.listStoragePoolsCmd()
@ -166,22 +175,22 @@ class TestPrimaryStorageServices(cloudstackTestCase):
storage_pools_response = self.apiclient.listStoragePools(cmd)
self.assertNotEqual(
len(storage_pools_response),
0,
"Check list Hosts response for hypervisor type : " + v["hypervisor"]
len(storage_pools_response),
0,
"Check Hosts response for hypervisor: " + v["hypervisor"]
)
storage_response = storage_pools_response[0]
self.assertEqual(
storage_response.id,
storage.id,
"Check storage pool ID with list storage pools response for hypervisor type : " + v["hypervisor"]
)
storage_response.id,
storage.id,
"Check storage pool ID for hypervisor: " + v["hypervisor"]
)
self.assertEqual(
storage.type,
storage_response.type,
"Check type of the storage pool for hypervisor type : " + v["hypervisor"]
)
storage.type,
storage_response.type,
"Check storage pool type hypervisor: " + v["hypervisor"]
)
# Call cleanup for reusing primary storage
cleanup_resources(self.apiclient, self.cleanup)

View File

@ -14,6 +14,7 @@ from base import *
#Import System modules
import time
class Services:
"""Test router Services
"""
@ -29,11 +30,9 @@ class Services:
},
"virtual_machine":
{
"template": 7, # Template used for VM creation
"zoneid": 3,
"displayname": "testserver",
"displayname": "Test VM",
"username": "root",
"password": "fr3sca",
"password": "password",
"ssh_port": 22,
"hypervisor": 'VMWare',
"domainid": 1,
@ -47,30 +46,49 @@ class Services:
"lastname": "User",
"username": "testuser",
"password": "fr3sca",
"zoneid": 1,
},
"sleep_time": 300,
"ostypeid":12,
}
class TestRouterServices(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
#Create an account, network, VM and IP addresses
cls.account = Account.create(cls.api_client, cls.services["account"], admin = True)
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
cls.vm_1 = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
accountid = cls.account.account.name,
serviceofferingid = cls.service_offering.id
)
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.cleanup = [cls.vm_1, cls.account, cls.service_offering]
#Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
)
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.vm_1 = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
)
cls.cleanup = [
cls.vm_1,
cls.account,
cls.service_offering
]
return
@classmethod
@ -150,7 +168,7 @@ class TestRouterServices(cloudstackTestCase):
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
cmd.domainid = self.account.account.domainid
list_router_response = self.apiclient.listRouters(cmd)
self.assertNotEqual(
@ -212,7 +230,7 @@ class TestRouterServices(cloudstackTestCase):
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
#Stop the router
@ -221,7 +239,6 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient.stopRouter(cmd)
#List routers to check state of router
cmd = listRouters.listRoutersCmd()
cmd.id = router.id
router_response = self.apiclient.listRouters(cmd)[0]
@ -243,7 +260,7 @@ class TestRouterServices(cloudstackTestCase):
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
#Start the router
@ -252,7 +269,6 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient.startRouter(cmd)
#List routers to check state of router
cmd = listRouters.listRoutersCmd()
cmd.id = router.id
router_response = self.apiclient.listRouters(cmd)[0]
@ -274,7 +290,7 @@ class TestRouterServices(cloudstackTestCase):
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
public_ip = router.publicip
@ -315,7 +331,7 @@ class TestRouterServices(cloudstackTestCase):
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
cmd.domainid = self.account.account.domainid
list_vms = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
@ -340,13 +356,13 @@ class TestRouterServices(cloudstackTestCase):
#Check status of network router
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
self.assertEqual(
router.state,
'Stopped',
"Check state of the router after stopping all VMs associated with that account"
router.state,
'Stopped',
"Check state of the router after stopping all VMs associated"
)
return
@ -355,12 +371,13 @@ class TestRouterServices(cloudstackTestCase):
"""
# Validate the following
# 1. Router only does dhcp
# 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP by checking status of dnsmasq process
# 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP
# by checking status of dnsmasq process
# Find router associated with user account
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
cmd = listHosts.listHostsCmd()
@ -371,12 +388,13 @@ class TestRouterServices(cloudstackTestCase):
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.services['virtual_machine']["username"],
self.services['virtual_machine']["password"]
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % router.linklocalip
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s "\
% router.linklocalip
# Double hop into router
timeout = 5
@ -419,12 +437,13 @@ class TestRouterServices(cloudstackTestCase):
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.services['virtual_machine']["username"],
self.services['virtual_machine']["password"]
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % router.linklocalip
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s "\
% router.linklocalip
# Double hop into router
timeout = 5
@ -468,7 +487,8 @@ class TestRouterServices(cloudstackTestCase):
# Validate the following
# 1. When cleanup = true, router is destroyed and a new one created
# 2. New router will have new publicIp and linkLocalIp and all it's services should resume
# 2. New router will have new publicIp and linkLocalIp and
# all it's services should resume
# Find router associated with user account
cmd = listRouters.listRoutersCmd()
@ -507,7 +527,8 @@ class TestRouterServices(cloudstackTestCase):
"""
# Validate the following
# 1. When cleanup = false, router is restarted and all services inside the router are restarted
# 1. When cleanup = false, router is restarted and
# all services inside the router are restarted
# 2. check 'uptime' to see if the actual restart happened
cmd = listNetworks.listNetworksCmd()
@ -534,12 +555,13 @@ class TestRouterServices(cloudstackTestCase):
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.services['virtual_machine']["username"],
self.services['virtual_machine']["password"]
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s uptime" % router.linklocalip
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s uptime"\
% router.linklocalip
# Double hop into router to check router uptime
timeout = 5

View File

@ -19,35 +19,27 @@ class Services:
def __init__(self):
self.services = {
"storage": {
"zoneid": 3,
"podid": 3,
"url": "nfs://192.168.100.131/SecStorage"
# Format: File_System_Type/Location/Path
},
"hypervisors": {
0: {
"hypervisor": "XenServer",
"zoneid": 3,
"podid": 3,
"templatefilter": "self",
},
1: {
"hypervisor": "KVM",
"zoneid": 3,
"podid": 3,
"templatefilter": "self",
},
2: {
"hypervisor": "VMWare",
"zoneid": 3,
"podid": 3,
"templatefilter": "self",
},
},
"sleep": 180,
"timeout": 5,
}
"storage": {
"url": "nfs://192.168.100.131/SecStorage"
# Format: File_System_Type/Location/Path
},
"hypervisors": {
0: {
"hypervisor": "XenServer",
"templatefilter": "self",
},
1: {
"hypervisor": "KVM",
"templatefilter": "self",
},
2: {
"hypervisor": "VMWare",
"templatefilter": "self",
},
},
"sleep": 180,
"timeout": 5,
}
class TestSecStorageServices(cloudstackTestCase):
@ -56,6 +48,20 @@ class TestSecStorageServices(cloudstackTestCase):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
self.services = Services().services
# Get Zone and pod
self.zone = get_zone(self.apiclient)
self.pod = get_pod(self.apiclient, self.zone.id)
self.services["storage"]["zoneid"] = self.zone.id
self.services["storage"]["podid"] = self.pod.id
self.services["hypervisors"][0]["zoneid"] = self.zone.id
self.services["hypervisors"][1]["zoneid"] = self.zone.id
self.services["hypervisors"][2]["zoneid"] = self.zone.id
self.services["hypervisors"][0]["podid"] = self.pod.id
self.services["hypervisors"][1]["podid"] = self.pod.id
self.services["hypervisors"][2]["podid"] = self.pod.id
return
def tearDown(self):
@ -75,13 +81,13 @@ class TestSecStorageServices(cloudstackTestCase):
# 2. Verify with listHosts and type secondarystorage
cmd = addSecondaryStorage.addSecondaryStorageCmd()
cmd.zoneid = self.services["storage"]["zoneid"]
cmd.zoneid = self.zone.id
cmd.url = self.services["storage"]["url"]
sec_storage = self.apiclient.addSecondaryStorage(cmd)
self.assertEqual(
sec_storage.zoneid,
self.services["storage"]["zoneid"],
self.zone.id,
"Check zoneid where sec storage is added"
)
@ -115,13 +121,14 @@ class TestSecStorageServices(cloudstackTestCase):
"""
# 1. verify listHosts has all 'routing' hosts in UP state
# 2. verify listStoragePools shows all primary storage pools in UP state
# 2. verify listStoragePools shows all primary storage pools
# in UP state
# 3. verify that secondary storage was added successfully
cmd = listHosts.listHostsCmd()
cmd.type = 'Routing'
cmd.zoneid = self.services["storage"]["zoneid"]
cmd.podid = self.services["storage"]["podid"]
cmd.zoneid = self.zone.id
cmd.podid = self.pod.id
list_hosts_response = self.apiclient.listHosts(cmd)
# ListHosts has all 'routing' hosts in UP state
self.assertNotEqual(
@ -139,8 +146,8 @@ class TestSecStorageServices(cloudstackTestCase):
# ListStoragePools shows all primary storage pools in UP state
cmd = listStoragePools.listStoragePoolsCmd()
cmd.name = 'Primary'
cmd.zoneid = self.services["storage"]["zoneid"]
cmd.podid = self.services["storage"]["podid"]
cmd.zoneid = self.zone.id
cmd.podid = self.pod.id
list_storage_response = self.apiclient.listStoragePools(cmd)
self.assertNotEqual(
@ -159,7 +166,7 @@ class TestSecStorageServices(cloudstackTestCase):
# Secondary storage is added successfully
cmd = listHosts.listHostsCmd()
cmd.type = 'SecondaryStorage'
cmd.zoneid = self.services["storage"]["zoneid"]
cmd.zoneid = self.zone.id
timeout = self.services["timeout"]
while True:
@ -187,8 +194,8 @@ class TestSecStorageServices(cloudstackTestCase):
cmd = listSystemVms.listSystemVmsCmd()
cmd.systemvmtype = 'secondarystoragevm'
cmd.zoneid = self.services["storage"]["zoneid"]
cmd.podid = self.services["storage"]["podid"]
cmd.zoneid = self.zone.id
cmd.podid = self.pod.id
timeout = self.services["timeout"]
@ -222,13 +229,14 @@ class TestSecStorageServices(cloudstackTestCase):
# Validate the following
# If SSVM is in UP state and running
# 1. wait for listTemplates to show all builtin templates downloaded for all added hypervisors and in “Ready” state"
# 1. wait for listTemplates to show all builtin templates
# downloaded for all added hypervisors and in “Ready” state"
for k, v in self.services["hypervisors"].items():
cmd = listTemplates.listTemplatesCmd()
cmd.hypervisor = v["hypervisor"]
cmd.zoneid = v["zoneid"]
cmd.zoneid = self.zone.id
cmd.templatefilter = v["templatefilter"]
list_templates = self.apiclient.listTemplates(cmd)
@ -241,7 +249,7 @@ class TestSecStorageServices(cloudstackTestCase):
cmd = listTemplates.listTemplatesCmd()
cmd.id = templateid
cmd.templatefilter = v["templatefilter"]
cmd.zoneid = v["zoneid"]
cmd.zoneid = self.zone.id
while True and (templateid != None):

View File

@ -17,81 +17,83 @@ class Services:
def __init__(self):
self.services = {
"service_offering": {
"name": "Tiny Service Offering",
"displaytext": "Tiny service offering",
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "fr3sca",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"server_with_disk":
"cpuspeed": 200, # in MHz
"memory": 256, # In MBs
},
"disk_offering": {
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"server_with_disk":
{
"template": 256, # Template used for VM creation
"zoneid": 1,
"diskoffering": 3, # Optional, if specified data disk will be allocated to VM
"displayname": "testserver",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"account": 'testuser',
"domainid": 1,
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"server_without_disk":
"server_without_disk":
{
"template": 256, # Template used for VM creation
"zoneid": 1,
"displayname": "testserver",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"account": 'testuser',
"domainid": 1,
"privateport": 22, # For NAT rule creation
"privateport": 22,
# For NAT rule creation
"publicport": 22,
"protocol": 'TCP',
},
"recurring_snapshot":
"recurring_snapshot":
{
"intervaltype": 'HOURLY', # Frequency of snapshots
"intervaltype": 'HOURLY',
# Frequency of snapshots
"maxsnaps": 2, # Should be min 2
"schedule": 1,
"timezone": 'US/Arizona',
# Timezone Formats - http://cloud.mindtouch.us/CloudStack_Documentation/Developer's_Guide%3A_CloudStack
},
"templates":
"templates":
{
"displaytext": 'Test template snapshot',
"name": 'template_from_snapshot',
"displaytext": 'Template from snapshot',
"name": 'Template from snapshot',
"ostypeid": 12,
"templatefilter": 'self',
},
"small_instance":
{
"zoneid": 1,
"serviceofferingid": 1,
},
"ostypeid": 12,
# Cent OS 5.3 (64 bit)
"diskdevice": "/dev/xvda",
"template": 256,
"zoneid": 1,
"diskoffering": 3,
"diskname": "TestDiskServ",
"size": 1, # GBs
"account": 'testuser',
"domainid": 1,
"mount_dir": "/mnt/tmp",
"sub_dir": "test",
"sub_lvl_dir1": "test1",
"sub_lvl_dir2": "test2",
"random_data": "random.data",
"exportpath": 'SecondaryStorage',
"exportpath": 'SecStorage',
"sec_storage": '192.168.100.131',
# IP address of Sec storage where snapshots are stored
"mgmt_server_ip": '192.168.100.154',
@ -107,11 +109,55 @@ class TestSnapshots(cloudstackTestCase):
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["server_with_disk"]["zoneid"] = cls.zone.id
cls.services["server_with_disk"]["diskoffering"] = cls.disk_offering.id
cls.services["server_without_disk"]["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["zoneid"] = cls.zone.id
cls.services["diskoffering"] = cls.disk_offering.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = cls.virtual_machine_with_disk = \
VirtualMachine.create(cls.api_client, cls.services["server_with_disk"], serviceofferingid = cls.service_offering.id)
VirtualMachine.create(
cls.api_client,
cls.services["server_with_disk"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
)
cls.virtual_machine_without_disk = \
VirtualMachine.create(cls.api_client, cls.services["server_without_disk"], serviceofferingid = cls.service_offering.id)
VirtualMachine.create(
cls.api_client,
cls.services["server_without_disk"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
@ -120,8 +166,20 @@ class TestSnapshots(cloudstackTestCase):
cls.virtual_machine.domainid,
cls.services["server_with_disk"]
)
cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services["server_with_disk"], ipaddressid = cls.public_ip.ipaddress.id)
cls._cleanup = [cls.virtual_machine, cls.nat_rule, cls.virtual_machine_without_disk, cls.public_ip, cls.service_offering]
cls.nat_rule = NATRule.create(
cls.api_client,
cls.virtual_machine,
cls.services["server_with_disk"],
ipaddressid=cls.public_ip.ipaddress.id
)
cls._cleanup = [
cls.virtual_machine,
cls.nat_rule,
cls.public_ip,
cls.service_offering,
cls.disk_offering,
cls.account,
]
return
@classmethod
@ -152,9 +210,11 @@ class TestSnapshots(cloudstackTestCase):
"""
# Validate the following
# 1.listSnapshots should list the snapshot that was created.
# 2.verify that secondary storage NFS share contains the reqd volume under /secondary/snapshots/$volumeid/$snapshot_uuid
# 3.verify backup_snap_id was non null in the `snapshots` table
# 1. listSnapshots should list the snapshot that was created.
# 2. verify that secondary storage NFS share contains
# the reqd volume under
# /secondary/snapshots/$volumeid/$snapshot_uuid
# 3. verify backup_snap_id was non null in the `snapshots` table
cmd = listVolumes.listVolumesCmd()
cmd.virtualmachineid = self.virtual_machine_with_disk.id
@ -167,21 +227,30 @@ class TestSnapshots(cloudstackTestCase):
cmd.id = snapshot.id
list_snapshots = self.apiclient.listSnapshots(cmd)
self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call")
self.assertNotEqual(
list_snapshots,
None,
"Check if result exists in list item call"
)
self.assertEqual(
list_snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.debug("select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" % snapshot.id)
qresultset = self.dbclient.execute("select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" % snapshot.id)
self.debug(
"select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" \
% snapshot.id
)
qresultset = self.dbclient.execute(
"select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" \
% snapshot.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertNotEqual(
str(qresult[0]),
@ -190,25 +259,29 @@ class TestSnapshots(cloudstackTestCase):
)
# Login to management server to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server_ip"],
self.services["ssh_port"],
self.services["username"],
self.services["password"]
)
ssh_client = self.virtual_machine_without_disk.get_ssh_client(
self.nat_rule.ipaddress
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (self.services["sec_storage"], self.services["exportpath"], self.services["mount_dir"]),
"ls %s/snapshots/%s/%s" % (self.services["mount_dir"], qresult[1], qresult[2]),
"mount %s:/%s %s" % (
self.services["sec_storage"],
self.services["exportpath"],
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
self.services["mount_dir"],
qresult[1],
qresult[2]
),
]
for c in cmds:
result = ssh_client.execute(c)
self.assertEqual(
result[0],
str(qresult[3]) + ".vhd",
"Check snapshot UUID in secondary storage and database"
result[0],
str(qresult[3]) + ".vhd",
"Check snapshot UUID in secondary storage and database"
)
return
@ -226,14 +299,24 @@ class TestSnapshots(cloudstackTestCase):
cmd.id = snapshot.id
list_snapshots = self.apiclient.listSnapshots(cmd)
self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call")
self.assertNotEqual(
list_snapshots,
None,
"Check if result exists in list item call"
)
self.assertEqual(
list_snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.debug("select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" % 6)#snapshot.id
qresultset = self.dbclient.execute("select backup_snap_id, account_id, volume_id, backup_snap_id from snapshots where id = %s;" % 6) # snapshot.id
self.debug(
"select backup_snap_id, account_id, volume_id, path from snapshots where id = %s;" \
% snapshot.id
)
qresultset = self.dbclient.execute(
"select backup_snap_id, account_id, volume_id, backup_snap_id from snapshots where id = %s;" \
% snapshot.id
)
self.assertNotEqual(
len(qresultset),
0,
@ -249,16 +332,20 @@ class TestSnapshots(cloudstackTestCase):
)
# Login to management server to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server_ip"],
self.services["ssh_port"],
self.services["username"],
self.services["password"]
)
ssh_client = self.virtual_machine_with_disk.get_ssh_client(
self.nat_rule.ipaddress
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (self.services["sec_storage"], self.services["exportpath"], self.services["mount_dir"]),
"ls %s/snapshots/%s/%s" % (self.services["mount_dir"], qresult[1], qresult[2]),
"mount %s:/%s %s" % (
self.services["sec_storage"],
self.services["exportpath"],
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
self.services["mount_dir"],
qresult[1],
qresult[2]
),
]
for c in cmds:
result = ssh_client.execute(c)
@ -281,11 +368,19 @@ class TestSnapshots(cloudstackTestCase):
random_data_0 = random_gen(100)
random_data_1 = random_gen(100)
ssh_client = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
ssh_client = self.virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
)
#Format partition using ext3
format_volume_to_ext3(ssh_client, self.services["diskdevice"])
format_volume_to_ext3(
ssh_client,
self.services["diskdevice"]
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
"mount %s1 %s" % (self.services["diskdevice"], self.services["mount_dir"]),
"mount %s1 %s" % (
self.services["diskdevice"],
self.services["mount_dir"]
),
"pushd %s" % self.services["mount_dir"],
"mkdir -p %s/{%s,%s} " % (
self.services["sub_dir"],
@ -317,7 +412,11 @@ class TestSnapshots(cloudstackTestCase):
snapshot = Snapshot.create(self.apiclient, volume.id)
self.cleanup.append(snapshot)
#Create volume from snapshot
volume = Volume.create_from_snapshot(self.apiclient, snapshot.id, self.services)
volume = Volume.create_from_snapshot(
self.apiclient,
snapshot.id,
self.services
)
self.cleanup.append(volume)
cmd = listVolumes.listVolumesCmd()
cmd.id = volume.id
@ -344,35 +443,37 @@ class TestSnapshots(cloudstackTestCase):
ssh = new_virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
cmds = [
"mkdir %s" % self.services["mount_dir"],
"mount %s1 %s" % (self.services["diskdevice"], self.services["mount_dir"])
"mount %s1 %s" % (
self.services["diskdevice"],
self.services["mount_dir"]
)
]
for c in cmds:
self.debug(ssh.execute(c))
returned_data_0 = ssh.execute("cat %s/%s/%s" % (
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["random_data"]
)
)
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["random_data"]
))
returned_data_1 = ssh.execute("cat %s/%s/%s" % (
self.services["sub_dir"],
self.services["sub_lvl_dir2"],
self.services["random_data"]
self.services["sub_dir"],
self.services["sub_lvl_dir2"],
self.services["random_data"]
))
#Verify returned data
self.assertEqual(
random_data_0,
returned_data_0[0],
"Verify newly attached volume contents with existing one"
random_data_0,
returned_data_0[0],
"Verify newly attached volume contents with existing one"
)
self.assertEqual(
random_data_1,
returned_data_1[0],
"Verify newly attached volume contents with existing one"
)
random_data_1,
returned_data_1[0],
"Verify newly attached volume contents with existing one"
)
#detach volume for cleanup
cmd = detachVolume.detachVolumeCmd()
@ -398,7 +499,11 @@ class TestSnapshots(cloudstackTestCase):
cmd = listSnapshots.listSnapshotsCmd()
cmd.id = snapshot.id
list_snapshots = self.apiclient.listSnapshots(cmd)
self.assertEqual(list_snapshots, None, "Check if result exists in list item call")
self.assertEqual(
list_snapshots,
None,
"Check if result exists in list item call"
)
return
def test_05_recurring_snapshot_root_disk(self):
@ -412,7 +517,11 @@ class TestSnapshots(cloudstackTestCase):
cmd.virtualmachineid = self.virtual_machine_with_disk.id
cmd.type = 'ROOT'
volume = self.apiclient.listVolumes(cmd)
recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, self.services["recurring_snapshot"])
recurring_snapshot = SnapshotPolicy.create(
self.apiclient,
volume[0].id,
self.services["recurring_snapshot"]
)
self.cleanup.append(recurring_snapshot)
#ListSnapshotPolicy should return newly created policy
cmd = listSnapshotPolicies.listSnapshotPoliciesCmd()
@ -420,20 +529,27 @@ class TestSnapshots(cloudstackTestCase):
cmd.volumeid = volume[0].id
list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd)
self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call")
self.assertNotEqual(
list_snapshots_policy,
None,
"Check if result exists in list item call"
)
snapshots_policy = list_snapshots_policy[0]
self.assertEqual(
snapshots_policy.id,
recurring_snapshot.id,
"Check recurring snapshot id in list resources call"
snapshots_policy.id,
recurring_snapshot.id,
"Check recurring snapshot id in list resources call"
)
self.assertEqual(
snapshots_policy.maxsnaps,
self.services["recurring_snapshot"]["maxsnaps"],
"Check interval type in list resources call"
snapshots_policy.maxsnaps,
self.services["recurring_snapshot"]["maxsnaps"],
"Check interval type in list resources call"
)
#Sleep for (maxsnaps+1) hours to verify only maxsnaps snapshots are retained
time.sleep(((self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600)
# Sleep for (maxsnaps+1) hours to verify
# only maxsnaps snapshots are retained
time.sleep(
((self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600
)
cmd = listSnapshots.listSnapshotsCmd()
cmd.volumeid = volume.id
cmd.intervaltype = self.services["recurring_snapshot"]["intervaltype"]
@ -457,7 +573,11 @@ class TestSnapshots(cloudstackTestCase):
cmd.virtualmachineid = self.virtual_machine_with_disk.id
cmd.type = 'DATADISK'
volume = self.apiclient.listVolumes(cmd)
recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume[0].id, self.services["recurring_snapshot"])
recurring_snapshot = SnapshotPolicy.create(
self.apiclient,
volume[0].id,
self.services["recurring_snapshot"]
)
self.cleanup.append(recurring_snapshot)
#ListSnapshotPolicy should return newly created policy
cmd = listSnapshotPolicies.listSnapshotPoliciesCmd()
@ -465,12 +585,16 @@ class TestSnapshots(cloudstackTestCase):
cmd.volumeid = volume[0].id
list_snapshots_policy = self.apiclient.listSnapshotPolicies(cmd)
self.assertNotEqual(list_snapshots_policy, None, "Check if result exists in list item call")
self.assertNotEqual(
list_snapshots_policy,
None,
"Check if result exists in list item call"
)
snapshots_policy = list_snapshots_policy[0]
self.assertEqual(
snapshots_policy.id,
recurring_snapshot.id,
"Check recurring snapshot id in list resources call"
snapshots_policy.id,
recurring_snapshot.id,
"Check recurring snapshot id in list resources call"
)
self.assertEqual(
snapshots_policy.maxsnaps,
@ -478,8 +602,11 @@ class TestSnapshots(cloudstackTestCase):
"Check interval type in list resources call"
)
#Sleep for (maxsnaps+1) hours to verify only maxsnaps snapshots are retained
time.sleep(((self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600)
# Sleep for (maxsnaps+1) hours to
# verify only maxsnaps snapshots are retained
time.sleep(
((self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600
)
cmd = listSnapshots.listSnapshotsCmd()
cmd.volumeid = volume.id
@ -510,10 +637,15 @@ class TestSnapshots(cloudstackTestCase):
random_data_1 = random_gen(100)
#Login to virtual machine
ssh_client = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
ssh_client = self.virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
"mount %s1 %s" % (self.services["diskdevice"], self.services["mount_dir"]),
"mount %s1 %s" % (
self.services["diskdevice"],
self.services["mount_dir"]
),
"pushd %s" % self.services["mount_dir"],
"mkdir -p %s/{%s,%s} " % (
self.services["sub_dir"],
@ -546,14 +678,22 @@ class TestSnapshots(cloudstackTestCase):
self.cleanup.append(snapshot)
# Generate template from the snapshot
template = Template.create_from_snapshot(self.apiclient, snapshot, self.services["templates"])
template = Template.create_from_snapshot(
self.apiclient,
snapshot,
self.services["templates"]
)
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = self.services["templates"]["templatefilter"]
cmd.id = template.id
list_templates = self.apiclient.listTemplates(cmd)
self.assertNotEqual(list_templates, None, "Check if result exists in list item call")
self.assertNotEqual(
list_templates,
None,
"Check if result exists in list item call"
)
self.assertEqual(
list_templates[0].id,
@ -563,34 +703,46 @@ class TestSnapshots(cloudstackTestCase):
# Deploy new virtual machine using template
new_virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["server_without_disk"],
templateid = template.id,
serviceofferingid = self.service_offering.id
)
self.apiclient,
self.services["server_without_disk"],
templateid=template.id,
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.cleanup.append(new_virtual_machine)
#Login to VM & mount directory
ssh = new_virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
cmds = [
"mkdir %s" % self.services["mount_dir"],
"mount %s1 %s" % (self.services["diskdevice"], self.services["mount_dir"])
"mount %s1 %s" % (
self.services["diskdevice"],
self.services["mount_dir"]
)
]
for c in cmds:
ssh.execute(c)
returned_data_0 = ssh.execute("cat %s/%s/%s" % (
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["random_data"]
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["random_data"]
))
returned_data_1 = ssh.execute("cat %s/%s/%s" % (
self.services["sub_dir"],
self.services["sub_lvl_dir2"],
self.services["random_data"]
self.services["sub_dir"],
self.services["sub_lvl_dir2"],
self.services["random_data"]
))
#Verify returned data
self.assertEqual(random_data_0, returned_data_0[0], "Verify newly attached volume contents with existing one")
self.assertEqual(random_data_1, returned_data_1[0], "Verify newly attached volume contents with existing one")
self.assertEqual(
random_data_0,
returned_data_0[0],
"Verify newly attached volume contents with existing one"
)
self.assertEqual(
random_data_1,
returned_data_1[0],
"Verify newly attached volume contents with existing one"
)
return

View File

@ -23,12 +23,10 @@ class Services:
self.services = {
"ssvm": {
"id": 1, # SSVM ID in particular zone
"zoneid": 1,
},
"cpvm": {
"id": 2, # CPVM ID in particular zone
"zoneid": 1,
"mgmtserverIP": '192.168.100.154' # For Telnet
"mgmtserverIP": '192.168.100.154' # For Telnet
},
"host": {
"username": 'root', # Credentials for SSH
@ -44,6 +42,9 @@ class TestSSVMs(cloudstackTestCase):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
self.services = Services().services
self.zone = get_zone(self.apiclient)
self.services["ssvm"]["zoneid"] = self.zone.id
self.services["cpvm"]["zoneid"] = self.zone.id
return
def tearDown(self):
@ -60,10 +61,13 @@ class TestSSVMs(cloudstackTestCase):
"""
# Validate the following:
# 1. listSystemVM (systemvmtype=secondarystoragevm) should return only ONE SSVM per zone
# 1. listSystemVM (systemvmtype=secondarystoragevm)
# should return only ONE SSVM per zone
# 2. The returned SSVM should be in Running state
# 3. listSystemVM for secondarystoragevm should list publicip, privateip and link-localip
# 4. The gateway programmed on the ssvm by listSystemVm should be the same as the gateway returned by listVlanIpRanges
# 3. listSystemVM for secondarystoragevm should list publicip,
# privateip and link-localip
# 4. The gateway programmed on the ssvm by listSystemVm should be
# the same as the gateway returned by listVlanIpRanges
# 5. DNS entries must match those given for the zone
cmd = listSystemVms.listSystemVmsCmd()
@ -85,7 +89,8 @@ class TestSSVMs(cloudstackTestCase):
len(list_zones_response),
"Check number of SSVMs with number of zones"
)
#For each secondary storage VM check private IP, public IP, link local IP and DNS
#For each secondary storage VM check private IP,
#public IP, link local IP and DNS
for ssvm in list_ssvm_response:
self.assertEqual(
@ -146,10 +151,13 @@ class TestSSVMs(cloudstackTestCase):
"""
# Validate the following:
# 1. listSystemVM (systemvmtype=consoleproxy) should return at least ONE CPVM per zone
# 1. listSystemVM (systemvmtype=consoleproxy) should return
# at least ONE CPVM per zone
# 2. The returned ConsoleProxyVM should be in Running state
# 3. listSystemVM for console proxy should list publicip, privateip and link-localip
# 4. The gateway programmed on the console proxy should be the same as the gateway returned by listZones
# 3. listSystemVM for console proxy should list publicip, privateip
# and link-localip
# 4. The gateway programmed on the console proxy should be the same
# as the gateway returned by listZones
# 5. DNS entries must match those given for the zone
cmd = listSystemVms.listSystemVmsCmd()
@ -230,10 +238,13 @@ class TestSSVMs(cloudstackTestCase):
"""Test SSVM Internals"""
# Validate the following
# 1. The SSVM check script should not return any WARN|ERROR|FAIL messages
# 2. If you are unable to login to the SSVM with the signed key then test is deemed a failure
# 1. The SSVM check script should not return any
# WARN|ERROR|FAIL messages
# 2. If you are unable to login to the SSVM with the signed key
# then test is deemed a failure
# 3. There should be only one ""cloud"" process running within the SSVM
# 4. If no process is running/multiple process are running then the test is a failure
# 4. If no process is running/multiple process are running
# then the test is a failure
cmd = listHosts.listHostsCmd()
cmd.zoneid = self.services["ssvm"]["zoneid"]
@ -249,14 +260,15 @@ class TestSSVMs(cloudstackTestCase):
self.debug("Cheking cloud process status")
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"]
)
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"]
)
timeout = 5
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % ssvm.linklocalip
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " \
% ssvm.linklocalip
c = ssh_command + "/usr/local/cloud/systemvm/ssvm-check.sh |grep -e ERROR -e WARNING -e FAIL"
# Ensure the SSH login is successful
@ -304,9 +316,11 @@ class TestSSVMs(cloudstackTestCase):
"""Test CPVM Internals"""
# Validate the following
# 1. test that telnet access on 8250 is available to the management server for the CPVM
# 1. test that telnet access on 8250 is available to
# the management server for the CPVM
# 2. No telnet access, test FAIL
# 3. Service cloud status should report cloud agent status to be running
# 3. Service cloud status should report cloud agent status to be
# running
cmd = listHosts.listHostsCmd()
cmd.zoneid = self.services["cpvm"]["zoneid"]
@ -320,24 +334,29 @@ class TestSSVMs(cloudstackTestCase):
cpvm = self.apiclient.listSystemVms(cmd)[0]
with assertNotRaises(Exception):
telnet = telnetlib.Telnet(self.services["cpvm"]["mgmtserverIP"] , '8250')
telnet = telnetlib.Telnet(
self.services["cpvm"]["mgmtserverIP"] ,
'8250'
)
self.debug("Cheking cloud process status")
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"]
)
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"]
)
timeout = 5
# Check status of cloud service
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " % cpvm.linklocalip
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " \
% cpvm.linklocalip
c = ssh_command + "service cloud status"
while True:
res = ssh.execute(c)[0]
# Response: cloud.com service (type=secstorage) is running: process id: 2346
# Response: cloud.com service (type=secstorage) is
# running: process id: 2346
#Check if double hop is successful
if res != "Host key verification failed.":
@ -360,7 +379,9 @@ class TestSSVMs(cloudstackTestCase):
# Validate the following
# 1. The SSVM should go to stop state
# 2. After a brief delay of say one minute, the SSVM should be restarted once again and return to Running state with previous two test cases still passing
# 2. After a brief delay of say one minute, the SSVM should be
# restarted once again and return to Running state with previous two
# test cases still passing
# 3. If either of the two above steps fail the test is a failure
cmd = stopSystemVm.stopSystemVmCmd()
@ -391,7 +412,9 @@ class TestSSVMs(cloudstackTestCase):
# Validate the following
# 1. The CPVM should go to stop state
# 2. After a brief delay of say one minute, the SSVM should be restarted once again and return to Running state with previous two test cases still passing
# 2. After a brief delay of say one minute, the SSVM should be
# restarted once again and return to Running state with previous
# two test cases still passing
# 3. If either of the two above steps fail the test is a failure
cmd = stopSystemVm.stopSystemVmCmd()
@ -430,7 +453,8 @@ class TestSSVMs(cloudstackTestCase):
"""
# Validate the following
# 1. The SSVM should go to stop and return to Running state
# 2. SSVM's public-ip and private-ip must remain the same before and after reboot
# 2. SSVM's public-ip and private-ip must remain the same
# before and after reboot
# 3. The cloud process should still be running within the SSVM
cmd = listSystemVms.listSystemVmsCmd()
@ -459,16 +483,16 @@ class TestSSVMs(cloudstackTestCase):
)
self.assertEqual(
ssvm_response.publicip,
old_public_ip,
"Check Public IP after reboot with that of before reboot"
)
ssvm_response.publicip,
old_public_ip,
"Check Public IP after reboot with that of before reboot"
)
self.assertEqual(
ssvm_response.privateip,
old_private_ip,
"Check Private IP after reboot with that of before reboot"
)
ssvm_response.privateip,
old_private_ip,
"Check Private IP after reboot with that of before reboot"
)
#Call to verify cloud process is running
self.test_03_ssvm_internals()
return
@ -478,7 +502,8 @@ class TestSSVMs(cloudstackTestCase):
"""
# Validate the following
# 1. The CPVM should go to stop and return to Running state
# 2. CPVM's public-ip and private-ip must remain the same before and after reboot
# 2. CPVM's public-ip and private-ip must remain
# the same before and after reboot
# 3. the cloud process should still be running within the CPVM
cmd = listSystemVms.listSystemVmsCmd()
@ -508,16 +533,16 @@ class TestSSVMs(cloudstackTestCase):
)
self.assertEqual(
cpvm_response.publicip,
old_public_ip,
"Check Public IP after reboot with that of before reboot"
)
cpvm_response.publicip,
old_public_ip,
"Check Public IP after reboot with that of before reboot"
)
self.assertEqual(
cpvm_response.privateip,
old_private_ip,
"Check Private IP after reboot with that of before reboot"
)
cpvm_response.privateip,
old_private_ip,
"Check Private IP after reboot with that of before reboot"
)
#Call to verify cloud process is running
self.test_04_cpvm_internals()
return
@ -528,7 +553,8 @@ class TestSSVMs(cloudstackTestCase):
# Validate the following
# 1. SSVM should be completely destroyed and a new one will spin up
# 2. listSystemVMs will show a different name for the systemVM from what it was before
# 2. listSystemVMs will show a different name for the
# systemVM from what it was before
# 3. new SSVM will have a public/private and link-local-ip
# 4. cloud process within SSVM must be up and running
@ -551,7 +577,8 @@ class TestSSVMs(cloudstackTestCase):
cmd.systemvmtype = 'secondarystoragevm'
ssvm_response = self.apiclient.listSystemVms(cmd)[0]
# Verify Name, Public IP, Private IP and Link local IP for newly created SSVM
# Verify Name, Public IP, Private IP and Link local IP
# for newly created SSVM
self.assertNotEqual(
ssvm_response.name,
old_name,
@ -584,7 +611,8 @@ class TestSSVMs(cloudstackTestCase):
# Validate the following
# 1. CPVM should be completely destroyed and a new one will spin up
# 2. listSystemVMs will show a different name for the systemVM from what it was before
# 2. listSystemVMs will show a different name for the systemVM from
# what it was before
# 3. new CPVM will have a public/private and link-local-ip
# 4. cloud process within CPVM must be up and running
@ -607,7 +635,8 @@ class TestSSVMs(cloudstackTestCase):
cmd.systemvmtype = 'consoleproxy'
cpvm_response = self.apiclient.listSystemVms(cmd)[0]
# Verify Name, Public IP, Private IP and Link local IP for newly created CPVM
# Verify Name, Public IP, Private IP and Link local IP
# for newly created CPVM
self.assertNotEqual(
cpvm_response.name,
old_name,

View File

@ -14,71 +14,70 @@ from random import random
#Import System modules
import time
class Services:
"""Test Templates Services
"""
def __init__(self):
self.services = {
"service_offering": {
"name": "Tiny Service Offering",
"displaytext": "Tiny service offering",
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "fr3sca",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"cpuspeed": 200, # in MHz
"memory": 256, # In MBs
},
"disk_offering": {
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"virtual_machine": {
"displayname": "testVM",
"hypervisor": 'XenServer',
"domainid": 1,
"protocol": 'TCP',
"ssh_port": 22,
"username": "root",
"password": "password"
},
"volume": {
"diskname": "Test Volume",
},
"template_1": {
"displaytext": "Cent OS Template",
"name": "Cent OS Template",
"ostypeid": 12,
},
"template_2": {
"displaytext": "Public Template",
"name": "Public template",
"ostypeid": 12,
"isfeatured": True,
"ispublic": True,
"isextractable": True,
"mode": "HTTP_DOWNLOAD",
},
"templatefilter": 'self',
"destzoneid": 2, # For Copy template (Destination zone)
"isfeatured": True,
"ispublic": True,
"isextractable": False,
"bootable": True,
"passwordenabled": True,
"ostypeid": 12,
}
"virtual_machine":
{
"template": 256, # Template used for VM creation
"zoneid": 1,
"serviceoffering": 1,
"displayname": "testVM",
"hypervisor": 'XenServer',
"account": 'testuser', # Account for which VM should be created
"domainid": 1,
"protocol": 'TCP',
"ssh_port": 22,
"username" : "root",
"password": "password"
},
"volume":
{
"offerings": 1,
"volumeoffering": 3,
"diskname": "TestVolumeTemplate",
"zoneid": 1,
"diskofferingid": 3,
},
"template_1":
{
"displaytext": "Test Template Type 1",
"name": "testTemplate",
"ostypeid": 12,
},
"template_2":
{
"displaytext": "Test Template Type 2",
"name": "testTemplate",
"ostypeid": 12,
"isfeatured": True,
"ispublic": True,
"isextractable": True,
"mode": "HTTP_DOWNLOAD",
"zoneid": 1,
},
"templatefilter": 'self',
"destzoneid": 2, # For Copy template (Destination zone)
"sourcezoneid": 1, # For Copy template (Source zone)
"isfeatured": True,
"ispublic": True,
"isextractable": False,
"bootable": True,
"passwordenabled": True,
"ostypeid": 15,
"account": 'testuser', # Normal user
"domainid": 1,
}
class TestCreateTemplate(cloudstackTestCase):
@ -103,13 +102,42 @@ class TestCreateTemplate(cloudstackTestCase):
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["volume"]["diskoffering"] = cls.disk_offering.id
cls.services["volume"]["zoneid"] = cls.zone.id
cls.services["sourcezoneid"] = cls.zone.id
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
#create virtual machine
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
serviceofferingid = cls.service_offering.id
)
cls.api_client,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
)
#Stop virtual machine
cmd = stopVirtualMachine.stopVirtualMachineCmd()
@ -123,7 +151,12 @@ class TestCreateTemplate(cloudstackTestCase):
cmd.type = 'ROOT'
list_volume = cls.api_client.listVolumes(cmd)
cls.volume = list_volume[0]
cls._cleanup = [cls.virtual_machine, cls.service_offering]
cls._cleanup = [
cls.virtual_machine,
cls.service_offering,
cls.account,
cls.disk_offering,
]
return
@classmethod
@ -143,12 +176,17 @@ class TestCreateTemplate(cloudstackTestCase):
"""
# Validate the following:
# 1. database (vm_template table) should be updated with newly created template
# 1. database (vm_template table) should be updated
# with newly created template
# 2. UI should show the newly added template
# 3. ListTemplates API should show the newly added template
#Create template from Virtual machine and Volume ID
template = Template.create(self.apiclient, self.volume, self.services["template_1"])
template = Template.create(
self.apiclient,
self.volume,
self.services["template_1"]
)
self.cleanup.append(template)
cmd = listTemplates.listTemplatesCmd()
@ -156,7 +194,7 @@ class TestCreateTemplate(cloudstackTestCase):
cmd.id = template.id
list_template_response = self.apiclient.listTemplates(cmd)
#Verify template response to check whether template added successfully or not
#Verify template response to check whether template added successfully
template_response = list_template_response[0]
self.assertNotEqual(
len(list_template_response),
@ -180,46 +218,9 @@ class TestCreateTemplate(cloudstackTestCase):
self.services["template_1"]["ostypeid"],
"Check osTypeID of newly created template"
)
#Verify the database entry for template
self.debug(
"select name, display_text, guest_os_id from vm_template where id = %s;"
% template.id
)
qresultset = self.dbclient.execute(
"select name, display_text, guest_os_id from vm_template where id = %s;"
% template.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
name = qresult[0]
self.assertEqual(
name.count(self.services["template_1"]["name"]),
1,
"Compare template name with database record"
)
self.assertEqual(
qresult[1],
self.services["template_1"]["displaytext"],
"Compare template display text with database record"
)
self.assertEqual(
qresult[2],
self.services["template_1"]["ostypeid"],
"Compare template osTypeID with database record"
)
return
class TestTemplates(cloudstackTestCase):
@classmethod
@ -227,14 +228,49 @@ class TestTemplates(cloudstackTestCase):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
#create virtual machines
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
serviceofferingid = cls.service_offering.id
)
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["volume"]["diskoffering"] = cls.disk_offering.id
cls.services["volume"]["zoneid"] = cls.zone.id
cls.services["template_2"]["zoneid"] = cls.zone.id
cls.services["sourcezoneid"] = cls.zone.id
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
)
cls.user = Account.create(
cls.api_client,
cls.services["account"]
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
#create virtual machine
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
)
#Stop virtual machine
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = cls.virtual_machine.id
@ -249,9 +285,24 @@ class TestTemplates(cloudstackTestCase):
cls.volume = list_volume[0]
#Create templates for Edit, Delete & update permissions testcases
cls.template_1 = Template.create(cls.api_client, cls.volume, cls.services["template_1"])
cls.template_2 = Template.create(cls.api_client, cls.volume, cls.services["template_2"])
cls._cleanup = [cls.template_2, cls.virtual_machine, cls.service_offering]
cls.template_1 = Template.create(
cls.api_client,
cls.volume,
cls.services["template_1"]
)
cls.template_2 = Template.create(
cls.api_client,
cls.volume,
cls.services["template_2"]
)
cls._cleanup = [
cls.template_2,
cls.virtual_machine,
cls.service_offering,
cls.disk_offering,
cls.account,
cls.user
]
@classmethod
def tearDownClass(cls):
@ -340,51 +391,6 @@ class TestTemplates(cloudstackTestCase):
self.services["ostypeid"],
"Check OSTypeID of updated template"
)
# Verify database entry for updated template attributes
self.debug(
"select name, display_text, bootable, enable_password, guest_os_id from vm_template where id = %s;"
% self.template_1.id
)
qresultset = self.dbclient.execute(
"select name, display_text, bootable, enable_password, guest_os_id from vm_template where id = %s;"
% self.template_1.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
new_name,
"Compare template name with database record"
)
self.assertEqual(
qresult[1],
new_displayText,
"Compare template bootable field with database record"
)
self.assertEqual(
qresult[2],
int(self.services["bootable"]),
"Compare template enable_password field with database record"
)
self.assertEqual(
qresult[3],
int(self.services["passwordenabled"]),
"Compare template display text with database record"
)
self.assertEqual(
qresult[4],
self.services["ostypeid"],
"Compare template guest OS ID with database record"
)
return
def test_03_delete_template(self):
@ -403,25 +409,11 @@ class TestTemplates(cloudstackTestCase):
list_template_response = self.apiclient.listTemplates(cmd)
# Verify template is deleted properly using ListTemplates
self.assertEqual(list_template_response, None, "Check if template exists in List Templates")
# Verify database entry is removed for deleted template
self.debug(
"select name, display_text from vm_template where id = %s;"
% self.template_1.id
)
qresultset = self.dbclient.execute(
"select name, display_text from vm_template where id = %s;"
% self.template_1.id
)
self.assertEqual(
len(qresultset),
1,
"Check DB Query result set"
)
list_template_response,
None,
"Check if template exists in List Templates"
)
return
def test_04_extract_template(self):
@ -429,13 +421,14 @@ class TestTemplates(cloudstackTestCase):
# Validate the following
# 1. Admin should able extract and download the templates
# 2. ListTemplates should display all the public templates for all kind of users
# 2. ListTemplates should display all the public templates
# for all kind of users
# 3 .ListTemplates should not display the system templates
cmd = extractTemplate.extractTemplateCmd()
cmd.id = self.template_2.id
cmd.mode = self.services["template_2"]["mode"]
cmd.zoneid = self.services["template_2"]["zoneid"]
cmd.zoneid = self.zone.id
list_extract_response = self.apiclient.extractTemplate(cmd)
# Format URL to ASCII to retrieve response code
@ -469,8 +462,10 @@ class TestTemplates(cloudstackTestCase):
"""Update & Test for template permissions"""
# Validate the following
# 1. listTemplatePermissions returns valid permissions set for template
# 2. permission changes should be reflected in vm_template table in database
# 1. listTemplatePermissions returns valid
# permissions set for template
# 2. permission changes should be reflected in vm_template
# table in database
cmd = updateTemplatePermissions.updateTemplatePermissionsCmd()
# Update template permissions
@ -483,8 +478,8 @@ class TestTemplates(cloudstackTestCase):
cmd = listTemplates.listTemplatesCmd()
cmd.id = self.template_2.id
cmd.account = self.services["account"]
cmd.domainid = self.services["domainid"]
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
cmd.templatefilter = 'featured'
list_template_response = self.apiclient.listTemplates(cmd)
@ -503,44 +498,9 @@ class TestTemplates(cloudstackTestCase):
)
self.assertNotEqual(
template_response.templatetype,
'SYSTEM',
"ListTemplates should not list any system templates"
)
# Verify database entries for updated permissions
self.debug(
"select public, featured, extractable from vm_template where id = %s;"
% self.template_2.id
)
qresultset = self.dbclient.execute(
"select public, featured, extractable from vm_template where id = %s;"
% self.template_2.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
int(self.services["ispublic"]),
"Compare public permission with database record"
)
self.assertEqual(
qresult[1],
int(self.services["isfeatured"]),
"Compare featured permission with database record"
)
self.assertEqual(
qresult[2],
int(self.services["isextractable"]),
"Compare extractable permission with database record"
template_response.templatetype,
'SYSTEM',
"ListTemplates should not list any system templates"
)
return
@ -548,7 +508,8 @@ class TestTemplates(cloudstackTestCase):
"""Test for copy template from one zone to another"""
# Validate the following
# 1. copy template should be successful and secondary storage should contain new copied template.
# 1. copy template should be successful and
# secondary storage should contain new copied template.
cmd = copyTemplate.copyTemplateCmd()
cmd.id = self.template_2.id
@ -588,8 +549,8 @@ class TestTemplates(cloudstackTestCase):
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = 'featured'
cmd.account = self.services["account"]
cmd.domainid = self.services["domainid"]
cmd.account = self.user.account.name
cmd.domainid = self.user.account.domainid
list_template_response = self.apiclient.listTemplates(cmd)
@ -600,7 +561,7 @@ class TestTemplates(cloudstackTestCase):
)
#Template response should list all 'public' templates
for template in list_template_response:
self.assertEqual(
self.assertEqual(
template.ispublic,
True,
"ListTemplates should list only public templates"
@ -615,8 +576,8 @@ class TestTemplates(cloudstackTestCase):
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = 'featured'
cmd.account = self.services["account"]
cmd.domainid = self.services["domainid"]
cmd.account = self.user.account.name
cmd.domainid = self.user.account.domainid
list_template_response = self.apiclient.listTemplates(cmd)
@ -627,9 +588,9 @@ class TestTemplates(cloudstackTestCase):
)
for template in list_template_response:
self.assertNotEqual(
template.templatetype,
'SYSTEM',
"ListTemplates should not list any system templates"
self.assertNotEqual(
template.templatetype,
'SYSTEM',
"ListTemplates should not list any system templates"
)
return

View File

@ -271,14 +271,14 @@ class TestVMLifeCycle(cloudstackTestCase):
)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
cls.small_virtual_machine.account,
cls.small_virtual_machine.zoneid,
cls.small_virtual_machine.domainid,
cls.virtual_machine.account,
cls.virtual_machine.zoneid,
cls.virtual_machine.domainid,
cls.services["small"]
)
cls.nat_rule = NATRule.create(
cls.api_client,
cls.small_virtual_machine,
cls.virtual_machine,
cls.services["small"],
ipaddressid=cls.public_ip.ipaddress.id
)
@ -717,11 +717,17 @@ class TestVMLifeCycle(cloudstackTestCase):
c = "fdisk -l|grep %s|head -1" % self.services["diskdevice"]
res = ssh_client.execute(c)
#Disk /dev/xvdd: 4393 MB, 4393723904 bytes
actual_disk_size = res[0].split()[4]
# Res may contain more than one strings depending on environment
# Split res with space as delimiter to form new list (result)
result = []
for i in res:
for k in i.split():
result.append(k)
self.assertEqual(
str(iso.size),
actual_disk_size,
str(iso.size) in result,
True,
"Check size of the attached ISO"
)

View File

@ -12,79 +12,104 @@ from base import *
import remoteSSHClient
#Import System modules
import os
import urllib2
import urllib
import time
import tempfile
class Services:
"""Test Volume Services
"""
def __init__(self):
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "fr3sca",
},
"service_offering": {
"name": "Tiny Service Offering",
"displaytext": "Tiny service offering",
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"cpuspeed": 200, # in MHz
"memory": 256, # In MBs
},
"disk_offering": {
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"volume_offerings": {
0: {
"offerings": 1,
"volumeoffering": 3,
"diskname": "TestDiskServ",
"zoneid": 1,
"diskofferingid": 3,
"account": 'testuser', # Account for which volume offering is created
"domainid": 1,
},
1: {
"offerings": 1,
"volumeoffering": 4,
"diskname": "TestDiskServ",
"zoneid": 1,
"diskofferingid": 3,
"account": 'testuser',
"domainid": 1,
},
2: {
"offerings": 1,
"volumeoffering": 5,
"diskname": "TestDiskServ",
"zoneid": 1,
"diskofferingid": 3,
"account": 'testuser',
"domainid": 1,
},
},
"customdiskofferingid": 52, #Custom disk offering should be available
"customdisksize": 2, # GBs
"volumeoffering": 3,
"serviceoffering": 1,
"template": 256,
"zoneid": 1,
"customdisksize": 1, # GBs
"username": "root", # Creds for SSH to VM
"password": "password",
"ssh_port": 22,
"diskname": "TestDiskServ",
"hypervisor": 'XenServer',
"account": 'testuser', # Account for VM instance
"domainid": 1,
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
"diskdevice": "/dev/sda",
"ostypeid": 12,
}
class TestCreateVolume(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
cls.virtual_machine = VirtualMachine.create(cls.api_client, cls.services, serviceofferingid = cls.service_offering.id)
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
cls.custom_disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"],
custom=True
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["customdiskofferingid"] = cls.custom_disk_offering.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
@ -93,8 +118,21 @@ class TestCreateVolume(cloudstackTestCase):
cls.virtual_machine.domainid,
cls.services
)
cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services, ipaddressid = cls.public_ip.ipaddress.id)
cls._cleanup = [cls.nat_rule, cls.virtual_machine, cls.service_offering, cls.public_ip]
cls.nat_rule = NATRule.create(
cls.api_client,
cls.virtual_machine,
cls.services,
ipaddressid=cls.public_ip.ipaddress.id
)
cls._cleanup = [
cls.nat_rule,
cls.virtual_machine,
cls.service_offering,
cls.public_ip,
cls.disk_offering,
cls.custom_disk_offering,
cls.account
]
def setUp(self):
@ -107,7 +145,13 @@ class TestCreateVolume(cloudstackTestCase):
"""
self.volumes = []
for k, v in self.services["volume_offerings"].items():
volume = Volume.create(self.apiClient, v)
volume = Volume.create(
self.apiClient,
v,
zoneid=self.zone.id,
account=self.account.account.name,
diskofferingid=self.disk_offering.id
)
self.volumes.append(volume)
self.cleanup.append(volume)
@ -115,30 +159,49 @@ class TestCreateVolume(cloudstackTestCase):
self.volumes.append(volume)
self.cleanup.append(volume)
#Attach a volume with different disk offerings and check the memory allocated to each of them
#Attach a volume with different disk offerings
#and check the memory allocated to each of them
for volume in self.volumes:
cmd = listVolumes.listVolumesCmd()
cmd.id = volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes")
qresultset = self.dbclient.execute("select id from volumes where id = %s" % volume.id)
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
attached_volume = self.virtual_machine.attach_volume(self.apiClient, volume)
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
attached_volume = self.virtual_machine.attach_volume(
self.apiClient,
volume
)
ssh = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
ssh.execute("reboot")
#Sleep to ensure the machine is rebooted properly
time.sleep(120)
ssh = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress, reconnect = True)
ssh = self.virtual_machine.get_ssh_client(
self.nat_rule.ipaddress,
reconnect=True
)
c = "fdisk -l|grep %s|head -1" % self.services["diskdevice"]
res = ssh.execute(c)
#Disk /dev/sda: 21.5 GB, 21474836480 bytes
# Disk /dev/sda doesn't contain a valid partition table
# Disk /dev/sda: 21.5 GB, 21474836480 bytes
actual_disk_size = res[0].split()[4]
# Res may return more than one lines
# Split res with space as delimiter to form new list (result)
result = []
for i in res:
for k in i.split():
result.append(k)
self.assertEqual(str(list_volume_response[0].size), actual_disk_size, "Check if promised disk size actually available")
self.assertEqual(
str(list_volume_response[0].size) in result,
True,
"Check if promised disk size actually available"
)
self.virtual_machine.detach_volume(self.apiClient, volume)
def tearDown(self):
@ -160,9 +223,39 @@ class TestVolumes(cloudstackTestCase):
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["diskofferingid"] = cls.disk_offering.id
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
cls.virtual_machine = VirtualMachine.create(cls.api_client, cls.services, serviceofferingid = cls.service_offering.id)
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
@ -171,9 +264,24 @@ class TestVolumes(cloudstackTestCase):
cls.virtual_machine.domainid,
cls.services
)
cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services, ipaddressid = cls.public_ip.ipaddress.id)
cls.volume = Volume.create(cls.api_client, cls.services)
cls._cleanup = [cls.nat_rule, cls.virtual_machine, cls.volume, cls.public_ip, cls.service_offering]
cls.nat_rule = NATRule.create(
cls.api_client,
cls.virtual_machine,
cls.services,
ipaddressid=cls.public_ip.ipaddress.id
)
cls.volume = Volume.create(
cls.api_client,
cls.services
)
cls._cleanup = [
cls.nat_rule,
cls.virtual_machine,
cls.public_ip,
cls.service_offering,
cls.disk_offering,
cls.account
]
@classmethod
def tearDownClass(cls):
@ -189,6 +297,11 @@ class TestVolumes(cloudstackTestCase):
def test_02_attach_volume(self):
"""Attach a created Volume to a Running VM
"""
# Validate the following
# 1. shows list of volumes
# 2. "Attach Disk" pop-up box will display with list of instances
# 3. disk should be attached to instance successfully
self.virtual_machine.attach_volume(self.apiClient, self.volume)
#Sleep to ensure the current state will reflected in other calls
@ -197,29 +310,38 @@ class TestVolumes(cloudstackTestCase):
cmd.id = self.volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes")
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
volume = list_volume_response[0]
self.assertNotEqual(volume.virtualmachineid, None, "Check if volume state (attached) is reflected")
qresultset = self.dbclient.execute("select instance_id, device_id from volumes where id = %s" % self.volume.id)
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
qresult = qresultset[0]
self.assertEqual(qresult[0], self.virtual_machine.id, "Check if volume is assc. with virtual machine in Database")
#self.assertEqual(qresult[1], 0, "Check if device is valid in the database")
self.assertNotEqual(
volume.virtualmachineid,
None,
"Check if volume state (attached) is reflected"
)
#Format the attached volume to a known fs
format_volume_to_ext3(self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress))
format_volume_to_ext3(
self.virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
))
def test_03_download_attached_volume(self):
"""Download a Volume attached to a VM
"""
# Validate the following
# 1. download volume will fail with proper error message
# "Failed - Invalid state of the volume with ID:
# It should be either detached or the VM should be in stopped state
cmd = extractVolume.extractVolumeCmd()
cmd.id = self.volume.id
cmd.mode = "HTTP_DOWNLOAD"
cmd.zoneid = self.services["zoneid"]
#A proper exception should be raised; downloading attach VM is not allowed
# A proper exception should be raised;
# downloading attach VM is not allowed
with self.assertRaises(Exception):
self.apiClient.deleteVolume(cmd)
@ -227,9 +349,14 @@ class TestVolumes(cloudstackTestCase):
"""Delete a Volume attached to a VM
"""
# Validate the following
# 1. delete volume will fail with proper error message
# "Failed - Invalid state of the volume with ID:
# It should be either detached or the VM should be in stopped state
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
#A proper exception should be raised; deleting attach VM is not allowed
#Proper exception should be raised; deleting attach VM is not allowed
with self.assertRaises(Exception):
self.apiClient.deleteVolume(cmd)
@ -237,6 +364,11 @@ class TestVolumes(cloudstackTestCase):
def test_05_detach_volume(self):
"""Detach a Volume attached to a VM
"""
# Validate the following
# Data disk should be detached from instance and detached data disk
# details should be updated properly
self.virtual_machine.detach_volume(self.apiClient, self.volume)
#Sleep to ensure the current state will reflected in other calls
time.sleep(60)
@ -244,21 +376,24 @@ class TestVolumes(cloudstackTestCase):
cmd.id = self.volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes")
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
volume = list_volume_response[0]
self.assertEqual(volume.virtualmachineid, None, "Check if volume state (detached) is reflected")
qresultset = self.dbclient.execute("select instance_id, device_id from volumes where id = %s" % self.volume.id)
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
qresult = qresultset[0]
self.assertEqual(qresult[0], None, "Check if volume is unassc. with virtual machine in Database")
self.assertEqual(qresult[1], None, "Check if no device is valid in the database")
self.assertEqual(
volume.virtualmachineid,
None,
"Check if volume state (detached) is reflected"
)
return
def test_06_download_detached_volume(self):
"""Download a Volume unattached to an VM
"""
# Validate the following
# 1. able to download the volume when its not attached to instance
cmd = extractVolume.extractVolumeCmd()
cmd.id = self.volume.id
@ -278,11 +413,20 @@ class TestVolumes(cloudstackTestCase):
except Exception as e:
print e
self.fail("Extract Volume Failed with invalid URL %s (vol id: %s)" % (extract_vol.url, self.volume.id))
self.fail(
"Extract Volume Failed with invalid URL %s (vol id: %s)" \
% (extract_vol.url, self.volume.id)
)
def test_07_delete_detached_volume(self):
"""Delete a Volume unattached to an VM
"""
# Validate the following
# 1. volume should be deleted successfully and listVolume should not
# contain the deleted volume details.
# 2. "Delete Volume" menu item not shown under "Actions" menu.
# (UI should not allow to delete the volume when it is attached
# to instance by hiding the menu Item)
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
@ -294,4 +438,8 @@ class TestVolumes(cloudstackTestCase):
cmd.type = 'DATADISK'
list_volume_response = self.apiClient.listVolumes(cmd)
self.assertEqual(list_volume_response, None, "Check if volume exists in ListVolumes")
self.assertEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)