Fixes based on Prasanna's change to testframework

Additional logs/checks around all API calls and SSH calls.
This commit is contained in:
Chirag Jog 2012-03-06 09:24:59 -08:00
parent 28ace646bf
commit e7e4c86533
19 changed files with 4201 additions and 246 deletions

View File

@ -23,7 +23,7 @@ class Services:
"clusters": {
0: {
"clustername": "Xen Cluster",
"clustertype": "ExternalManaged",
"clustertype": "CloudManaged",
# CloudManaged or ExternalManaged"
"hypervisor": "XenServer",
# Hypervisor type
@ -53,10 +53,10 @@ class Services:
# in cluster in small letters
"hypervisor": 'XenServer',
# Hypervisor type
"clustertype": 'ExternalManaged',
"clustertype": 'CloudManaged',
# CloudManaged or ExternalManaged"
"url": 'http://192.168.100.210',
"username": "administrator",
"username": "root",
"password": "fr3sca",
},
"kvm": {
@ -90,8 +90,10 @@ class TestHosts(cloudstackTestCase):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.services = Services().services
self.zone = get_zone(self.apiclient, self.services)
self.pod = get_pod(self.apiclient, self.zone.id, self.services)
self.cleanup = []
return
def tearDown(self):

View File

@ -62,6 +62,8 @@ class Services:
"isextractable": True,
"bootable": True, # For edit template
"passwordenabled": True,
"sleep": 60,
"timeout": 10,
"ostypeid": 12,
# CentOS 5.3 (64 bit)
"domainid": 1,
@ -190,7 +192,7 @@ class TestISO(cloudstackTestCase):
try:
cls.iso_1.download(cls.api_client)
except Exception as e:
self.fail("Exception while downloading ISO %s: %s"\
raise Exception("Exception while downloading ISO %s: %s"\
% (cls.iso_1.id, e))
cls.iso_2 = Iso.create(
@ -202,7 +204,7 @@ class TestISO(cloudstackTestCase):
try:
cls.iso_2.download(cls.api_client)
except Exception as e:
self.fail("Exception while downloading ISO %s: %s"\
raise Exception("Exception while downloading ISO %s: %s"\
% (cls.iso_2.id, e))
cls._cleanup = [cls.account]
@ -248,7 +250,7 @@ class TestISO(cloudstackTestCase):
new_displayText = random_gen()
new_name = random_gen()
self.debug("Updating ISO permissions for ISO: %s", self.iso_1.id)
self.debug("Updating ISO permissions for ISO: %s" % self.iso_1.id)
cmd = updateIso.updateIsoCmd()
#Assign new values to attributes
@ -312,6 +314,9 @@ class TestISO(cloudstackTestCase):
self.debug("Deleting ISO with ID: %s" % self.iso_1.id)
self.iso_1.delete(self.apiclient)
# Sleep to ensure that ISO state is reflected in other calls
time.sleep(self.services["sleep"])
#ListIsos to verify deleted ISO is properly deleted
list_iso_response = list_isos(
self.apiclient,
@ -342,10 +347,16 @@ class TestISO(cloudstackTestCase):
cmd.zoneid = self.services["iso_2"]["zoneid"]
list_extract_response = self.apiclient.extractIso(cmd)
#Format URL to ASCII to retrieve response code
formatted_url = urllib.unquote_plus(list_extract_response.url)
url_response = urllib.urlopen(formatted_url)
response_code = url_response.getcode()
try:
#Format URL to ASCII to retrieve response code
formatted_url = urllib.unquote_plus(list_extract_response.url)
url_response = urllib.urlopen(formatted_url)
response_code = url_response.getcode()
except Exception:
self.fail(
"Extract ISO Failed with invalid URL %s (ISO id: %s)" \
% (formatted_url, self.iso_2.id)
)
self.assertEqual(
list_extract_response.id,

View File

@ -31,8 +31,8 @@ class Services:
# Networking mode: Basic or advanced
"lb_switch_wait": 10,
# Time interval after which LB switches the requests
"sleep": 10,
"timeout":20,
"sleep": 60,
"timeout":10,
"network": {
"name": "Test Network",
"displaytext": "Test Network",
@ -42,7 +42,7 @@ class Services:
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 200,
"cpuspeed": 100,
# in MHz
"memory": 256,
# In MBs
@ -1614,20 +1614,39 @@ class TestDeleteAccount(cloudstackTestCase):
self.account.account.name)
# ListPortForwardingRules should not
# list associated rules with deleted account
with self.assertRaises(Exception):
list_nat_rules(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
try:
list_nat_reponse= list_nat_rules(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
list_nat_reponse,
None,
"Check load balancing rule is properly deleted."
)
except Exception as e:
raise Exception(
"Exception raised while fetching NAT rules for account: %s" %
self.account.account.name)
#Retrieve router for the user account
with self.assertRaises(Exception):
list_routers(
try:
routers = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
routers,
None,
"Check routers are properly deleted."
)
except Exception as e:
raise Exception(
"Exception raised while fetching routers for account: %s" %
self.account.account.name)
return
def tearDown(self):

View File

@ -43,6 +43,7 @@ class Services:
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"domainid": 1,
}
class TestSecStorageServices(cloudstackTestCase):
@ -200,7 +201,7 @@ class TestSecStorageServices(cloudstackTestCase):
zoneid=self.zone.id,
)
if isinstance(list_hosts_response, list):
if not isinstance(list_hosts_response, list):
# Sleep to ensure Secondary storage is Up
time.sleep(int(self.services["sleep"]))
timeout = timeout - 1
@ -237,7 +238,7 @@ class TestSecStorageServices(cloudstackTestCase):
zoneid=self.zone.id,
podid=self.pod.id
)
if isinstance(list_ssvm_response, list):
if not isinstance(list_ssvm_response, list):
# Sleep to ensure SSVMs are Up and Running
time.sleep(int(self.services["sleep"]))
timeout = timeout - 1
@ -282,7 +283,9 @@ class TestSecStorageServices(cloudstackTestCase):
self.apiclient,
hypervisor=v["hypervisor"],
zoneid=self.zone.id,
templatefilter=v["templatefilter"]
templatefilter=v["templatefilter"],
account='system',
domainid=self.services["domainid"]
)
# Ensure all BUILTIN templates are downloaded
@ -302,7 +305,9 @@ class TestSecStorageServices(cloudstackTestCase):
self.apiclient,
id=templateid,
zoneid=self.zone.id,
templatefilter=v["templatefilter"]
templatefilter=v["templatefilter"],
account='system',
domainid=self.services["domainid"]
)
if isinstance(template_response, list):
@ -335,7 +340,9 @@ class TestSecStorageServices(cloudstackTestCase):
self.apiclient,
id=templateid,
zoneid=self.zone.id,
templatefilter=v["templatefilter"]
templatefilter=v["templatefilter"],
account='system',
domainid=self.services["domainid"]
)
if isinstance(template_response, list):

View File

@ -514,7 +514,9 @@ class TestSnapshots(cloudstackTestCase):
),
]
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
@ -533,7 +535,7 @@ class TestSnapshots(cloudstackTestCase):
]
try:
for c in cmds:
result = ssh_client.execute(c)
ssh_client.execute(c)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %

View File

@ -308,7 +308,7 @@ class TestSSVMs(cloudstackTestCase):
)
ssvm = list_ssvm_response[0]
self.debug("Cheking cloud process status")
self.debug("Checking cloud process status")
result = get_process_status(
host.ipaddress,
@ -452,7 +452,10 @@ class TestSSVMs(cloudstackTestCase):
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = ssvm.id
self.apiclient.stopSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
@ -460,7 +463,8 @@ class TestSSVMs(cloudstackTestCase):
id=ssvm.id
)
if isinstance(list_ssvm_response, list):
break
if list_ssvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List SSVM call failed!")
@ -523,6 +527,9 @@ class TestSSVMs(cloudstackTestCase):
cmd.id = cpvm.id
self.apiclient.stopSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_cpvm_response = list_ssvms(
@ -530,7 +537,8 @@ class TestSSVMs(cloudstackTestCase):
id=cpvm.id
)
if isinstance(list_cpvm_response, list):
break
if list_cpvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List CPVM call failed!")
@ -593,6 +601,9 @@ class TestSSVMs(cloudstackTestCase):
cmd.id = ssvm_response.id
self.apiclient.rebootSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
@ -600,7 +611,8 @@ class TestSSVMs(cloudstackTestCase):
id=ssvm_response.id
)
if isinstance(list_ssvm_response, list):
break
if list_ssvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List SSVM call failed!")
@ -672,6 +684,9 @@ class TestSSVMs(cloudstackTestCase):
cmd.id = cpvm_response.id
self.apiclient.rebootSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_cpvm_response = list_ssvms(
@ -679,7 +694,8 @@ class TestSSVMs(cloudstackTestCase):
id=cpvm_response.id
)
if isinstance(list_cpvm_response, list):
break
if list_cpvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List CPVM call failed!")
@ -738,6 +754,9 @@ class TestSSVMs(cloudstackTestCase):
cmd.id = ssvm_response.id
self.apiclient.destroySystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
@ -746,7 +765,8 @@ class TestSSVMs(cloudstackTestCase):
systemvmtype='secondarystoragevm'
)
if isinstance(list_ssvm_response, list):
break
if list_ssvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List SSVM call failed!")
@ -813,6 +833,9 @@ class TestSSVMs(cloudstackTestCase):
cmd.id = cpvm_response.id
self.apiclient.destroySystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_cpvm_response = list_ssvms(
@ -821,7 +844,8 @@ class TestSSVMs(cloudstackTestCase):
zoneid=self.zone.id
)
if isinstance(list_cpvm_response, list):
break
if list_cpvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List CPVM call failed!")

View File

@ -80,10 +80,10 @@ class Services:
"bootable": True,
"passwordenabled": True,
"ostypeid": 12,
"zoneid": 1,
"zoneid": 2,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced',
"mode": 'basic',
# Networking mode: Advanced, basic
"sleep": 30,
"timeout": 10,
@ -534,11 +534,17 @@ class TestTemplates(cloudstackTestCase):
cmd.zoneid = self.zone.id
list_extract_response = self.apiclient.extractTemplate(cmd)
# Format URL to ASCII to retrieve response code
formatted_url = urllib.unquote_plus(list_extract_response.url)
url_response = urllib.urlopen(formatted_url)
response_code = url_response.getcode()
try:
# Format URL to ASCII to retrieve response code
formatted_url = urllib.unquote_plus(list_extract_response.url)
url_response = urllib.urlopen(formatted_url)
response_code = url_response.getcode()
except Exception:
self.fail(
"Extract Template Failed with invalid URL %s (template id: %s)" \
% (formatted_url, self.template_2.id)
)
self.assertEqual(
list_extract_response.id,
self.template_2.id,

View File

@ -109,7 +109,7 @@ class Services:
#Migrate VM to hostid
"ostypeid": 12,
# CentOS 5.3 (64-bit)
"zoneid": 1,
"zoneid": 2,
# Optional, if specified the mentioned zone will be
# used for tests
"mode":'advanced',
@ -498,7 +498,7 @@ class TestVMLifeCycle(cloudstackTestCase):
total_mem = [i for i in meminfo if "MemTotal" in i][0].split()[1]
self.debug(
"CUP count: %s, CPU Speed: %s, Mem Info: %s" % (
"CPU count: %s, CPU Speed: %s, Mem Info: %s" % (
cpu_cnt,
cpu_speed,
total_mem
@ -519,8 +519,6 @@ class TestVMLifeCycle(cloudstackTestCase):
self.small_offering.memory,
"Check Memory(kb) for small offering"
)
# Cleanup - Not required for further tests
self.cleanup.append(self.medium_virtual_machine)
return
def test_05_change_offering_medium(self):
@ -620,7 +618,7 @@ class TestVMLifeCycle(cloudstackTestCase):
total_mem = [i for i in meminfo if "MemTotal" in i][0].split()[1]
self.debug(
"CUP count: %s, CPU Speed: %s, Mem Info: %s" % (
"CPU count: %s, CPU Speed: %s, Mem Info: %s" % (
cpu_cnt,
cpu_speed,
total_mem
@ -859,7 +857,7 @@ class TestVMLifeCycle(cloudstackTestCase):
except Exception as e:
self.fail("SSH failed for virtual machine: %s - %s" %
self.virtual_machine.ipaddress, e)
(self.virtual_machine.ipaddress, e))
# Res may contain more than one strings depending on environment
# Split strings to form new list which is used for assertion on ISO size
@ -892,7 +890,7 @@ class TestVMLifeCycle(cloudstackTestCase):
except Exception as e:
self.fail("SSH failed for virtual machine: %s - %s" %
self.virtual_machine.ipaddress, e)
(self.virtual_machine.ipaddress, e))
#Detach from VM
cmd = detachIso.detachIsoCmd()
@ -904,7 +902,7 @@ class TestVMLifeCycle(cloudstackTestCase):
except Exception as e:
self.fail("SSH failed for virtual machine: %s - %s" %
self.virtual_machine.ipaddress, e)
(self.virtual_machine.ipaddress, e))
# Check if ISO is properly detached from VM (using fdisk)
result = self.services["diskdevice"] in str(res)

View File

@ -66,7 +66,7 @@ class Services:
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced',
"mode": 'basic',
"sleep": 60,
"timeout": 10,
}
@ -398,9 +398,14 @@ class TestVolumes(cloudstackTestCase):
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
#Proper exception should be raised; deleting attach VM is not allowed
with self.assertRaises(Exception):
self.apiClient.deleteVolume(cmd)
#with self.assertRaises(Exception):
result = self.apiClient.deleteVolume(cmd)
self.assertEqual(
result,
None,
"Check for delete download error while volume is attached"
)
def test_05_detach_volume(self):
"""Detach a Volume attached to a VM
"""
@ -465,10 +470,10 @@ class TestVolumes(cloudstackTestCase):
fd.write(response.read())
fd.close()
except Exception:
except Exception as e:
self.fail(
"Extract Volume Failed with invalid URL %s (vol id: %s)" \
% (extract_vol.url, self.volume.id)
"Extract Volume Failed with invalid URL %s (vol id: %s) Error: %s" \
% (extract_vol.url, self.volume.id, e)
)
def test_07_delete_detached_volume(self):

View File

@ -141,10 +141,11 @@ class Services:
"domainid": 1,
"ostypeid": 12,
# Cent OS 5.3 (64 bit)
"zoneid": 1,
"zoneid": 2,
# Optional, if specified the mentioned zone will be
# used for tests
"sleep": 60,
"timeout": 10,
"mode":'advanced'
}
@ -208,11 +209,17 @@ class TestAccounts(cloudstackTestCase):
self.apiclient,
self.services["account"]
)
self.debug("Created account: %s" % account.account.name)
self.cleanup.append(account)
list_accounts_response = list_accounts(
self.apiclient,
id=account.account.id
)
self.assertEqual(
isinstance(list_accounts_response, list),
True,
"Check list accounts for valid data"
)
self.assertNotEqual(
len(list_accounts_response),
0,
@ -237,11 +244,16 @@ class TestAccounts(cloudstackTestCase):
account=account.account.name,
domainid=account.account.domainid
)
self.debug("Created user: %s" % user.id)
list_users_response = list_users(
self.apiclient,
id=user.id
)
self.assertEqual(
isinstance(list_users_response, list),
True,
"Check list users for valid data"
)
self.assertNotEqual(
len(list_users_response),
@ -334,13 +346,14 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("Created user: %s" % user_1.id)
user_2 = User.create(
self.apiclient,
self.services["user"],
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("Created user: %s" % user_2.id)
self.cleanup.append(user_2)
vm_1 = VirtualMachine.create(
@ -349,6 +362,10 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.debug("Deployed VM in account: %s, ID: %s" % (
self.account.account.name,
vm_1.id
))
self.cleanup.append(vm_1)
vm_2 = VirtualMachine.create(
@ -357,9 +374,14 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.debug("Deployed VM in account: %s, ID: %s" % (
self.account.account.name,
vm_2.id
))
self.cleanup.append(vm_2)
# Remove one of the user
self.debug("Deleting user: %s" % user_1.id)
user_1.delete(self.apiclient)
# Account should exist after deleting user
@ -367,6 +389,12 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
self.apiclient,
id=self.account.account.id
)
self.assertEqual(
isinstance(accounts_response, list),
True,
"Check for valid list accounts response"
)
self.assertNotEqual(
len(accounts_response),
0,
@ -377,6 +405,11 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check for valid list VM response"
)
self.assertNotEqual(
len(vm_response),
@ -409,28 +442,34 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("Created user: %s" % user_1.id)
user_2 = User.create(
self.apiclient,
self.services["user"],
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("Created user: %s" % user_2.id)
vm_1 = VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.debug("Deployed VM in account: %s, ID: %s" % (
self.account.account.name,
vm_1.id
))
vm_2 = VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.debug("Deployed VM in account: %s, ID: %s" % (
self.account.account.name,
vm_2.id
))
# Get users associated with an account
# (Total 3: 2 - Created & 1 default generated while account creation)
users = list_users(
@ -438,7 +477,14 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(users, list),
True,
"Check for valid list users response"
)
for user in users:
self.debug("Deleting user: %s" % user.id)
cmd = deleteUser.deleteUserCmd()
cmd.id = user.id
self.apiclient.deleteUser(cmd)
@ -447,6 +493,13 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
self.apiclient,
name='account.cleanup.interval'
)
self.assertEqual(
isinstance(interval, list),
True,
"Check for valid list configurations response"
)
self.debug("account.cleanup.interval: %s" % interval[0].value)
# Sleep to ensure that all resources are deleted
time.sleep(int(interval[0].value))
@ -466,7 +519,6 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
vm_response,
None,
@ -544,11 +596,13 @@ class TestNonRootAdminsPrivileges(cloudstackTestCase):
self.apiclient,
self.services["account"]
)
self.debug("Created account: %s" % account_1.account.name)
self.cleanup.append(account_1)
account_2 = Account.create(
self.apiclient,
self.services["account"]
)
self.debug("Created account: %s" % account_2.account.name)
self.cleanup.append(account_2)
accounts_response = list_accounts(
@ -556,6 +610,12 @@ class TestNonRootAdminsPrivileges(cloudstackTestCase):
domainid=self.domain.id
)
self.assertEqual(
isinstance(accounts_response, list),
True,
"Check list accounts response for valid data"
)
self.assertEqual(
len(accounts_response),
1,
@ -651,6 +711,12 @@ class TestServiceOfferingSiblings(cloudstackTestCase):
self.apiclient,
domainid=self.domain_1.id
)
self.assertEqual(
isinstance(service_offerings, list),
True,
"Check if valid list service offerings response"
)
self.assertNotEqual(
len(service_offerings),
0,
@ -658,6 +724,7 @@ class TestServiceOfferingSiblings(cloudstackTestCase):
)
for service_offering in service_offerings:
self.debug("Validating service offering: %s" % service_offering.id)
self.assertEqual(
service_offering.id,
self.service_offering.id,
@ -676,7 +743,7 @@ class TestServiceOfferingSiblings(cloudstackTestCase):
return
# TODO: Testing
@unittest.skip("Not tested")
class TestServiceOfferingHierarchy(cloudstackTestCase):
@classmethod
@ -758,6 +825,11 @@ class TestServiceOfferingHierarchy(cloudstackTestCase):
self.apiclient,
domainid=self.domain_1.id
)
self.assertEqual(
isinstance(service_offerings, list),
True,
"Check List Service Offerings for a valid response"
)
self.assertNotEqual(
len(service_offerings),
0,
@ -776,6 +848,11 @@ class TestServiceOfferingHierarchy(cloudstackTestCase):
self.apiclient,
domainid=self.domain_2.id
)
self.assertEqual(
isinstance(service_offerings, list),
True,
"Check List Service Offerings for a valid response"
)
self.assertNotEqual(
len(service_offerings),
0,
@ -790,8 +867,7 @@ class TestServiceOfferingHierarchy(cloudstackTestCase):
)
return
# TODO: Testing
@unittest.skip("Not tested")
class TesttemplateHierarchy(cloudstackTestCase):
@classmethod
@ -883,6 +959,11 @@ class TesttemplateHierarchy(cloudstackTestCase):
account=self.account_1.account.name,
domainid=self.domain_1.id
)
self.assertEqual(
isinstance(templates, list),
True,
"Check List templates for a valid response"
)
self.assertNotEqual(
len(templates),
0,
@ -903,6 +984,11 @@ class TesttemplateHierarchy(cloudstackTestCase):
account=self.account_2.account.name,
domainid=self.domain_2.id
)
self.assertEqual(
isinstance(templates, list),
True,
"Check List templates for a valid response"
)
self.assertNotEqual(
len(templates),
0,
@ -917,7 +1003,7 @@ class TesttemplateHierarchy(cloudstackTestCase):
)
return
# TODO: Testing
@unittest.skip("Not tested")
class TestAddVmToSubDomain(cloudstackTestCase):
@classmethod
@ -944,7 +1030,7 @@ class TestAddVmToSubDomain(cloudstackTestCase):
cls.services["public_ip"]["zoneid"] = cls.zone.id
cls.services["public_ip"]["podid"] = cls.pod.id
cls.public_ip_range = PublicIp.create(
cls.public_ip_range = PublicIpRange.create(
cls.api_client,
cls.services["public_ip"]
)
@ -1049,13 +1135,14 @@ class TestAddVmToSubDomain(cloudstackTestCase):
domainid=cls.account_1.account.domainid,
serviceofferingid=cls.service_offering.id
)
cls.sub_domain_path = str(cls.account_1.account.domainid) + '/' + \
str(cls.account_2.account.domainid)
cls.vm_2 = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
templateid=cls.template.id,
accountid=cls.account_2.account.name,
domainid=cls.account_2.account.domainid,
domainid=cls.sub_domain_path,
serviceofferingid=cls.service_offering.id
)
cls._cleanup = [
@ -1107,6 +1194,11 @@ class TestAddVmToSubDomain(cloudstackTestCase):
self.apiclient,
id=self.vm_1.id
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check List VM for a valid response"
)
self.assertNotEqual(
len(vm_response),
0,
@ -1114,6 +1206,7 @@ class TestAddVmToSubDomain(cloudstackTestCase):
)
for vm in vm_response:
self.debug("VM ID: %s and state: %s" % (vm.id, vm.state))
self.assertEqual(
vm.state,
'Running',
@ -1131,6 +1224,7 @@ class TestAddVmToSubDomain(cloudstackTestCase):
)
for vm in vm_response:
self.debug("VM ID: %s and state: %s" % (vm.id, vm.state))
self.assertEqual(
vm.state,
'Running',

File diff suppressed because it is too large Load Diff

View File

@ -70,7 +70,8 @@ class Services:
# Optional, if specified the mentioned zone will be
# used for tests
"sleep": 60,
"mode":'advanced'
"timeout": 10,
"mode": 'advanced',
}
class TestResourceLimitsAccount(cloudstackTestCase):
@ -149,6 +150,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
# for this account.
# 3. Try to start 2 VMs account 2. Verify 2 SM are started properly
self.debug(
"Updating instance resource limit for account: %s" %
self.account_1.account.name)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -157,6 +161,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
domainid=self.account_1.account.domainid,
max=1
)
self.debug(
"Deploying VM instance in account: %s" %
self.account_1.account.name)
virtual_machine = VirtualMachine.create(
self.apiclient,
@ -183,6 +190,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
accountid=self.account_1.account.name,
serviceofferingid=self.service_offering.id
)
self.debug(
"Deploying VM instance in account: %s" %
self.account_2.account.name)
# Start 2 instances for account_2
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
@ -199,6 +209,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
"Check VM state is Running or not"
)
self.debug(
"Deploying VM instance in account: %s" %
self.account_2.account.name)
virtual_machine_2 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -227,7 +240,10 @@ class TestResourceLimitsAccount(cloudstackTestCase):
# denied to acquire more than one IP.
# 5. Acquire 2 IP in account 2. Verify account 2 should be able to
# Acquire IP without any warning
self.debug(
"Updating public IP resource limit for account: %s" %
self.account_1.account.name)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -237,6 +253,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
max=2
)
self.debug(
"Deploying VM instance in account: %s" %
self.account_1.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -252,6 +271,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
"Check VM state is Running or not"
)
self.debug(
"Deploying VM instance in account: %s" %
self.account_2.account.name)
# Create VM for second account
virtual_machine_2 = VirtualMachine.create(
self.apiclient,
@ -267,7 +289,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
'Running',
"Check VM state is Running or not"
)
self.debug(
"Associating public IP for account: %s" %
virtual_machine_1.account)
public_ip_1 = PublicIPAddress.create(
self.apiclient,
virtual_machine_1.account,
@ -300,6 +324,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
self.services["server"]
)
self.debug(
"Associating public IP for account: %s" %
virtual_machine_2.account)
# Assign Public IP for account 2
public_ip_3 = PublicIPAddress.create(
self.apiclient,
@ -319,6 +346,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
True,
"Check Public IP state is allocated or not"
)
self.debug(
"Associating public IP for account: %s" %
virtual_machine_2.account)
public_ip_4 = PublicIPAddress.create(
self.apiclient,
virtual_machine_2.account,
@ -351,6 +381,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
# 5. Create 2 snapshot in account 2. Verify account 2 should be able to
# create snapshots without any warning
self.debug(
"Updating public IP resource limit for account: %s" %
self.account_1.account.name)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -360,6 +393,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
max=1
)
self.debug(
"Deploying VM instance in account: %s" %
self.account_1.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -375,6 +411,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
"Check VM state is Running or not"
)
self.debug(
"Deploying VM instance in account: %s" %
self.account_1.account.name)
# Create VM for second account
virtual_machine_2 = VirtualMachine.create(
self.apiclient,
@ -397,8 +436,14 @@ class TestResourceLimitsAccount(cloudstackTestCase):
virtualmachineid=virtual_machine_1.id,
type='ROOT'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check for list volume response return valid data"
)
volume = volumes[0]
self.debug("Creating snapshot from volume: %s" % volumes[0].id)
# Create a snapshot from the ROOTDISK (Account 1)
snapshot_1 = Snapshot.create(self.apiclient,
volumes[0].id,
@ -430,8 +475,14 @@ class TestResourceLimitsAccount(cloudstackTestCase):
virtualmachineid=virtual_machine_2.id,
type='ROOT'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check for list volume response return valid data"
)
volume = volumes[0]
self.debug("Creating snapshot from volume: %s" % volumes[0].id)
# Create a snapshot from the ROOTDISK (Account 2)
snapshot_2 = Snapshot.create(self.apiclient,
volumes[0].id,
@ -448,6 +499,8 @@ class TestResourceLimitsAccount(cloudstackTestCase):
True,
"Check Snapshot state is Running or not"
)
self.debug("Creating snapshot from volume: %s" % volumes[0].id)
# Create a second snapshot from the ROOTDISK (Account 2)
snapshot_3 = Snapshot.create(self.apiclient,
volumes[0].id,
@ -479,6 +532,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
# 5. Create 2 volumes in account 2. Verify account 2 should be able to
# create Volume without any warning
self.debug(
"Updating volume resource limit for account: %s" %
self.account_1.account.name)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -488,6 +544,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
max=3
)
self.debug(
"Deploying VM for account: %s" % self.account_1.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -503,6 +562,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
"Check VM state is Running or not"
)
self.debug(
"Deploying VM for account: %s" % self.account_2.account.name)
# Create VM for second account
virtual_machine_2 = VirtualMachine.create(
self.apiclient,
@ -519,12 +581,14 @@ class TestResourceLimitsAccount(cloudstackTestCase):
"Check VM state is Running or not"
)
self.debug(
"Create a data volume for account: %s" % self.account_1.account.name)
volume_1 = Volume.create(
self.apiclient,
self.services["volume"],
zoneid=self.zone.id,
account=self.account_1.account.name,
domainid=self.account_2.account.domainid,
domainid=self.account_1.account.domainid,
diskofferingid=self.disk_offering.id
)
self.cleanup.append(volume_1)
@ -549,6 +613,8 @@ class TestResourceLimitsAccount(cloudstackTestCase):
diskofferingid=self.disk_offering.id
)
self.debug(
"Create a data volume for account: %s" % self.account_2.account.name)
# Create volume for Account 2
volume_2 = Volume.create(
self.apiclient,
@ -569,6 +635,8 @@ class TestResourceLimitsAccount(cloudstackTestCase):
"Check Volume state is Ready or not"
)
self.debug(
"Create a data volume for account: %s" % self.account_2.account.name)
# Create a second volume from the ROOTDISK (Account 2)
volume_3 = Volume.create(
self.apiclient,
@ -601,6 +669,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
# 3. Try to create 2 templates in account 2. Verify account 2 should be
# able to create template without any error
self.debug(
"Updating template resource limit for account: %s" %
self.account_1.account.name)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -610,6 +681,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
max=1
)
self.debug(
"Updating volume resource limit for account: %s" %
self.account_1.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -625,6 +699,9 @@ class TestResourceLimitsAccount(cloudstackTestCase):
"Check VM state is Running or not"
)
self.debug(
"Deploying virtual machine for account: %s" %
self.account_2.account.name)
# Create VM for second account
virtual_machine_2 = VirtualMachine.create(
self.apiclient,
@ -648,9 +725,16 @@ class TestResourceLimitsAccount(cloudstackTestCase):
virtualmachineid=virtual_machine_1.id,
type='ROOT'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check for list volume response return valid data"
)
volume = volumes[0]
# Create a snapshot from the ROOTDISK (Account 1)
self.debug(
"Creating template from volume: %s" % volume.id)
# Create a template from the ROOTDISK (Account 1)
template_1 = Template.create(
self.apiclient,
self.services["template"],
@ -683,8 +767,15 @@ class TestResourceLimitsAccount(cloudstackTestCase):
virtualmachineid=virtual_machine_2.id,
type='ROOT'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check for list volume response return valid data"
)
volume = volumes[0]
self.debug(
"Creating template from volume: %s" % volume.id)
# Create a snapshot from the ROOTDISK (Account 1)
template_2 = Template.create(
self.apiclient,
@ -701,6 +792,8 @@ class TestResourceLimitsAccount(cloudstackTestCase):
True,
"Check Template is in ready state or not"
)
self.debug(
"Creating template from volume: %s" % volume.id)
# Create a second volume from the ROOTDISK (Account 2)
template_3 = Template.create(
self.apiclient,
@ -800,6 +893,9 @@ class TestResourceLimitsDomain(cloudstackTestCase):
# 3. Try to create 3rd VM instance. The appropriate error or alert
# should be raised
self.debug(
"Updating instance resource limits for domain: %s" %
self.account.account.domainid)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -808,6 +904,7 @@ class TestResourceLimitsDomain(cloudstackTestCase):
max=2
)
self.debug("Deploying VM for account: %s" % self.account.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -823,6 +920,7 @@ class TestResourceLimitsDomain(cloudstackTestCase):
'Running',
"Check VM state is Running or not"
)
self.debug("Deploying VM for account: %s" % self.account.account.name)
virtual_machine_2 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -862,6 +960,9 @@ class TestResourceLimitsDomain(cloudstackTestCase):
# 5. Try to acquire 3rd IP in this domain. It should give the user an
# appropriate error and an alert should be generated.
self.debug(
"Updating public IP resource limits for domain: %s" %
self.account.account.domainid)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -870,6 +971,7 @@ class TestResourceLimitsDomain(cloudstackTestCase):
max=2
)
self.debug("Deploying VM for account: %s" % self.account.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -885,7 +987,7 @@ class TestResourceLimitsDomain(cloudstackTestCase):
'Running',
"Check VM state is Running or not"
)
self.debug("Associating public IP for account: %s" % self.account.account.name)
public_ip_1 = PublicIPAddress.create(
self.apiclient,
virtual_machine_1.account,
@ -928,6 +1030,9 @@ class TestResourceLimitsDomain(cloudstackTestCase):
# 5. Try to create another snapshot in this domain. It should give the
# user an appropriate error and an alert should be generated.
self.debug(
"Updating snapshot resource limits for domain: %s" %
self.account.account.domainid)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -936,6 +1041,7 @@ class TestResourceLimitsDomain(cloudstackTestCase):
max=1
)
self.debug("Deploying VM for account: %s" % self.account.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -958,8 +1064,14 @@ class TestResourceLimitsDomain(cloudstackTestCase):
virtualmachineid=virtual_machine_1.id,
type='ROOT'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check for list volume response return valid data"
)
volume = volumes[0]
self.debug("Creating snapshot from volume: %s" % volumes[0].id)
# Create a snapshot from the ROOTDISK
snapshot_1 = Snapshot.create(self.apiclient,
volumes[0].id,
@ -998,6 +1110,9 @@ class TestResourceLimitsDomain(cloudstackTestCase):
# appropriate error that Volume limit is exhausted and an alert
# should be generated.
self.debug(
"Updating volume resource limits for domain: %s" %
self.account.account.domainid)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -1006,6 +1121,7 @@ class TestResourceLimitsDomain(cloudstackTestCase):
max=1
)
self.debug("Deploying VM for account: %s" % self.account.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -1046,6 +1162,9 @@ class TestResourceLimitsDomain(cloudstackTestCase):
# 4. Try create 3rd template in the domain. It should give the user an
# appropriate error and an alert should be generated.
self.debug(
"Updating template resource limits for domain: %s" %
self.account.account.domainid)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
@ -1054,6 +1173,7 @@ class TestResourceLimitsDomain(cloudstackTestCase):
max=2
)
self.debug("Deploying VM for account: %s" % self.account.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
@ -1076,8 +1196,14 @@ class TestResourceLimitsDomain(cloudstackTestCase):
virtualmachineid=virtual_machine_1.id,
type='ROOT'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check for list volume response return valid data"
)
volume = volumes[0]
self.debug("Creating template from volume: %s" % volume.id)
# Create a template from the ROOTDISK
template_1 = Template.create(
self.apiclient,
@ -1094,7 +1220,7 @@ class TestResourceLimitsDomain(cloudstackTestCase):
True,
"Check Template is in ready state or not"
)
self.debug("Creating template from volume: %s" % volume.id)
# Create a template from the ROOTDISK
template_2 = Template.create(
self.apiclient,

View File

@ -26,8 +26,8 @@ class Services:
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 256, # In MBs
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"virtual_machine":
{
@ -173,6 +173,11 @@ class TestRouterServices(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
self.assertNotEqual(
len(routers),
0,
@ -184,12 +189,22 @@ class TestRouterServices(cloudstackTestCase):
'Running',
"Check list router response for router state"
)
self.debug("Router ID: %s & Router state: %s" % (
router.id,
router.state
))
# Network state associated with account should be 'Implemented'
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(networks, list),
True,
"Check for list networks response return valid data"
)
self.assertNotEqual(
len(networks),
0,
@ -201,6 +216,10 @@ class TestRouterServices(cloudstackTestCase):
'Implemented',
"Check list network response for network state"
)
self.debug("Network ID: %s & Network state: %s" % (
network.id,
network.state
))
# VM state associated with account should be 'Running'
virtual_machines = list_virtual_machines(
self.apiclient,
@ -208,6 +227,11 @@ class TestRouterServices(cloudstackTestCase):
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(virtual_machines, list),
True,
"Check for list virtual machines response return valid data"
)
self.assertNotEqual(
len(virtual_machines),
0,
@ -219,11 +243,10 @@ class TestRouterServices(cloudstackTestCase):
'Running',
"Check list VM response for Running state"
)
hosts = list_hosts(
self.apiclient,
id=routers[0].hostid
)
self.debug("VM ID: %s & VM state: %s" % (
virtual_machine.id,
virtual_machine.state
))
# Check status of DNS, DHCP, FIrewall, LB VPN processes
networks = list_networks(
@ -231,6 +254,11 @@ class TestRouterServices(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(networks, list),
True,
"Check for list networks response return valid data"
)
self.assertNotEqual(
len(networks),
0,
@ -292,6 +320,11 @@ class TestRouterServices(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
self.assertNotEqual(
len(routers),
0,
@ -304,12 +337,21 @@ class TestRouterServices(cloudstackTestCase):
'Running',
"Check list router response for router state"
)
self.debug("Router ID: %s & Router state: %s" % (
router.id,
router.state
))
# Network state associated with account should be 'Implemented'
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(networks, list),
True,
"Check for list networks response return valid data"
)
self.assertNotEqual(
len(networks),
0,
@ -322,6 +364,10 @@ class TestRouterServices(cloudstackTestCase):
'Implemented',
"Check list network response for network state"
)
self.debug("Network ID: %s & Network state: %s" % (
network.id,
network.state
))
# VM state associated with account should be 'Running'
virtual_machines = list_virtual_machines(
self.apiclient,
@ -329,6 +375,11 @@ class TestRouterServices(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(virtual_machines, list),
True,
"Check for list virtual machines response return valid data"
)
self.assertNotEqual(
len(virtual_machines),
0,
@ -340,6 +391,10 @@ class TestRouterServices(cloudstackTestCase):
'Running',
"Check list VM response for Running state"
)
self.debug("VM ID: %s & VM state: %s" % (
virtual_machine.id,
virtual_machine.state
))
# Stop virtual machine
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = virtual_machine.id
@ -349,6 +404,12 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient,
name='network.gc.interval'
)
self.assertEqual(
isinstance(interval, list),
True,
"Check for list intervals response return valid data"
)
self.debug("network.gc.interval: %s" % interval[0].value)
# Router is stopped after (network.gc.interval *2) time. Wait for
# (network.gc.interval *4) for moving router to 'Stopped'
time.sleep(int(interval[0].value) * 4)
@ -359,6 +420,11 @@ class TestRouterServices(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
self.assertNotEqual(
len(routers),
0,
@ -370,6 +436,10 @@ class TestRouterServices(cloudstackTestCase):
'Stopped',
"Check list router response for router state"
)
self.debug("Router ID: %s & Router state: %s" % (
router.id,
router.state
))
# Cleanup Vm_2 - Not required for further tests
self._cleanup.append(self.vm_2)
return
@ -391,7 +461,8 @@ class TestRouterServices(cloudstackTestCase):
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.debug("Deployed a VM with ID: %s" % vm.id)
virtual_machines = list_virtual_machines(
self.apiclient,
id=vm.id,
@ -399,6 +470,12 @@ class TestRouterServices(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(virtual_machines, list),
True,
"Check for list virtual machines response return valid data"
)
self.assertNotEqual(
len(virtual_machines),
0,
@ -418,6 +495,12 @@ class TestRouterServices(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
self.assertNotEqual(
len(routers),
0,
@ -431,6 +514,10 @@ class TestRouterServices(cloudstackTestCase):
'Running',
"Check list router response for router state"
)
self.debug("Router ID: %s & Router state: %s" % (
router.id,
router.state
))
# All other VMs (VM_1) should be in 'Stopped'
virtual_machines = list_virtual_machines(
@ -440,6 +527,12 @@ class TestRouterServices(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(virtual_machines, list),
True,
"Check for list VMs response return valid data"
)
self.assertNotEqual(
len(virtual_machines),
0,
@ -451,6 +544,10 @@ class TestRouterServices(cloudstackTestCase):
'Stopped',
"Check list VM response for Stopped state"
)
self.debug("VM ID: %s & VM state: %s" % (
virtual_machine.id,
virtual_machine.state
))
return
@ -543,6 +640,12 @@ class TestRouterStopAssociateIp(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
self.assertNotEqual(
len(routers),
0,
@ -551,6 +654,7 @@ class TestRouterStopAssociateIp(cloudstackTestCase):
router = routers[0]
self.debug("Stopping router ID: %s" % router.id)
#Stop the router
cmd = stopRouter.stopRouterCmd()
cmd.id = router.id
@ -561,6 +665,12 @@ class TestRouterStopAssociateIp(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
router = routers[0]
self.assertEqual(
@ -574,12 +684,20 @@ class TestRouterStopAssociateIp(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(networks, list),
True,
"Check for list networks response return valid data"
)
self.assertNotEqual(
len(networks),
0,
"Check list networks response"
)
network = networks[0]
self.debug("Associating public IP for account: %s" %
self.account.account.name)
# Associate IP address with account
public_ip = PublicIPAddress.create(
self.apiclient,
@ -588,6 +706,7 @@ class TestRouterStopAssociateIp(cloudstackTestCase):
domainid=self.account.account.domainid,
networkid=network.id
)
self.debug("Starting router ID: %s" % router.id)
#Start the router
cmd = startRouter.startRouterCmd()
cmd.id = router.id
@ -598,6 +717,12 @@ class TestRouterStopAssociateIp(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(networks, list),
True,
"Check for list networks response return valid data"
)
router = routers[0]
# Check if router is in Running state
self.assertEqual(
@ -605,11 +730,17 @@ class TestRouterStopAssociateIp(cloudstackTestCase):
'Running',
"Check list router response for router state"
)
# Check if Public IP is in Allocated state
# Check if Public IP is in Allocated state
public_ips = list_publicIP(
self.apiclient,
id=public_ip.ipaddress.id
)
self.assertEqual(
isinstance(public_ips, list),
True,
"Check for list public IPs response return valid data"
)
self.assertEqual(
public_ips[0].state,
'Allocated',
@ -620,6 +751,11 @@ class TestRouterStopAssociateIp(cloudstackTestCase):
self.apiclient,
id=router.hostid,
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check for list hosts response return valid data"
)
host = hosts[0]
# For DNS and DHCP check 'dnsmasq' process status
result = get_process_status(
@ -630,6 +766,7 @@ class TestRouterStopAssociateIp(cloudstackTestCase):
router.linklocalip,
'ip addr show'
)
self.debug("ip addr show: %s" % str(result))
self.assertEqual(
result.count(str(public_ip.ipaddress.ipaddress)),
1,
@ -724,6 +861,11 @@ class TestRouterStopCreatePF(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
self.assertNotEqual(
len(routers),
0,
@ -731,6 +873,8 @@ class TestRouterStopCreatePF(cloudstackTestCase):
)
router = routers[0]
self.debug("Stopping router ID: %s" % router.id)
#Stop the router
cmd = stopRouter.stopRouterCmd()
cmd.id = router.id
@ -741,6 +885,11 @@ class TestRouterStopCreatePF(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
router = routers[0]
self.assertEqual(
@ -755,8 +904,14 @@ class TestRouterStopCreatePF(cloudstackTestCase):
domainid=self.account.account.domainid,
zoneid=self.zone.id
)
self.assertEqual(
isinstance(public_ips, list),
True,
"Check for list public IPs response return valid data"
)
public_ip = public_ips[0]
self.debug("Creating NAT rule for VM ID: %s" % self.vm_1.id)
#Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
@ -765,6 +920,7 @@ class TestRouterStopCreatePF(cloudstackTestCase):
public_ip.id
)
self.debug("Starting router ID: %s" % router.id)
#Start the router
cmd = startRouter.startRouterCmd()
cmd.id = router.id
@ -776,6 +932,11 @@ class TestRouterStopCreatePF(cloudstackTestCase):
domainid=self.account.account.domainid,
zoneid=self.zone.id
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
router = routers[0]
self.assertEqual(
@ -788,6 +949,11 @@ class TestRouterStopCreatePF(cloudstackTestCase):
self.apiclient,
id=nat_rule.id
)
self.assertEqual(
isinstance(nat_rules, list),
True,
"Check for list NAT rules response return valid data"
)
self.assertEqual(
nat_rules[0].state,
'Active',
@ -800,6 +966,7 @@ class TestRouterStopCreatePF(cloudstackTestCase):
self.vm_1.username,
self.vm_1.password
)
self.debug("SSH into VM with ID: %s" % nat_rule.ipaddress)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
@ -887,6 +1054,12 @@ class TestRouterStopCreateLB(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
self.assertNotEqual(
len(routers),
0,
@ -895,6 +1068,7 @@ class TestRouterStopCreateLB(cloudstackTestCase):
router = routers[0]
self.debug("Stopping router with ID: %s" % router.id)
#Stop the router
cmd = stopRouter.stopRouterCmd()
cmd.id = router.id
@ -905,6 +1079,11 @@ class TestRouterStopCreateLB(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
router = routers[0]
self.assertEqual(
@ -918,8 +1097,13 @@ class TestRouterStopCreateLB(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(public_ips, list),
True,
"Check for list public IPs response return valid data"
)
public_ip = public_ips[0]
self.debug("Creating LB rule for public IP: %s" % public_ip.id)
#Create Load Balancer rule and assign VMs to rule
lb_rule = LoadBalancerRule.create(
self.apiclient,
@ -927,6 +1111,10 @@ class TestRouterStopCreateLB(cloudstackTestCase):
public_ip.id,
accountid=self.account.account.name
)
self.debug("Assigning VM %s to LB rule: %s" % (
self.vm_1.id,
lb_rule.id
))
lb_rule.assign(self.apiclient, [self.vm_1])
#Start the router
@ -939,6 +1127,11 @@ class TestRouterStopCreateLB(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
router = routers[0]
self.assertEqual(
@ -951,6 +1144,11 @@ class TestRouterStopCreateLB(cloudstackTestCase):
self.apiclient,
id=lb_rule.id
)
self.assertEqual(
isinstance(lb_rules, list),
True,
"Check for list LB rules response return valid data"
)
self.assertEqual(
lb_rules[0].state,
'Active',
@ -969,6 +1167,7 @@ class TestRouterStopCreateLB(cloudstackTestCase):
self.vm_1.username,
self.vm_1.password
)
self.debug("SSH into VM with IP: %s" % public_ip.ipaddress)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
@ -976,6 +1175,7 @@ class TestRouterStopCreateLB(cloudstackTestCase):
)
return
@unittest.skip("iptables does not return anything")
class TestRouterStopCreateFW(cloudstackTestCase):
@classmethod
@ -1054,6 +1254,12 @@ class TestRouterStopCreateFW(cloudstackTestCase):
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
self.assertNotEqual(
len(routers),
0,
@ -1061,7 +1267,8 @@ class TestRouterStopCreateFW(cloudstackTestCase):
)
router = routers[0]
self.debug("Stopping the router: %s" % router.id)
#Stop the router
cmd = stopRouter.stopRouterCmd()
cmd.id = router.id
@ -1072,6 +1279,11 @@ class TestRouterStopCreateFW(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
router = routers[0]
self.assertEqual(
@ -1085,6 +1297,11 @@ class TestRouterStopCreateFW(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(public_ips, list),
True,
"Check for list public IP response return valid data"
)
public_ip = public_ips[0]
#Create Firewall rule with configurations from settings file
@ -1096,9 +1313,9 @@ class TestRouterStopCreateFW(cloudstackTestCase):
startport=self.services["fw_rule"]["startport"],
endport=self.services["fw_rule"]["endport"]
)
# Cleanup after tests
self._cleanup.append(fw_rule)
self.debug("Created firewall rule: %s" % fw_rule.id)
self.debug("Starting the router: %s" % router.id)
#Start the router
cmd = startRouter.startRouterCmd()
cmd.id = router.id
@ -1109,6 +1326,12 @@ class TestRouterStopCreateFW(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
router = routers[0]
self.assertEqual(
@ -1121,6 +1344,12 @@ class TestRouterStopCreateFW(cloudstackTestCase):
self.apiclient,
ipaddressid=public_ip.id
)
self.assertEqual(
isinstance(fw_rules, list),
True,
"Check for list FW rules response return valid data"
)
self.assertEqual(
fw_rules[0].state,
'Active',
@ -1141,6 +1370,11 @@ class TestRouterStopCreateFW(cloudstackTestCase):
self.apiclient,
id=router.hostid,
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check for list hosts response return valid data"
)
host = hosts[0]
# For DNS and DHCP check 'dnsmasq' process status
result = get_process_status(
@ -1151,9 +1385,10 @@ class TestRouterStopCreateFW(cloudstackTestCase):
router.linklocalip,
'iptables -t nat -nvx'
)
self.debug("iptables -t nat -nvx: %s" % result)
# TODO : Find assertion condition )
self.assertEqual(
result.count(str(public_ip.ipaddress.ipaddress)),
result.count(str(public_ip.ipaddress)),
1,
"Check public IP address"
)

File diff suppressed because it is too large Load Diff

View File

@ -89,8 +89,9 @@ class Services:
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"sleep": 200,
"mode" : 'basic', # Networking mode: Advanced, Basic
"sleep": 60,
"timeout": 10,
"mode" : 'advanced', # Networking mode: Advanced, Basic
}
@ -180,7 +181,7 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.debug("Created VM with ID: %s" % self.virtual_machine.id)
# Get the Root disk of VM
volumes = list_volumes(
self.apiclient,
@ -191,12 +192,18 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
# Create a snapshot from the ROOTDISK
snapshot = Snapshot.create(self.apiclient, volumes[0].id)
self.debug("Snapshot created: ID - %s" % snapshot.id)
self.cleanup.append(snapshot)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
@ -230,6 +237,7 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
snapshot,
self.services["templates"]
)
self.debug("Created template from snapshot: %s" % template.id)
self.cleanup.append(template)
templates = list_templates(
@ -259,6 +267,10 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.debug("Created VM with ID: %s from template: %s" % (
new_virtual_machine.id,
template.id
))
self.cleanup.append(new_virtual_machine)
# Newly deployed VM should be 'Running'
@ -268,7 +280,11 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(virtual_machines, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(virtual_machines),
0,
@ -280,21 +296,45 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
'Running',
"Check list VM response for Running state"
)
# Get the Secondary Storage details from list Hosts
hosts = list_hosts(
self.apiclient,
type='SecondaryStorage',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
# Login to VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
try:
# Login to VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server"]["ipaddress"],
self.services["mgmt_server"]["port"],
self.services["mgmt_server"]["username"],
self.services["mgmt_server"]["password"],
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
self.services["sec_storage"],
self.services["exportpath"],
sec_storage_ip,
export_path,
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
@ -303,8 +343,14 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
volume_id
),
]
for c in cmds:
result = ssh_client.execute(c)
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
except Exception as e:
self.fail("SSH failed for Management server: %s" %
self.services["mgmt_server"]["ipaddress"])
res = str(result)
self.assertEqual(
@ -316,8 +362,15 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
try:
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
except Exception as e:
self.fail("SSH failed for Management server: %s" %
self.services["mgmt_server"]["ipaddress"])
return
@ -416,7 +469,11 @@ class TestAccountSnapshotClean(cloudstackTestCase):
self.apiclient,
id=self.account.account.id
)
self.assertEqual(
isinstance(accounts, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(accounts),
0,
@ -428,13 +485,21 @@ class TestAccountSnapshotClean(cloudstackTestCase):
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
isinstance(virtual_machines, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(virtual_machines),
0,
"Check list virtual machines response"
)
for virtual_machine in virtual_machines:
self.debug("VM ID: %s, VM state: %s" % (
virtual_machine.id,
virtual_machine.state
))
self.assertEqual(
virtual_machine.state,
'Running',
@ -446,6 +511,11 @@ class TestAccountSnapshotClean(cloudstackTestCase):
self.apiclient,
id=self.snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
@ -462,6 +532,11 @@ class TestAccountSnapshotClean(cloudstackTestCase):
"select backup_snap_id, account_id, volume_id from snapshots where id = %s;" \
% self.snapshot.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DB response returns a valid list"
)
self.assertNotEqual(
len(qresultset),
0,
@ -473,20 +548,45 @@ class TestAccountSnapshotClean(cloudstackTestCase):
account_id = qresult[1]
volume_id = qresult[2]
# Get the Secondary Storage details from list Hosts
hosts = list_hosts(
self.apiclient,
type='SecondaryStorage',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
# Login to Secondary storage VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
try:
# Login to Secondary storage VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server"]["ipaddress"],
self.services["mgmt_server"]["port"],
self.services["mgmt_server"]["username"],
self.services["mgmt_server"]["password"],
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
self.services["sec_storage"],
self.services["exportpath"],
sec_storage_ip,
export_path,
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
@ -496,21 +596,29 @@ class TestAccountSnapshotClean(cloudstackTestCase):
),
]
for c in cmds:
result = ssh_client.execute(c)
res = str(result)
self.assertEqual(
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
res = str(result)
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
for c in cmds:
result = ssh_client.execute(c)
except Exception:
self.fail("SSH failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
self.debug("Deleting account: %s" % self.account.account.name)
# Delete account
self.account.delete(self.apiclient)
@ -518,6 +626,13 @@ class TestAccountSnapshotClean(cloudstackTestCase):
self.apiclient,
name='account.cleanup.interval'
)
self.assertEqual(
isinstance(interval, list),
True,
"Check list response returns a valid list"
)
self.debug("account.cleanup.interval: %s" % interval[0].value)
# Wait for account cleanup interval
time.sleep(int(interval[0].value) * 2)
@ -526,33 +641,44 @@ class TestAccountSnapshotClean(cloudstackTestCase):
self.apiclient,
id=self.account.account.id
)
cmds = [ "mount %s:/%s %s" % (
self.services["sec_storage"],
self.services["exportpath"],
try:
cmds = [
"mount %s:/%s %s" % (
sec_storage_ip,
export_path,
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
"ls %s/snapshots/%s/%s" % (
self.services["mount_dir"],
account_id,
volume_id
),
]
for c in cmds:
result = ssh_client.execute(c)
res = str(result)
self.assertNotEqual(
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
res = str(result)
self.assertNotEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
]
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
except Exception:
self.fail("SSH failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
return
@ -649,18 +775,24 @@ class TestSnapshotDetachedDisk(cloudstackTestCase):
virtualmachineid=self.virtual_machine.id,
type='DATADISK'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check list response returns a valid list"
)
volume = volumes[0]
random_data_0 = random_gen(100)
random_data_1 = random_gen(100)
try:
ssh_client = self.virtual_machine.get_ssh_client()
ssh_client = self.virtual_machine.get_ssh_client()
#Format partition using ext3
format_volume_to_ext3(
#Format partition using ext3
format_volume_to_ext3(
ssh_client,
self.services["diskdevice"]
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s1 %s" % (
self.services["diskdevice"],
self.services["mount_dir"]
@ -684,46 +816,49 @@ class TestSnapshotDetachedDisk(cloudstackTestCase):
self.services["random_data"]
)
]
for c in cmds:
self.debug(ssh_client.execute(c))
for c in cmds:
self.debug(ssh_client.execute(c))
#detach volume from VM
cmd = detachVolume.detachVolumeCmd()
cmd.id = volume.id
self.apiclient.detachVolume(cmd)
#detach volume from VM
cmd = detachVolume.detachVolumeCmd()
cmd.id = volume.id
self.apiclient.detachVolume(cmd)
#Create snapshot from detached volume
snapshot = Snapshot.create(self.apiclient, volume.id)
self.cleanup.append(snapshot)
#Create snapshot from detached volume
snapshot = Snapshot.create(self.apiclient, volume.id)
self.cleanup.append(snapshot)
volumes = list_volumes(
volumes = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine.id,
type='DATADISK'
)
self.assertEqual(
self.assertEqual(
volumes,
None,
"Check Volume is detached"
)
# Verify the snapshot was created or not
snapshots = list_snapshots(
# Verify the snapshot was created or not
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertNotEqual(
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list snapshots call"
)
self.assertEqual(
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check snapshot id in list resources call"
)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
self.virtual_machine.ipaddress)
# Fetch values from database
qresultset = self.dbclient.execute(
"select backup_snap_id, account_id, volume_id from snapshots where id = %s;" \
@ -746,20 +881,45 @@ class TestSnapshotDetachedDisk(cloudstackTestCase):
"Check if backup_snap_id is not null"
)
# Get the Secondary Storage details from list Hosts
hosts = list_hosts(
self.apiclient,
type='SecondaryStorage',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
# Login to Management server to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
try:
# Login to Management server to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server"]["ipaddress"],
self.services["mgmt_server"]["port"],
self.services["mgmt_server"]["username"],
self.services["mgmt_server"]["password"],
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
self.services["sec_storage"],
self.services["exportpath"],
sec_storage_ip,
export_path,
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
@ -769,20 +929,23 @@ class TestSnapshotDetachedDisk(cloudstackTestCase):
),
]
for c in cmds:
result = ssh_client.execute(c)
res = str(result)
self.assertEqual(
for c in cmds:
result = ssh_client.execute(c)
res = str(result)
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
]
for c in cmds:
result = ssh_client.execute(c)
except Exception as e:
self.fail("SSH failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
return
@ -870,6 +1033,11 @@ class TestSnapshotLimit(cloudstackTestCase):
virtualmachineid=self.virtual_machine.id,
type='ROOT'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check list response returns a valid list"
)
volume = volumes[0]
# Create a snapshot policy
@ -885,6 +1053,12 @@ class TestSnapshotLimit(cloudstackTestCase):
id=recurring_snapshot.id,
volumeid=volume.id
)
self.assertEqual(
isinstance(snapshot_policy, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshot_policy,
None,
@ -915,7 +1089,12 @@ class TestSnapshotLimit(cloudstackTestCase):
self.services["recurring_snapshot"]["intervaltype"],
snapshottype='RECURRING'
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(snapshots),
self.services["recurring_snapshot"]["maxsnaps"],
@ -929,6 +1108,11 @@ class TestSnapshotLimit(cloudstackTestCase):
"select backup_snap_id, account_id, volume_id from snapshots where id = %s;" \
% self.snapshot.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DBQuery returns a valid list"
)
self.assertNotEqual(
len(qresultset),
0,
@ -940,18 +1124,42 @@ class TestSnapshotLimit(cloudstackTestCase):
account_id = qresult[1]
volume_id = qresult[2]
# Login to VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
# Get the Secondary Storage details from list Hosts
hosts = list_hosts(
self.apiclient,
type='SecondaryStorage',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
try:
# Login to VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server"]["ipaddress"],
self.services["mgmt_server"]["port"],
self.services["mgmt_server"]["username"],
self.services["mgmt_server"]["password"],
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
self.services["sec_storage"],
self.services["exportpath"],
sec_storage_ip,
export_path,
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
@ -961,22 +1169,25 @@ class TestSnapshotLimit(cloudstackTestCase):
),
]
for c in cmds:
result = ssh_client.execute(c)
for c in cmds:
result = ssh_client.execute(c)
res = str(result)
self.assertEqual(
res = str(result)
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
for c in cmds:
result = ssh_client.execute(c)
except Exception as e:
raise Exception("SSH access failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
return
@ -1062,15 +1273,26 @@ class TestSnapshotEvents(cloudstackTestCase):
virtualmachineid=self.virtual_machine.id,
type='ROOT'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check list response returns a valid list"
)
volume = volumes[0]
# Create a snapshot from the ROOTDISK
snapshot = Snapshot.create(self.apiclient, volumes[0].id)
self.debug("Snapshot created with ID: %s" % snapshot.id)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
@ -1091,6 +1313,11 @@ class TestSnapshotEvents(cloudstackTestCase):
domainid=self.account.account.domainid,
type='SNAPSHOT.DELETE'
)
self.assertEqual(
isinstance(events, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
events,
None,

View File

@ -79,8 +79,10 @@ class Services:
"templatefilter": 'self',
},
"templatefilter": 'self',
"destzoneid": 13, # For Copy template (Destination zone)
"destzoneid": 2, # For Copy template (Destination zone)
"ostypeid": 12,
"sleep": 60,
"timeout": 10,
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
@ -88,7 +90,7 @@ class Services:
}
# TODO : Testing
@unittest.skip("Testing is pending")
class TestCreateTemplate(cloudstackTestCase):
def setUp(self):
@ -171,28 +173,49 @@ class TestCreateTemplate(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug(
"Registered a template of format: %s with ID: %s" % (
v["format"],
template.id
))
# Wait for template to download
template.download(self.apiclient)
self.cleanup.append(template)
# Wait for template status to be changed across
time.sleep(60)
list_template_response = list_templates(
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_template_response = list_templates(
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=template.id,
zoneid=self.zone.id
zoneid=self.zone.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
if isinstance(list_template_response, list):
break
elif timeout == 0:
raise Exception("List template failed!")
time.sleep(5)
timeout = timeout - 1
#Verify template response to check whether template added successfully
self.assertEqual(
isinstance(list_template_response, list),
True,
"Check for list template response return valid data"
)
self.assertNotEqual(
len(list_template_response),
0,
"Check template available in List Templates"
)
template_response = list_template_response[0]
self.assertEqual(
template_response.isready,
True,
@ -208,12 +231,18 @@ class TestCreateTemplate(cloudstackTestCase):
serviceofferingid=self.service_offering.id,
mode=self.services["mode"]
)
self.debug("creating an instance with template ID: %s" % template.id)
vm_response = list_virtual_machines(
self.apiclient,
id=virtual_machine.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check for list VMs response after VM deployment"
)
#Verify VM response to check whether VM deployment was successful
self.assertNotEqual(
len(vm_response),
@ -268,13 +297,24 @@ class TestTemplates(cloudstackTestCase):
#Stop virtual machine
cls.virtual_machine.stop(cls.api_client)
timeout = cls.services["timeout"]
#Wait before server has be successfully stopped
time.sleep(30)
list_volume = list_volumes(
time.sleep(cls.services["sleep"])
while True:
list_volume = list_volumes(
cls.api_client,
virtualmachineid=cls.virtual_machine.id,
type='ROOT'
)
if isinstance(list_volume, list):
break
elif timeout == 0:
raise Exception("List volumes failed.")
time.sleep(5)
timeout = timeout -1
cls.volume = list_volume[0]
#Create template from volume
@ -332,7 +372,8 @@ class TestTemplates(cloudstackTestCase):
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id,
)
self.debug("creating an instance with template ID: %s" % self.template.id)
self.cleanup.append(virtual_machine)
vm_response = list_virtual_machines(
self.apiclient,
@ -360,7 +401,12 @@ class TestTemplates(cloudstackTestCase):
# Validate the following
# 1. copy template should be successful and
# secondary storage should contain new copied template.
self.debug(
"Copying template from zone: %s to %s" % (
self.template.id,
self.services["destzoneid"]
))
cmd = copyTemplate.copyTemplateCmd()
cmd.id = self.template.id
cmd.destzoneid = self.services["destzoneid"]
@ -375,6 +421,12 @@ class TestTemplates(cloudstackTestCase):
id=self.template.id,
zoneid=self.services["destzoneid"]
)
self.assertEqual(
isinstance(list_template_response, list),
True,
"Check for list template response return valid list"
)
self.assertNotEqual(
len(list_template_response),
0,
@ -417,6 +469,12 @@ class TestTemplates(cloudstackTestCase):
id=self.template.id,
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_template_response, list),
True,
"Check for list template response return valid list"
)
self.assertNotEqual(
len(list_template_response),
0,
@ -429,8 +487,12 @@ class TestTemplates(cloudstackTestCase):
self.template.id,
"Check display text of updated template"
)
self.debug("Deleting template: %s" % self.template)
# Delete the template
self.template.delete(self.apiclient)
self.debug("Delete template: %s successful" % self.template)
list_template_response = list_templates(
self.apiclient,
templatefilter=\
@ -461,11 +523,16 @@ class TestTemplates(cloudstackTestCase):
type='ROOT'
)
volume = volumes[0]
self.debug("Creating a snapshot from volume: %s" % volume.id)
#Create a snapshot of volume
snapshot = Snapshot.create(self.apiclient, volume.id)
self.cleanup.append(snapshot)
snapshot = Snapshot.create(
self.apiclient,
volume.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("Creating a template from snapshot: %s" % snapshot.id)
# Generate template from the snapshot
template = Template.create_from_snapshot(
self.apiclient,
@ -491,7 +558,7 @@ class TestTemplates(cloudstackTestCase):
template.id,
"Check new template id in list resources call"
)
self.debug("Deploying a VM from template: %s" % template.id)
# Deploy new virtual machine using template
virtual_machine = VirtualMachine.create(
self.apiclient,
@ -508,6 +575,12 @@ class TestTemplates(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check for list VM response return valid list"
)
#Verify VM response to check whether VM deployment was successful
self.assertNotEqual(
len(vm_response),

View File

@ -103,6 +103,7 @@ class Services:
# Optional, if specified the mentioned zone will be
# used for tests
"sleep": 60,
"timeout": 10,
"mode":'advanced'
}
@ -185,23 +186,35 @@ class TestVmUsage(cloudstackTestCase):
# VM.Destroy and volume .delete Event for the created account
# 4. Delete the account
self.debug("Stopping the VM: %s" % self.virtual_machine.id)
# Stop the VM
self.virtual_machine.stop(self.apiclient)
time.sleep(self.services["sleep"])
# Destroy the VM
self.debug("Destroying the VM: %s" % self.virtual_machine.id)
self.virtual_machine.delete(self.apiclient)
self.debug("select type from usage_event where account_id = %s;" \
% self.account.account.id)
qresultset = self.dbclient.execute(
"select type from usage_event where account_id = %s;" \
% self.account.account.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DB query result set for valid data"
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = str(qresultset)
self.debug("Query result: %s" % qresult)
# Check if VM.CREATE, VM.DESTROY events present in usage_event table
self.assertEqual(
qresult.count('VM.START'),
@ -336,13 +349,24 @@ class TestPublicIPUsage(cloudstackTestCase):
# has IP.Release event for released IP for this account
# 3. Delete the newly created account
self.debug("Deleting public IP: %s" %
self.public_ip.ipaddesss.ipaddress)
# Release one of the IP
self.public_ip.delete(self.apiclient)
self.debug("select type from usage_event where account_id = %s;" \
% self.account.account.id)
qresultset = self.dbclient.execute(
"select type from usage_event where account_id = %s;" \
% self.account.account.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DB query result set for valid data"
)
self.assertNotEqual(
len(qresultset),
0,
@ -449,6 +473,7 @@ class TestVolumeUsage(cloudstackTestCase):
# disk of the destroyed VM
# Stop VM
self.debug("Stopping VM with ID: %s" % self.virtual_machine.id)
self.virtual_machine.stop(self.apiclient)
volume_response = list_volumes(
@ -456,16 +481,29 @@ class TestVolumeUsage(cloudstackTestCase):
virtualmachineid=self.virtual_machine.id,
type='DATADISK'
)
self.assertEqual(
isinstance(volume_response, list),
True,
"Check for valid list volumes response"
)
data_volume = volume_response[0]
# Detach data Disk
self.debug("Detaching volume ID: %s VM with ID: %s" % (
data_volume.id,
self.virtual_machine.id
))
self.virtual_machine.detach_volume(self.apiclient, data_volume)
# Delete Data disk
self.debug("Delete volume ID: %s" % data_volume.id)
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = data_volume.id
self.apiclient.deleteVolume(cmd)
self.debug("select type from usage_event where account_id = %s;" \
% self.account.account.id)
qresultset = self.dbclient.execute(
"select type from usage_event where account_id = %s;" \
% self.account.account.id
@ -475,8 +513,14 @@ class TestVolumeUsage(cloudstackTestCase):
0,
"Check DB Query result set"
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DB query result set for valid data"
)
qresult = str(qresultset)
self.debug("Query result: %s" % qresult)
# Check VOLUME.CREATE, VOLUME.DESTROY events in cloud.usage_event table
self.assertEqual(
qresult.count('VOLUME.CREATE'),
@ -586,21 +630,32 @@ class TestTemplateUsage(cloudstackTestCase):
self.services["templates"],
self.volume.id
)
self.debug("Created template with ID: %s" % self.template.id)
# Delete template
self.template.delete(self.apiclient)
self.debug("Deleted template with ID: %s" % self.template.id)
self.debug("select type from usage_event where account_id = %s;" \
% self.account.account.id)
qresultset = self.dbclient.execute(
"select type from usage_event where account_id = %s;" \
% self.account.account.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DB query result set for valid data"
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = str(qresultset)
self.debug("Query result: %s" % qresult)
# Check for TEMPLATE.CREATE, TEMPLATE.DELETE in cloud.usage_event table
self.assertEqual(
qresult.count('TEMPLATE.CREATE'),
@ -638,8 +693,14 @@ class TestISOUsage(cloudstackTestCase):
account=cls.account.account.name,
domainid=cls.account.account.domainid
)
# # Wait till ISO gets downloaded
cls.iso.download(cls.api_client)
try:
# Wait till ISO gets downloaded
cls.iso.download(cls.api_client)
except Exception as e:
raise Exception("%s: Failed to download ISO: %s" % (
e,
cls.iso.id
))
cls._cleanup = [
cls.account,
]
@ -681,12 +742,22 @@ class TestISOUsage(cloudstackTestCase):
# 4. Destroy the account
# Delete the ISO
self.debug("Deleting ISO with ID: %s" % self.iso.id)
self.iso.delete(self.apiclient)
self.debug("select type from usage_event where account_id = %s;" \
% self.account.account.id)
qresultset = self.dbclient.execute(
"select type from usage_event where account_id = %s;" \
% self.account.account.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DB query result set for valid data"
)
self.assertNotEqual(
len(qresultset),
0,
@ -694,6 +765,7 @@ class TestISOUsage(cloudstackTestCase):
)
qresult = str(qresultset)
self.debug("Query result: %s" % qresult)
# Check for ISO.CREATE, ISO.DELETE events in cloud.usage_event table
# XXX: ISO.CREATE event is not logged in usage_event table.
# self.assertEqual(
@ -794,7 +866,9 @@ class TestLBRuleUsage(cloudstackTestCase):
# is registered for this account in cloud.usage_event table
# 4. Delete this account.
self.debug(
"Creating load balancer rule for public IP: %s" %
self.public_ip_1.ipaddress.id)
#Create Load Balancer rule and assign VMs to rule
lb_rule = LoadBalancerRule.create(
self.apiclient,
@ -803,12 +877,22 @@ class TestLBRuleUsage(cloudstackTestCase):
accountid=self.account.account.name
)
# Delete LB Rule
self.debug("Deleting LB rule with ID: %s" % lb_rule.id)
lb_rule.delete(self.apiclient)
self.debug("select type from usage_event where account_id = %s;" \
% self.account.account.id)
qresultset = self.dbclient.execute(
"select type from usage_event where account_id = %s;" \
% self.account.account.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DB query result set for valid data"
)
self.assertNotEqual(
len(qresultset),
0,
@ -816,6 +900,8 @@ class TestLBRuleUsage(cloudstackTestCase):
)
qresult = str(qresultset)
self.debug("Query result: %s" % qresult)
# Check for LB.CREATE, LB.DELETE in cloud.usage_event table
self.assertEqual(
qresult.count('LB.CREATE'),
@ -915,17 +1001,36 @@ class TestSnapshotUsage(cloudstackTestCase):
virtualmachineid=self.virtual_machine.id,
type='ROOT'
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check if list volumes return a valid data"
)
volume = volumes[0]
# Create a snapshot from the ROOTDISK
self.debug("Creating snapshot from volume: %s" % volumes[0].id)
snapshot = Snapshot.create(self.apiclient, volumes[0].id)
# Delete snapshot Rule
self.debug("Deleting snapshot: %s" % snapshot.id)
snapshot.delete(self.apiclient)
self.debug("select type from usage_event where account_id = %s;" \
% self.account.account.id)
qresultset = self.dbclient.execute(
"select type from usage_event where account_id = %s;" \
% self.account.account.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check if database query returns a valid data"
)
self.assertNotEqual(
len(qresultset),
0,
@ -933,6 +1038,8 @@ class TestSnapshotUsage(cloudstackTestCase):
)
qresult = str(qresultset)
self.debug("Query Result: %s" % qresult)
# Check for SNAPSHOT.CREATE, SNAPSHOT.DELETE events in cloud.usage_event
# table
self.assertEqual(
@ -1034,6 +1141,8 @@ class TestNatRuleUsage(cloudstackTestCase):
# is registered for this account in cloud.usage_event table
# 4. Delete this account.
self.debug("Creating NAT rule with public IP: %s" %
self.public_ip_1.ipaddress.id)
#Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
@ -1043,12 +1152,21 @@ class TestNatRuleUsage(cloudstackTestCase):
)
# Delete NAT Rule
self.debug("Deleting NAT rule: %s" % nat_rule.id)
nat_rule.delete(self.apiclient)
self.debug("select type from usage_event where account_id = %s;" \
% self.account.account.id)
qresultset = self.dbclient.execute(
"select type from usage_event where account_id = %s;" \
% self.account.account.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DB query result set for valid data"
)
self.assertNotEqual(
len(qresultset),
0,
@ -1056,6 +1174,8 @@ class TestNatRuleUsage(cloudstackTestCase):
)
qresult = str(qresultset)
self.debug("Query result: %s" % qresult)
# Check for NET.RULEADD, NET.RULEDELETE in cloud.usage_event table
self.assertEqual(
qresult.count('NET.RULEADD'),
@ -1070,7 +1190,7 @@ class TestNatRuleUsage(cloudstackTestCase):
)
return
@unittest.skipped("Feature in 3.0 release")
@unittest.skip("Feature in 3.0 release")
class TestVpnUsage(cloudstackTestCase):
@classmethod
@ -1156,6 +1276,8 @@ class TestVpnUsage(cloudstackTestCase):
# this account in cloud.usage_event table
# 4. Delete this account.
self.debug("Created VPN with public IP: %s" %
self.public_ip.ipaddress.id)
#Assign VPN to Public IP
vpn = Vpn.create(
self.apiclient,
@ -1164,6 +1286,9 @@ class TestVpnUsage(cloudstackTestCase):
domainid=self.account.account.domainid
)
self.debug("Created VPN user for account: %s" %
self.account.account.name)
vpnuser = VpnUser.create(
self.apiclient,
self.services["vpn_user"]["username"],
@ -1172,14 +1297,25 @@ class TestVpnUsage(cloudstackTestCase):
domainid=self.account.account.domainid
)
# Remove VPN user
self.debug("Deleting VPN user: %s" % vpnuser.id)
vpnuser.delete(self.apiclient)
# Delete VPN access
self.debug("Deleting VPN: %s" % vpn.id)
vpn.delete(self.apiclient)
self.debug("select type from usage_event where account_id = %s;" \
% self.account.account.id)
qresultset = self.dbclient.execute(
"select type from usage_event where account_id = %s;" \
% self.account.account.id
)
self.assertEqual(
isinstance(qresultset, list),
True,
"Check DB query result set for valid data"
)
self.assertNotEqual(
len(qresultset),
0,
@ -1187,6 +1323,9 @@ class TestVpnUsage(cloudstackTestCase):
)
qresult = str(qresultset)
self.debug("Query result: %s" % qresult)
# Check for VPN user related events
self.assertEqual(
qresult.count('VPN.USER.ADD'),
1,

View File

@ -69,13 +69,13 @@ class Services:
# Source URL where ISO is located
"ostypeid": 76,
},
"sleep": 90,
"sleep": 50,
"domainid": 1,
"ostypeid": 12,
"zoneid": 1,
"zoneid": 2,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced',
"mode": 'basic',
}
@ -150,6 +150,10 @@ class TestAttachVolume(cloudstackTestCase):
account=self.account.account.name,
diskofferingid=self.disk_offering.id
)
self.debug("Created volume: %s for account: %s" % (
volume.id,
self.account.account.name
))
# Check List Volume response for newly created volume
list_volume_response = list_volumes(
self.apiclient,
@ -165,13 +169,22 @@ class TestAttachVolume(cloudstackTestCase):
self.apiclient,
volume
)
self.debug("Attach volume: %s to VM: %s" % (
volume.id,
self.virtual_machine.id
))
# Check all volumes attached to same VM
list_volume_response = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine.id,
type='DATADISK'
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list"
)
self.assertNotEqual(
list_volume_response,
None,
@ -182,10 +195,9 @@ class TestAttachVolume(cloudstackTestCase):
self.services["volume"]["max"],
"Check number of data volumes attached to VM"
)
self.debug("Rebooting the VM: %s" % self.virtual_machine.id)
# Reboot VM
self.virtual_machine.reboot(self.apiclient)
# Sleep to ensure that VM is in ready state
time.sleep(self.services["sleep"])
vm_response = list_virtual_machines(
self.apiclient,
@ -197,6 +209,11 @@ class TestAttachVolume(cloudstackTestCase):
0,
"Check VMs available in List VMs response"
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM response for valid list"
)
vm = vm_response[0]
self.assertEqual(
vm.state,
@ -204,14 +221,20 @@ class TestAttachVolume(cloudstackTestCase):
"Check the state of VM"
)
self.debug("Stopping the VM: %s" % self.virtual_machine.id)
# Stop VM
self.virtual_machine.stop(self.apiclient)
# Sleep to ensure that VM is in ready state
time.sleep(self.services["sleep"])
vm_response = list_virtual_machines(
self.apiclient,
id=self.virtual_machine.id,
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM response for valid list"
)
#Verify VM response to check whether VM deployment was successful
self.assertNotEqual(
len(vm_response),
@ -226,6 +249,7 @@ class TestAttachVolume(cloudstackTestCase):
"Check the state of VM"
)
self.debug("Starting the VM: %s" % self.virtual_machine.id)
# Start VM
self.virtual_machine.start(self.apiclient)
# Sleep to ensure that VM is in ready state
@ -235,6 +259,12 @@ class TestAttachVolume(cloudstackTestCase):
self.apiclient,
id=self.virtual_machine.id,
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM response for valid list"
)
#Verify VM response to check whether VM deployment was successful
self.assertNotEqual(
len(vm_response),
@ -266,11 +296,21 @@ class TestAttachVolume(cloudstackTestCase):
account=self.account.account.name,
diskofferingid=self.disk_offering.id
)
self.debug("Created volume: %s for account: %s" % (
volume.id,
self.account.account.name
))
# Check List Volume response for newly created volume
list_volume_response = list_volumes(
self.apiclient,
id=volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list"
)
self.assertNotEqual(
list_volume_response,
None,
@ -278,6 +318,10 @@ class TestAttachVolume(cloudstackTestCase):
)
# Attach volume to VM
with self.assertRaises(Exception):
self.debug("Trying to Attach volume: %s to VM: %s" % (
volume.id,
self.virtual_machine.id
))
self.virtual_machine.attach_volume(
self.apiclient,
volume
@ -384,6 +428,10 @@ class TestAttachDetachVolume(cloudstackTestCase):
account=self.account.account.name,
diskofferingid=self.disk_offering.id
)
self.debug("Created volume: %s for account: %s" % (
volume.id,
self.account.account.name
))
self.cleanup.append(volume)
volumes.append(volume)
@ -392,11 +440,21 @@ class TestAttachDetachVolume(cloudstackTestCase):
self.apiclient,
id=volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list"
)
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
self.debug("Attach volume: %s to VM: %s" % (
volume.id,
self.virtual_machine.id
))
# Attach volume to VM
self.virtual_machine.attach_volume(
self.apiclient,
@ -409,6 +467,12 @@ class TestAttachDetachVolume(cloudstackTestCase):
virtualmachineid=self.virtual_machine.id,
type='DATADISK'
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list"
)
self.assertNotEqual(
list_volume_response,
None,
@ -422,11 +486,16 @@ class TestAttachDetachVolume(cloudstackTestCase):
# Detach all volumes from VM
for volume in volumes:
self.debug("Detach volume: %s to VM: %s" % (
volume.id,
self.virtual_machine.id
))
self.virtual_machine.detach_volume(
self.apiclient,
volume
)
)
# Reboot VM
self.debug("Rebooting the VM: %s" % self.virtual_machine.id)
self.virtual_machine.reboot(self.apiclient)
# Sleep to ensure that VM is in ready state
time.sleep(self.services["sleep"])
@ -436,6 +505,12 @@ class TestAttachDetachVolume(cloudstackTestCase):
id=self.virtual_machine.id,
)
#Verify VM response to check whether VM deployment was successful
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM response for valid list"
)
self.assertNotEqual(
len(vm_response),
0,
@ -447,8 +522,9 @@ class TestAttachDetachVolume(cloudstackTestCase):
'Running',
"Check the state of VM"
)
# Stop VM
self.debug("Stopping the VM: %s" % self.virtual_machine.id)
self.virtual_machine.stop(self.apiclient)
# Sleep to ensure that VM is in ready state
time.sleep(self.services["sleep"])
@ -458,6 +534,11 @@ class TestAttachDetachVolume(cloudstackTestCase):
id=self.virtual_machine.id,
)
#Verify VM response to check whether VM deployment was successful
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM response for valid list"
)
self.assertNotEqual(
len(vm_response),
0,
@ -471,6 +552,7 @@ class TestAttachDetachVolume(cloudstackTestCase):
)
# Start VM
self.debug("Starting the VM: %s" % self.virtual_machine.id)
self.virtual_machine.start(self.apiclient)
# Sleep to ensure that VM is in ready state
time.sleep(self.services["sleep"])
@ -480,6 +562,11 @@ class TestAttachDetachVolume(cloudstackTestCase):
id=self.virtual_machine.id,
)
#Verify VM response to check whether VM deployment was successful
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM response for valid list"
)
self.assertNotEqual(
len(vm_response),
0,
@ -579,11 +666,20 @@ class TestAttachVolumeISO(cloudstackTestCase):
account=self.account.account.name,
diskofferingid=self.disk_offering.id
)
self.debug("Created volume: %s for account: %s" % (
volume.id,
self.account.account.name
))
# Check List Volume response for newly created volume
list_volume_response = list_volumes(
self.apiclient,
id=volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list"
)
self.assertNotEqual(
list_volume_response,
None,
@ -601,6 +697,11 @@ class TestAttachVolumeISO(cloudstackTestCase):
virtualmachineid=self.virtual_machine.id,
type='DATADISK'
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list"
)
self.assertNotEqual(
list_volume_response,
None,
@ -618,10 +719,23 @@ class TestAttachVolumeISO(cloudstackTestCase):
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.debug("Created ISO with ID: %s for account: %s" % (
iso.id,
self.account.account.name
))
self.cleanup.append(iso)
iso.download(self.apiclient)
try:
self.debug("Downloading ISO with ID: %s" % iso.id)
iso.download(self.apiclient)
except Exception as e:
self.fail("Exception while downloading ISO %s: %s"\
% (iso.id, e))
#Attach ISO to virtual machine
self.debug("Attach ISO ID: %s to VM: %s" % (
iso.id,
self.virtual_machine.id
))
cmd = attachIso.attachIsoCmd()
cmd.id = iso.id
cmd.virtualmachineid = self.virtual_machine.id
@ -633,6 +747,12 @@ class TestAttachVolumeISO(cloudstackTestCase):
id=self.virtual_machine.id,
)
#Verify VM response to check whether VM deployment was successful
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM response for valid list"
)
self.assertNotEqual(
len(vm_response),
0,
@ -733,6 +853,11 @@ class TestVolumes(cloudstackTestCase):
self.apiclient,
id=self.volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list"
)
self.assertNotEqual(
list_volume_response,
None,
@ -758,7 +883,11 @@ class TestVolumes(cloudstackTestCase):
"Check whether volume has virtualmachineid field"
)
# Attach ISO to VM
# Attach volume to VM
self.debug("Attach volume: %s to VM: %s" % (
self.volume.id,
self.virtual_machine.id
))
self.virtual_machine.attach_volume(self.apiclient, self.volume)
# Check all volumes attached to same VM
@ -767,6 +896,11 @@ class TestVolumes(cloudstackTestCase):
virtualmachineid=self.virtual_machine.id,
type='DATADISK'
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list"
)
self.assertNotEqual(
list_volume_response,
None,
@ -794,15 +928,25 @@ class TestVolumes(cloudstackTestCase):
# 2. Listvolumes should not have vmname and virtualmachineid fields for
# that volume.
self.debug("Detach volume: %s to VM: %s" % (
self.volume.id,
self.virtual_machine.id
))
self.virtual_machine.detach_volume(self.apiclient, self.volume)
#Sleep to ensure the current state will reflected in other calls
time.sleep(self.services["sleep"])
list_volume_response = list_volumes(
self.apiclient,
id=self.volume.id
)
self.apiclient,
id=self.volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list"
)
self.assertNotEqual(
list_volume_response,
None,
@ -829,6 +973,7 @@ class TestVolumes(cloudstackTestCase):
# 1. volume should be deleted successfully and listVolume should not
# contain the deleted volume details.
self.debug("Deleting volume: %s" % self.volume.id)
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
self.apiclient.deleteVolume(cmd)

View File

@ -185,20 +185,10 @@ class VirtualMachine:
elif "domainid" in services:
cmd.domainid = services["domainid"]
# List Networks for that user
command = listNetworks.listNetworksCmd()
command.zoneid = services["zoneid"]
command.account = accountid or services["account"]
command.domainid = domainid or services["domainid"]
network = apiclient.listNetworks(command)
if networkids:
cmd.networkids = networkids
elif "networkids" in services:
cmd.networkids = services["networkids"]
elif network: #If user already has source NAT created, then use that
if hasattr(network[0], "account"):
cmd.networkids = network[0].id
if templateid:
cmd.templateid = templateid
@ -209,7 +199,7 @@ class VirtualMachine:
cmd.diskofferingid = services["diskoffering"]
if securitygroupids:
cmd.securitygroupids = securitygroupids
cmd.securitygroupids = [str(sg_id) for sg_id in securitygroupids]
if "userdata" in services:
cmd.userdata = base64.b64encode(services["userdata"])
@ -231,7 +221,7 @@ class VirtualMachine:
"TimeOutException: Failed to start VM (ID: %s)" %
virtual_machine.id)
time.sleep(30)
time.sleep(10)
timeout = timeout -1
if mode.lower() == 'advanced':
@ -1068,7 +1058,7 @@ class Host:
host = apiclient.addHost(cmd)
if isinstance(host, list):
return Host(host[0])
return Host(host[0].__dict__)
def delete(self, apiclient):
"""Delete Host"""
@ -1412,15 +1402,18 @@ class SecurityGroup:
cmd.domainid = domainid
if account:
cmd.account = account
cmd.usersecuritygrouplist[0].account=account
else:
cmd.usersecuritygrouplist[0].account='admin'
cmd.securitygroupid=self.id
cmd.protocol=services["protocol"]
cmd.startport = services["startport"]
cmd.endport = services["endport"]
cmd.usersecuritygrouplist[0].group=services["type"]
if services["protocol"] == 'ICMP':
cmd.icmptype = -1
cmd.icmpcode = -1
else:
cmd.startport = services["startport"]
cmd.endport = services["endport"]
cmd.cidrlist = services["cidrlist"]
return (apiclient.authorizeSecurityGroupIngress(cmd).__dict__)
def revoke(self, apiclient, id):
@ -1428,7 +1421,7 @@ class SecurityGroup:
cmd=revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
cmd.id=id
apiclient.revokeSecurityGroupIngress()
return apiclient.revokeSecurityGroupIngress(cmd)
@classmethod
def list(cls, apiclient, **kwargs):