This commit contains the following items:

1. Reduce dependency on settings. (It should be able to run in any environment).
2. Simplify clean up by creating accounts in setup and deleting the account in teardown.
3. Check the list_XXX calls instead of querying the database.

Currently the following tests work per the above logic:
1. test_vm_life_cycle.py
2. test_disk_offerings.py
3. test_service_offerings.py

Rest are work in progress.
This commit is contained in:
Chirag Jog 2012-01-24 07:49:37 -08:00
parent 86256c04a1
commit b91d919e55
9 changed files with 663 additions and 604 deletions

View File

@ -19,7 +19,7 @@ class Account:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, admin = False):
def create(cls, apiclient, services, admin=False):
cmd = createAccount.createAccountCmd()
#0 - User, 1 - Root Admin
cmd.accounttype = int(admin)
@ -31,7 +31,7 @@ class Account:
mdf = hashlib.md5()
mdf.update(services["password"])
cmd.password = mdf.hexdigest()
cmd.username = services["username"]
cmd.username = "-".join([services["username"], random_gen()])
account = apiclient.createAccount(cmd)
return Account(account.__dict__)
@ -43,8 +43,8 @@ class Account:
class VirtualMachine:
"""Manage virtual machine lifecycle
"""
"""Manage virtual machine lifecycle"""
def __init__(self, items, services):
self.__dict__.update(items)
self.username = services["username"]
@ -55,12 +55,24 @@ class VirtualMachine:
self.ipaddress = self.nic[0].ipaddress
@classmethod
def create(cls, apiclient, services, templateid = None, accountid = None, networkids = None):
def create(cls, apiclient, services, templateid=None, accountid=None,
networkids=None, serviceofferingid=None):
cmd = deployVirtualMachine.deployVirtualMachineCmd()
cmd.serviceofferingid = services["serviceoffering"]
if serviceofferingid:
cmd.serviceofferingid = serviceofferingid
elif "serviceoffering" in services:
cmd.serviceofferingid = services["serviceoffering"]
cmd.zoneid = services["zoneid"]
cmd.hypervisor = services["hypervisor"]
cmd.account = accountid or services["account"]
if accountid:
cmd.account = accountid
elif "account" in services:
cmd.account = services["account"]
cmd.domainid = services["domainid"]
# List Networks for that user
@ -75,7 +87,8 @@ class VirtualMachine:
elif "networkids" in services:
cmd.networkids = services["networkids"]
elif network: #If user already has source NAT created, then use that
cmd.networkids = network[0].id
if hasattr(network[0], "account"):
cmd.networkids = network[0].id
if templateid:
cmd.templateid = templateid
@ -84,9 +97,12 @@ class VirtualMachine:
if "diskoffering" in services:
cmd.diskofferingid = services["diskoffering"]
return VirtualMachine(apiclient.deployVirtualMachine(cmd).__dict__, services)
return VirtualMachine(
apiclient.deployVirtualMachine(cmd).__dict__,
services
)
def get_ssh_client(self, ipaddress = None, reconnect = False):
def get_ssh_client(self, ipaddress=None, reconnect=False):
if ipaddress != None:
self.ipaddress = ipaddress
if reconnect:
@ -128,13 +144,28 @@ class Volume:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services):
def create(cls, apiclient, services, zoneid=None, account=None,
diskofferingid=None):
cmd = createVolume.createVolumeCmd()
cmd.name = services["diskname"]
cmd.diskofferingid = services["volumeoffering"]
cmd.zoneid = services["zoneid"]
cmd.account = services["account"]
cmd.domainid = services["domainid"]
if diskofferingid:
cmd.diskofferingid = diskofferingid
elif "diskofferingid" in services:
cmd.diskofferingid = services["diskofferingid"]
if zoneid:
cmd.zoneid = zoneid
elif "zoneid" in services:
cmd.zoneid = services["zoneid"]
if account:
cmd.account = account
elif "account" in services:
cmd.account = services["account"]
return Volume(apiclient.createVolume(cmd).__dict__)
@classmethod
@ -285,11 +316,16 @@ class PublicIPAddress:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, accountid, zoneid = None, domainid = None, services = None):
def create(cls, apiclient, accountid, zoneid=None, domainid=None,
services=None, networkid=None):
cmd = associateIpAddress.associateIpAddressCmd()
cmd.account = accountid
cmd.zoneid = zoneid or services["zoneid"]
cmd.domainid = domainid or services["domainid"]
if networkid:
cmd.networkid = networkid
return PublicIPAddress(apiclient.associateIpAddress(cmd).__dict__)
def delete(self, apiclient):
@ -306,7 +342,7 @@ class NATRule:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, virtual_machine, services, ipaddressid = None):
def create(cls, apiclient, virtual_machine, services, ipaddressid=None):
cmd = createPortForwardingRule.createPortForwardingRuleCmd()
@ -314,9 +350,6 @@ class NATRule:
cmd.ipaddressid = ipaddressid
elif "ipaddressid" in services:
cmd.ipaddressid = services["ipaddressid"]
else: # If IP Address ID is not specified then assign new IP
public_ip = PublicIPAddress.create(apiclient, virtual_machine.account, virtual_machine.zoneid, virtual_machine.domainid, services)
cmd.ipaddressid = public_ip.ipaddress.id
cmd.privateport = services["privateport"]
cmd.publicport = services["publicport"]
@ -363,12 +396,15 @@ class DiskOffering:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services):
def create(cls, apiclient, services, custom=False):
cmd = createDiskOffering.createDiskOfferingCmd()
cmd.displaytext = services["displaytext"]
cmd.name = services["name"]
cmd.disksize = services["disksize"]
if custom:
cmd.customized = True
else:
cmd.disksize = services["disksize"]
return DiskOffering(apiclient.createDiskOffering(cmd).__dict__)
def delete(self, apiclient):
@ -411,7 +447,7 @@ class LoadBalancerRule:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, ipaddressid, accountid = None):
def create(cls, apiclient, services, ipaddressid, accountid=None):
cmd = createLoadBalancerRule.createLoadBalancerRuleCmd()
cmd.publicipid = ipaddressid or services["ipaddressid"]
cmd.account = accountid or services["account"]
@ -540,3 +576,50 @@ class StoragePool:
cmd.id = self.id
apiclient.deleteStoragePool(cmd)
return
class Network:
"""Manage Network pools"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, accountid=None, domainid=None):
cmd = createNetwork.createNetworkCmd()
cmd.name = services["name"]
cmd.displaytext = services["displaytext"]
cmd.networkofferingid = services["networkoffering"]
cmd.zoneid = services["zoneid"]
if accountid:
cmd.account = accountid
if domainid:
cmd.domainid = domainid
return Network(apiclient.createNetwork(cmd).__dict__)
def delete(self, apiclient):
cmd = deleteNetwork.deleteNetworkCmd()
cmd.id = self.id
apiclient.deleteNetwork(cmd)
def get_zone(apiclient):
"Returns a default zone"
cmd = listZones.listZonesCmd()
return apiclient.listZones(cmd)[1]
def get_template(apiclient, zoneid, ostypeid=12):
"Returns a template"
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = 'featured'
cmd.zoneid = zoneid
list_templates = apiclient.listTemplates(cmd)
for template in list_templates:
if template.ostypeid == ostypeid:
return template

View File

@ -17,16 +17,11 @@ class Services:
def __init__(self):
self.services = {
"off_1": {
"name": "Disk offering 1",
"displaytext": "Disk offering 1",
"off": {
"name": "Disk offering",
"displaytext": "Disk offering",
"disksize": 1 # in GB
},
"off_2": {
"name": "Disk offering 2",
"displaytext": "Disk offering 2",
"disksize": 1 # in GB
}
}
class TestCreateDiskOffering(cloudstackTestCase):
@ -55,7 +50,10 @@ class TestCreateDiskOffering(cloudstackTestCase):
# 1. createDiskOfferings should return valid info for new offering
# 2. The Cloud Database contains the valid information
disk_offering = DiskOffering.create(self.apiclient, self.services["off_1"])
disk_offering = DiskOffering.create(
self.apiclient,
self.services["off"]
)
self.cleanup.append(disk_offering)
cmd = listDiskOfferings.listDiskOfferingsCmd()
@ -71,39 +69,14 @@ class TestCreateDiskOffering(cloudstackTestCase):
self.assertEqual(
disk_response.displaytext,
self.services["off_1"]["displaytext"],
self.services["off"]["displaytext"],
"Check server id in createServiceOffering"
)
self.assertEqual(
disk_response.name,
self.services["off_1"]["name"],
self.services["off"]["name"],
"Check name in createServiceOffering"
)
#Verify the database entries for new disk offering
qresultset = self.dbclient.execute(
"select display_text, id from disk_offering where id = %s;"
% disk_offering.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
self.services["off_1"]["displaytext"],
"Compare display text with database record"
)
self.assertEqual(
qresult[1],
disk_offering.id,
"Check ID in the database"
)
return
@ -129,8 +102,14 @@ class TestDiskOfferings(cloudstackTestCase):
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.disk_offering_1 = DiskOffering.create(cls.api_client, cls.services["off_1"])
cls.disk_offering_2 = DiskOffering.create(cls.api_client, cls.services["off_2"])
cls.disk_offering_1 = DiskOffering.create(
cls.api_client,
cls.services["off"]
)
cls.disk_offering_2 = DiskOffering.create(
cls.api_client,
cls.services["off"]
)
cls._cleanup = [cls.disk_offering_1]
return
@ -147,7 +126,8 @@ class TestDiskOfferings(cloudstackTestCase):
"""Test to update existing disk offering"""
# Validate the following:
# 1. updateDiskOffering should return a valid information for newly created offering
# 1. updateDiskOffering should return
# a valid information for newly created offering
#Generate new name & displaytext from random data
random_displaytext = random_gen()
@ -173,48 +153,23 @@ class TestDiskOfferings(cloudstackTestCase):
disk_response = list_disk_response[0]
self.assertEqual(
disk_response.displaytext,
random_displaytext,
"Check service displaytext in updateServiceOffering"
disk_response.displaytext,
random_displaytext,
"Check service displaytext in updateServiceOffering"
)
self.assertEqual(
disk_response.name,
random_name,
"Check service name in updateServiceOffering"
disk_response.name,
random_name,
"Check service name in updateServiceOffering"
)
#Verify database entries for updated disk offerings
qresultset = self.dbclient.execute(
"select display_text, id from disk_offering where id = %s;"
% self.disk_offering_1.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
random_displaytext,
"Compare displaytext with database record"
)
self.assertEqual(
qresult[1],
self.disk_offering_1.id,
"Check name in the database"
)
return
def test_03_delete_disk_offering(self):
"""Test to delete disk offering"""
# Validate the following:
# 1. deleteDiskOffering should return a valid information for newly created offering
# 1. deleteDiskOffering should return
# a valid information for newly created offering
cmd = deleteDiskOffering.deleteDiskOfferingCmd()
cmd.id = self.disk_offering_2.id
@ -225,21 +180,8 @@ class TestDiskOfferings(cloudstackTestCase):
list_disk_response = self.apiclient.listDiskOfferings(cmd)
self.assertEqual(
list_disk_response,
None,
"Check if disk offering exists in listDiskOfferings"
list_disk_response,
None,
"Check if disk offering exists in listDiskOfferings"
)
#Verify database entry for deleted disk offering
qresultset = self.dbclient.execute(
"select display_text, name from disk_offering where id = %s;"
% str(self.disk_offering_2.id)
)
self.assertEqual(
len(qresultset),
1,
"Check DB Query result set"
)
return

View File

@ -24,29 +24,34 @@ class Services:
"user_account": "testuser",
"zoneid": 1,
"domainid": 1,
"service_offering": {
"name": "Tiny Service Offering",
"displaytext": "Tiny service offering",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "testuser",
"password": "fr3sca",
"username": "test",
"password": "password",
"zoneid": 1,
},
"server":
{
"template": 206, # Template used for VM creation
"zoneid": 1,
"serviceoffering": 1,
"diskoffering": 3,
"template": 7, # Template used for VM creation
"zoneid": 3,
"displayname": "testserver",
"username": "root",
"password": "fr3sca",
"hypervisor": 'XenServer',
"hypervisor": 'VMWare',
"account": 'testuser',
"domainid": 1,
"ipaddressid": 4, # IP Address ID of Public IP, If not specified new public IP
"privateport": 22,
"publicport": 22,
"ssh_port": 22,
"protocol": 'TCP',
},
"natrule":
@ -170,13 +175,15 @@ class TestPortForwarding(cloudstackTestCase):
cls.services = Services().services
#Create an account, network, VM and IP addresses
cls.account = Account.create(cls.api_client, cls.services["account"], admin = True)
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["server"],
accountid = cls.account.account.name,
serviceofferingid = cls.service_offering.id
)
cls._cleanup = [cls.virtual_machine, cls.account]
cls._cleanup = [cls.virtual_machine, cls.account, cls.service_offering]
def setUp(self):
self.apiclient = self.testClient.getApiClient()
@ -185,11 +192,11 @@ class TestPortForwarding(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
cleanup_resources(cls.api_client, cls._cleanup)
#cleanup_resources(cls.api_client, cls._cleanup)
return
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
#cleanup_resources(self.apiclient, self.cleanup)
return
def test_01_port_fwd_on_src_nat(self):
@ -243,7 +250,7 @@ class TestPortForwarding(cloudstackTestCase):
# Check if the Public SSH port is inaccessible
with self.assertRaises(Exception):
remoteSSHClient.remoteSSHClient(
ip.ipaddress,
src_nat_ip_addr.ipaddress,
self.virtual_machine.ssh_port,
self.virtual_machine.username,
self.virtual_machine.password
@ -261,7 +268,8 @@ class TestPortForwarding(cloudstackTestCase):
self.apiclient,
self.account.account.name,
self.services["zoneid"],
self.services["domainid"]
self.services["domainid"],
self.services["server"]
)
self.clean_up.append(ip_address)
#Create NAT rule
@ -293,7 +301,7 @@ class TestPortForwarding(cloudstackTestCase):
)
try:
self.virtual_machine.get_ssh_client(public_ip = ip_address.ipaddress.ipaddress)
self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress)
except Exception as e:
self.fail("SSH Access failed for %s: %s" % (self.virtual_machine.ipaddress.ipaddress, e))
@ -330,23 +338,27 @@ class TestLoadBalancingRule(cloudstackTestCase):
cls.services = Services().services
#Create an account, network, VM and IP addresses
cls.account = Account.create(cls.api_client, cls.services["account"], admin = True)
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
cls.vm_1 = VirtualMachine.create(
cls.api_client,
cls.services["server"],
accountid = cls.account.account.name,
serviceofferingid = cls.service_offering.id
)
cls.vm_2 = VirtualMachine.create(
cls.api_client,
cls.services["server"],
accountid = cls.account.account.name,
serviceofferingid = cls.service_offering.id
)
cls.non_src_nat_ip = PublicIPAddress.create(
cls.api_client,
cls.account.account.name,
cls.services["zoneid"],
cls.services["domainid"]
cls.services["domainid"],
cls.services["server"]
)
cls._cleanup = [cls.vm_1, cls.vm_2, cls.non_src_nat_ip, cls.account]
cls._cleanup = [cls.vm_1, cls.vm_2, cls.non_src_nat_ip, cls.account, cls.service_offering]
def setUp(self):
self.apiclient = self.testClient.getApiClient()
@ -553,12 +565,19 @@ class TestRebootRouter(cloudstackTestCase):
self.services = Services().services
#Create an account, network, VM and IP addresses
self.account = Account.create(self.apiclient, self.services["account"], admin = True)
self.service_offering = ServiceOffering.create(self.apiclient, self.services["service_offering"])
self.vm_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
accountid = self.account.account.name,
serviceofferingid = self.service_offering.id
)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
lb_rule = LoadBalancerRule.create(
self.apiclient,
self.services["lbrule"],
@ -566,8 +585,8 @@ class TestRebootRouter(cloudstackTestCase):
self.account.account.name
)
lb_rule.assign(self.apiclient, [self.vm_1])
#nat_rule = NATRule.create(self.apiclient, self.vm_1, self.services["natrule"], src_nat_ip_addr.id)
self.cleanup = [self.vm_1, lb_rule, self.account]
self.nat_rule = NATRule.create(self.apiclient, self.vm_1, self.services["natrule"], ipaddressid = src_nat_ip_addr.id)
self.cleanup = [self.vm_1, lb_rule, self.service_offering, self.account, self.nat_rule]
return
def test_reboot_router(self):
@ -619,26 +638,30 @@ class TestAssignRemoveLB(cloudstackTestCase):
self.services = Services().services
#Create VMs, accounts
self.account = Account.create(self.apiclient, self.services["account"], admin = True)
self.service_offering = ServiceOffering.create(self.apiclient, self.services["service_offering"])
self.vm_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
accountid = self.account.account.name,
serviceofferingid = self.service_offering.id
)
self.vm_2 = VirtualMachine.create(
self.apiclient,
self.services["server"],
accountid = self.account.account.name,
serviceofferingid = self.service_offering.id
)
self.vm_3 = VirtualMachine.create(
self.apiclient,
self.services["server"],
accountid = self.account.account.name,
serviceofferingid = self.service_offering.id
)
self.cleanup = [self.vm_1, self.vm_2, self.vm_3, self.account]
self.cleanup = [self.vm_1, self.vm_2, self.vm_3, self.account, self.service_offering]
return
@ -720,10 +743,13 @@ class TestReleaseIP(cloudstackTestCase):
#Create an account, network, VM, Port forwarding rule, LB rules and IP addresses
self.account = Account.create(self.apiclient, self.services["account"], admin = True)
self.service_offering = ServiceOffering.create(self.apiclient, self.services["service_offering"])
self.virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["server"],
accountid = self.account.account.name,
serviceofferingid = self.service_offering.id
)
self.ip_address = PublicIPAddress.create(
@ -803,10 +829,12 @@ class TestDeleteAccount(cloudstackTestCase):
self.services = Services().services
#Create an account, network, VM and IP addresses
self.account = Account.create(self.apiclient, self.services["account"], admin = True)
self.service_offering = ServiceOffering.create(self.apiclient, self.services["service_offering"])
self.vm_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
accountid = self.account.account.name,
serviceofferingid = self.service_offering.id
)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
@ -865,4 +893,3 @@ class TestDeleteAccount(cloudstackTestCase):
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return

View File

@ -20,18 +20,23 @@ class Services:
def __init__(self):
self.services = {
"service_offering": {
"name": "Tiny Service Offering",
"displaytext": "Tiny service offering",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"virtual_machine":
{
"template": 206, # Template used for VM creation
"zoneid": 1,
"serviceoffering": 1,
"template": 7, # Template used for VM creation
"zoneid": 3,
"displayname": "testserver",
"username": "root",
"password": "fr3sca",
"ssh_port": 22,
"hypervisor": 'XenServer',
"hypervisor": 'VMWare',
"domainid": 1,
"ipaddressid": 4, # IP Address ID of Public IP, If not specified new public IP
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
@ -56,14 +61,16 @@ class TestRouterServices(cloudstackTestCase):
cls.services = Services().services
#Create an account, network, VM and IP addresses
cls.account = Account.create(cls.api_client, cls.services["account"])
cls.account = Account.create(cls.api_client, cls.services["account"], admin = True)
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
cls.vm_1 = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
accountid = cls.account.account.name,
serviceofferingid = cls.service_offering.id
)
cls.cleanup = [cls.vm_1, cls.account]
cls.cleanup = [cls.vm_1, cls.account, cls.service_offering]
return
@classmethod

View File

@ -18,23 +18,14 @@ class Services:
def __init__(self):
self.services = {
"off_1":
"off":
{
"name": "Service Offering 1",
"displaytext": "Service Offering 1",
"name": "Service Offering",
"displaytext": "Service Offering",
"cpunumber": 1,
"cpuspeed": 200, # MHz
"memory": 200, # in MBs
"cpuspeed": 100, # MHz
"memory": 64, # in MBs
},
"off_2":
{
"name": "Service Offering 2",
"displaytext": "Service Offering 2",
"cpunumber": 1,
"cpuspeed": 200, # MHz
"memory": 200, # in MBs
}
}
class TestCreateServiceOffering(cloudstackTestCase):
@ -63,7 +54,10 @@ class TestCreateServiceOffering(cloudstackTestCase):
# 1. createServiceOfferings should return a valid information for newly created offering
# 2. The Cloud Database contains the valid information
service_offering = ServiceOffering.create(self.apiclient, self.services["off_1"])
service_offering = ServiceOffering.create(
self.apiclient,
self.services["off"]
)
self.cleanup.append(service_offering)
cmd = listServiceOfferings.listServiceOfferingsCmd()
@ -71,66 +65,37 @@ class TestCreateServiceOffering(cloudstackTestCase):
list_service_response = self.apiclient.listServiceOfferings(cmd)
self.assertNotEqual(
len(list_service_response),
0,
"Check Service offering is created"
len(list_service_response),
0,
"Check Service offering is created"
)
service_response = list_service_response[0]
self.assertEqual(
list_service_response[0].cpunumber,
self.services["off_1"]["cpunumber"],
"Check server id in createServiceOffering"
list_service_response[0].cpunumber,
self.services["off"]["cpunumber"],
"Check server id in createServiceOffering"
)
self.assertEqual(
list_service_response[0].cpuspeed,
self.services["off_1"]["cpuspeed"],
"Check cpuspeed in createServiceOffering"
list_service_response[0].cpuspeed,
self.services["off"]["cpuspeed"],
"Check cpuspeed in createServiceOffering"
)
self.assertEqual(
list_service_response[0].displaytext,
self.services["off_1"]["displaytext"],
"Check server displaytext in createServiceOfferings"
list_service_response[0].displaytext,
self.services["off"]["displaytext"],
"Check server displaytext in createServiceOfferings"
)
self.assertEqual(
list_service_response[0].memory,
self.services["off_1"]["memory"],
"Check memory in createServiceOffering"
list_service_response[0].memory,
self.services["off"]["memory"],
"Check memory in createServiceOffering"
)
self.assertEqual(
list_service_response[0].name,
self.services["off_1"]["name"],
self.services["off"]["name"],
"Check name in createServiceOffering"
)
#Verify the service offerings with database records
qresultset = self.dbclient.execute(
"select cpu, speed, ram_size from service_offering where id = %s;"
% service_offering.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
self.services["off_1"]["cpunumber"],
"Check number of CPUs allocated to service offering in the database"
)
self.assertEqual(
qresult[1],
self.services["off_1"]["cpuspeed"],
"Check number of CPUs allocated to service offering in the database"
)
self.assertEqual(
qresult[2],
self.services["off_1"]["memory"],
"Check number of CPUs allocated to service offering in the database"
)
return
@ -157,8 +122,14 @@ class TestServiceOfferings(cloudstackTestCase):
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.service_offering_1 = ServiceOffering.create(cls.api_client, cls.services["off_1"])
cls.service_offering_2 = ServiceOffering.create(cls.api_client, cls.services["off_2"])
cls.service_offering_1 = ServiceOffering.create(
cls.api_client,
cls.services["off"]
)
cls.service_offering_2 = ServiceOffering.create(
cls.api_client,
cls.services["off"]
)
cls._cleanup = [cls.service_offering_1]
return
@ -177,7 +148,8 @@ class TestServiceOfferings(cloudstackTestCase):
"""Test to update existing service offering"""
# Validate the following:
# 1. updateServiceOffering should return a valid information for newly created offering
# 1. updateServiceOffering should return
# a valid information for newly created offering
#Generate new name & displaytext from random data
random_displaytext = random_gen()
@ -212,43 +184,14 @@ class TestServiceOfferings(cloudstackTestCase):
"Check server name in updateServiceOffering"
)
#Verify updated values in database
qresultset = self.dbclient.execute(
"select id, display_text, name from disk_offering where type='Service' and id = %s;"
% self.service_offering_1.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
self.service_offering_1.id,
"Check service offering ID in the database"
)
self.assertEqual(
qresult[1],
random_displaytext,
"Check service offering ID in the database"
)
self.assertEqual(
qresult[2],
random_name,
"Check service offering ID in the database"
)
return
def test_03_delete_service_offering(self):
"""Test to delete service offering"""
# Validate the following:
# 1. deleteServiceOffering should return a valid information for newly created offering
# 1. deleteServiceOffering should return
# a valid information for newly created offering
cmd = deleteServiceOffering.deleteServiceOfferingCmd()
#Add the required parameters required to call for API
@ -260,20 +203,10 @@ class TestServiceOfferings(cloudstackTestCase):
list_service_response = self.apiclient.listServiceOfferings(cmd)
self.assertEqual(
list_service_response,
None,
"Check if service offering exists in listDiskOfferings"
)
list_service_response,
None,
"Check if service offering exists in listDiskOfferings"
)
#Verify database records for deleted service offerings
qresultset = self.dbclient.execute(
"select id from service_offering where id = %s;"
% self.service_offering_2.id
)
self.assertEqual(
len(qresultset),
1,
"Check DB Query result set"
)
return

View File

@ -17,20 +17,25 @@ class Services:
def __init__(self):
self.services = {
"service_offering": {
"name": "Tiny Service Offering",
"displaytext": "Tiny service offering",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"server_with_disk":
{
"template": 206, # Template used for VM creation
"template": 256, # Template used for VM creation
"zoneid": 1,
"serviceoffering": 1,
"diskoffering": 3, # Optional, if specified data disk will be allocated to VM
"displayname": "testserver",
"username": "root",
"password": "fr3sca",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"account": 'testuser',
"domainid": 1,
"ipaddressid": 4, # IP Address ID of Public IP, If not specified new public IP
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
@ -38,17 +43,15 @@ class Services:
"server_without_disk":
{
"template": 206, # Template used for VM creation
"template": 256, # Template used for VM creation
"zoneid": 1,
"serviceoffering": 1,
"displayname": "testserver",
"username": "root",
"password": "fr3sca",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"account": 'testuser',
"domainid": 1,
"ipaddressid": 4, # IP Address ID of Public IP, If not specified new public IP
"privateport": 22, # For NAT rule creation
"publicport": 22,
"protocol": 'TCP',
@ -66,7 +69,7 @@ class Services:
"templates":
{
"displaytext": 'Test template snapshot',
"name": 'template_from_snapshot_3',
"name": 'template_from_snapshot',
"ostypeid": 12,
"templatefilter": 'self',
},
@ -76,8 +79,7 @@ class Services:
"serviceofferingid": 1,
},
"diskdevice": "/dev/xvda",
"offerings": 1,
"template": 206,
"template": 256,
"zoneid": 1,
"diskoffering": 3,
"diskname": "TestDiskServ",
@ -91,9 +93,11 @@ class Services:
"random_data": "random.data",
"exportpath": 'SecondaryStorage',
"sec_storage": '192.168.100.131',
# IP address of Sec storage where snapshots are stored
"mgmt_server_ip": '192.168.100.154',
# Required for verifying snapshots on secondary storage
"username": "root",
"password": "fr3sca",
"password": "password",
"ssh_port": 22,
}
@ -103,12 +107,21 @@ class TestSnapshots(cloudstackTestCase):
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
cls.virtual_machine = cls.virtual_machine_with_disk = \
VirtualMachine.create(cls.api_client, cls.services["server_with_disk"])
VirtualMachine.create(cls.api_client, cls.services["server_with_disk"], serviceofferingid = cls.service_offering.id)
cls.virtual_machine_without_disk = \
VirtualMachine.create(cls.api_client, cls.services["server_without_disk"])
cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services["server_with_disk"])
cls._cleanup = [cls.virtual_machine, cls.virtual_machine_without_disk, cls.nat_rule]
VirtualMachine.create(cls.api_client, cls.services["server_without_disk"], serviceofferingid = cls.service_offering.id)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
cls.virtual_machine.account,
cls.virtual_machine.zoneid,
cls.virtual_machine.domainid,
cls.services["server_with_disk"]
)
cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services["server_with_disk"], ipaddressid = cls.public_ip.ipaddress.id)
cls._cleanup = [cls.virtual_machine, cls.nat_rule, cls.virtual_machine_without_disk, cls.public_ip, cls.service_offering]
return
@classmethod
@ -268,10 +281,6 @@ class TestSnapshots(cloudstackTestCase):
random_data_0 = random_gen(100)
random_data_1 = random_gen(100)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = self.services["server_with_disk"]["ipaddressid"]
public_ip = self.apiclient.listPublicIpAddresses(cmd)[0]
ssh_client = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
#Format partition using ext3
format_volume_to_ext3(ssh_client, self.services["diskdevice"])
@ -331,10 +340,6 @@ class TestSnapshots(cloudstackTestCase):
cmd.virtualmachineid = new_virtual_machine.id
volume = self.apiclient.attachVolume(cmd)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = self.services["server_without_disk"]["ipaddressid"]
public_ip = self.apiclient.listPublicIpAddresses(cmd)[0]
#Login to VM to verify test directories and files
ssh = new_virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
cmds = [
@ -504,10 +509,6 @@ class TestSnapshots(cloudstackTestCase):
random_data_0 = random_gen(100)
random_data_1 = random_gen(100)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = self.services["server_with_disk"]["ipaddressid"]
public_ip = self.apiclient.listPublicIpAddresses(cmd)[0]
#Login to virtual machine
ssh_client = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
@ -564,14 +565,11 @@ class TestSnapshots(cloudstackTestCase):
new_virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["server_without_disk"],
template.id
templateid = template.id,
serviceofferingid = self.service_offering.id
)
self.cleanup.append(new_virtual_machine)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = self.services["server_without_disk"]["ipaddressid"]
public_ip = self.apiclient.listPublicIpAddresses(cmd)[0]
#Login to VM & mount directory
ssh = new_virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
cmds = [

View File

@ -20,16 +20,27 @@ class Services:
def __init__(self):
self.services = {
"service_offering": {
"name": "Tiny Service Offering",
"displaytext": "Tiny service offering",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"virtual_machine":
{
"template": 206, # Template used for VM creation
"template": 256, # Template used for VM creation
"zoneid": 1,
"serviceoffering": 1,
"displayname": "testVM",
"hypervisor": 'XenServer',
"account": 'admin', # Account for which VM should be created
"account": 'testuser', # Account for which VM should be created
"domainid": 1,
"protocol": 'TCP',
"ssh_port": 22,
"username" : "root",
"password": "password"
},
"volume":
{
@ -92,10 +103,12 @@ class TestCreateTemplate(cloudstackTestCase):
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
#create virtual machine
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"]
cls.services["virtual_machine"],
serviceofferingid = cls.service_offering.id
)
#Stop virtual machine
@ -110,7 +123,7 @@ class TestCreateTemplate(cloudstackTestCase):
cmd.type = 'ROOT'
list_volume = cls.api_client.listVolumes(cmd)
cls.volume = list_volume[0]
cls._cleanup = [cls.virtual_machine]
cls._cleanup = [cls.virtual_machine, cls.service_offering]
return
@classmethod
@ -214,10 +227,12 @@ class TestTemplates(cloudstackTestCase):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
#create virtual machines
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"]
cls.services["virtual_machine"],
serviceofferingid = cls.service_offering.id
)
#Stop virtual machine
@ -236,7 +251,7 @@ class TestTemplates(cloudstackTestCase):
#Create templates for Edit, Delete & update permissions testcases
cls.template_1 = Template.create(cls.api_client, cls.volume, cls.services["template_1"])
cls.template_2 = Template.create(cls.api_client, cls.volume, cls.services["template_2"])
cls._cleanup = [cls.template_2, cls.virtual_machine]
cls._cleanup = [cls.template_2, cls.virtual_machine, cls.service_offering]
@classmethod
def tearDownClass(cls):

View File

@ -20,73 +20,93 @@ class Services:
def __init__(self):
self.services = {
"small": # Create a small virtual machine instance with disk offering
{
"template": 256, # Template used for VM creation
"zoneid": 1,
"serviceoffering": 43,
"diskoffering": 3, # Optional, if specified data disk will be allocated to VM
"displayname": "testserver",
"username": "root", # VM creds for SSH
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"account": 'testuser', # Account for which VM should be created
"domainid": 1,
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
"disk_offering":{
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended in create account to
# ensure unique username generated each time
"password": "fr3sca",
},
"small":
# Create a small virtual machine instance with disk offering
{
"displayname": "testserver",
"username": "root", # VM creds for SSH
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"domainid": 1,
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"medium": # Create a medium virtual machine instance
{
"displayname": "testserver",
"account":"admin",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"domainid": 1,
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"service_offerings":
{
"tiny":
{
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"medium": # Create a medium virtual machine instance
{
"template": 206, # Template used for VM creation
"zoneid": 1,
"serviceoffering": 1,
"displayname": "testserver",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"account": 'testuser',
"domainid": 1,
"ipaddressid": 4,
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"service_offerings":
{
"small":
{
"id": 40,
# Small service offering ID to for change VM service offering from medium to small
"cpunumber": 1,
"cpuspeed": 500,
"memory": 524288
},
"medium":
{
"id": 39,
# Medium service offering ID to for change VM service offering from small to medium
"cpunumber": 1,
"cpuspeed": 1000,
"memory": 1048576
}
},
"iso": # ISO settings for Attach/Detach ISO tests
{
"displaytext": "Test ISO type",
"name": "testISO",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"zoneid": 1,
"ostypeid": 12,
"mode": 'HTTP_DOWNLOAD', # Downloading existing ISO
},
"diskdevice": '/dev/xvdd',
"mount_dir": "/mnt/tmp",
"hostid": 1,
}
"small":
{
# Small service offering ID to for change VM
# service offering from medium to small
"name": "Small Instance",
"displaytext": "Small Instance",
"cpunumber": 1,
"cpuspeed": 500,
"memory": 512
},
"medium":
{
# Medium service offering ID to for
# change VM service offering from small to medium
"name": "Medium Instance",
"displaytext": "Medium Instance",
"cpunumber": 1,
"cpuspeed": 1000,
"memory": 1024
}
},
"iso": # ISO settings for Attach/Detach ISO tests
{
"displaytext": "Test ISO",
"name": "testISO",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"ostypeid": 12,
"mode": 'HTTP_DOWNLOAD', # Downloading existing ISO
},
"diskdevice": '/dev/xvdd',
"mount_dir": "/mnt/tmp",
"hostid": 1,
#Migrate VM to hostid
"ostypeid": 12
# CentOS 5.3 (64-bit)
}
class TestDeployVM(cloudstackTestCase):
@ -94,14 +114,45 @@ class TestDeployVM(cloudstackTestCase):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
# Create VMs, NAT Rules etc
self.services = Services().services
self.virtual_machine = VirtualMachine.create(self.apiclient, self.services["small"])
self.cleanup.append(self.virtual_machine)
self.nat_rule = NATRule.create(self.apiclient, self.virtual_machine, self.services["small"])
self.cleanup.append(self.nat_rule)
# Get Zone, Domain and templates
zone = get_zone(self.apiclient)
disk_offering = DiskOffering.create(
self.apiclient,
self.services["disk_offering"]
)
template = get_template(
self.apiclient,
zone.id,
self.services["ostypeid"]
)
# Set Zones and disk offerings
self.services["small"]["zoneid"] = zone.id
self.services["small"]["diskoffering"] = disk_offering.id
self.services["small"]["template"] = template.id
self.services["medium"]["zoneid"] = zone.id
self.services["medium"]["template"] = template.id
self.services["iso"]["zoneid"] = zone.id
# Create Account, VMs, NAT Rules etc
self.account = Account.create(
self.apiclient,
self.services["account"],
admin=True
)
self.service_offering = ServiceOffering.create(
self.apiclient,
self.services["service_offerings"]["tiny"]
)
# Cleanup
self.cleanup = [
self.service_offering,
disk_offering,
self.account
]
def test_deploy_vm(self):
"""Test Deploy Virtual Machine
@ -112,18 +163,21 @@ class TestDeployVM(cloudstackTestCase):
# 2. listVirtualMachines returns accurate information
# 3. The Cloud Database contains the valid information
ipaddress = self.virtual_machine.ipaddress
self.debug("Verify SSH Access for virtual machine: %s" % self.virtual_machine.id)
try:
self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
except Exception as e:
self.fail("SSH Access failed for %s: %s" % (self.virtual_machine.ipaddress, e))
self.virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["small"],
self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.cleanup.append(self.virtual_machine)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.debug("Verify listVirtualMachines response for virtual machine: %s" % self.virtual_machine.id)
self.debug(
"Verify listVirtualMachines response for virtual machine: %s" \
% self.virtual_machine.id
)
vm_response = list_vm_response[0]
self.assertNotEqual(
@ -140,39 +194,10 @@ class TestDeployVM(cloudstackTestCase):
)
self.assertEqual(
vm_response.displayname,
self.virtual_machine.displayname,
"Check virtual machine displayname in listVirtualMachines"
)
self.debug("Verify the database entry for virtual machine: %s" % self.virtual_machine.id)
self.debug("select id, state, private_ip_address from vm_instance where id = %s;" % self.virtual_machine.id)
qresultset = self.dbclient.execute("select id, state, private_ip_address from vm_instance where id = %s;" % self.virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
self.virtual_machine.id,
"Compare virtual machine id with database record"
)
self.assertEqual(
qresult[1],
'Running',
"Check virtual machine state in the database"
)
self.assertEqual(
qresult[2],
ipaddress,
"Check IP Address in the database"
)
vm_response.displayname,
self.virtual_machine.displayname,
"Check virtual machine displayname in listVirtualMachines"
)
return
def tearDown(self):
@ -187,22 +212,85 @@ class TestVMLifeCycle(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = self.services().services
cls.services = Services().services
# Get Zone, Domain and templates
zone = get_zone(cls.api_client)
disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
zone.id,
cls.services["ostypeid"]
)
# Set Zones and disk offerings
cls.services["small"]["zoneid"] = zone.id
cls.services["small"]["diskoffering"] = disk_offering.id
cls.services["small"]["template"] = template.id
cls.services["medium"]["zoneid"] = zone.id
cls.services["medium"]["template"] = template.id
cls.services["iso"]["zoneid"] = zone.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
)
cls.small_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offerings"]["small"]
)
cls.medium_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offerings"]["medium"]
)
#create small and large virtual machines
cls.small_virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["small"]
cls.services["small"],
accountid=cls.account.account.name,
serviceofferingid=cls.small_offering.id
)
cls.medium_virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["medium"]
)
cls.api_client,
cls.services["medium"],
accountid=cls.account.account.name,
serviceofferingid=cls.medium_offering.id
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["small"]
cls.services["small"],
accountid=cls.account.account.name,
serviceofferingid=cls.small_offering.id
)
cls.nat_rule = NATRule.create(cls.api_client, cls.small_virtual_machine, cls.services["small"])
cls._cleanup = [cls.nat_rule, cls.medium_virtual_machine, cls.virtual_machine, ]
cls.public_ip = PublicIPAddress.create(
cls.api_client,
cls.small_virtual_machine.account,
cls.small_virtual_machine.zoneid,
cls.small_virtual_machine.domainid,
cls.services["small"]
)
cls.nat_rule = NATRule.create(
cls.api_client,
cls.small_virtual_machine,
cls.services["small"],
ipaddressid=cls.public_ip.ipaddress.id
)
cls._cleanup = [
cls.nat_rule,
cls.public_ip,
cls.medium_virtual_machine,
cls.virtual_machine,
cls.small_offering,
cls.medium_offering,
cls.account
]
@classmethod
def tearDownClass(cls):
@ -223,21 +311,16 @@ class TestVMLifeCycle(cloudstackTestCase):
def test_01_stop_vm(self):
"""Test Stop Virtual Machine
"""
# Validate the following
# 1. Should Not be able to login to the VM.
# 2. listVM command should return
# this VM.State of this VM should be ""Stopped"".
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.api_client.stopVirtualMachine(cmd)
#Wait before server has be successfully stopped
self.debug("Verify SSH Access for virtual machine: %s,%s" % (self.small_virtual_machine.id, self.small_virtual_machine.ipaddress))
time.sleep(30)
with self.assertRaises(Exception):
remoteSSHClient.remoteSSHClient(
self.small_virtual_machine.ipaddress,
self.small_virtual_machine.ssh_port,
self.small_virtual_machine.username,
self.small_virtual_machine.password
)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.api_client.listVirtualMachines(cmd)
@ -253,36 +336,19 @@ class TestVMLifeCycle(cloudstackTestCase):
"Check virtual machine is in stopped state"
)
self.debug("Verify the database entry for virtual machine: %s" % self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
"Stopped",
"Compare virtual machine state with database record"
)
return
def test_02_start_vm(self):
"""Test Start Virtual Machine
"""
# Validate the following
# 1. listVM command should return this VM.State
# of this VM should be Running".
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
time.sleep(30)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
@ -293,65 +359,53 @@ class TestVMLifeCycle(cloudstackTestCase):
"Check VM avaliable in List Virtual Machines"
)
self.debug("Verify listVirtualMachines response for virtual machine: %s" % self.small_virtual_machine.id)
self.debug(
"Verify listVirtualMachines response for virtual machine: %s" \
% self.small_virtual_machine.id
)
self.assertEqual(
list_vm_response[0].state,
"Running",
"Check virtual machine is in running state"
)
self.debug("Verify SSH Access for virtual machine: %s" % self.small_virtual_machine.id)
self.debug(
"Verify SSH Access for virtual machine: %s" \
% self.small_virtual_machine.id
)
# SSH to check whether VM is Up and Running
try:
self.small_virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
except Exception as e:
self.fail("SSH Access failed for %s: %s" % (self.small_virtual_machine.ipaddress, e))
self.debug("Verify the database entry for virtual machine: %s" % self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
'Running',
"Compare virtual machine state with database record"
)
self.fail(
"SSH Access failed for %s: %s" \
% (self.small_virtual_machine.ipaddress, e)
)
return
def test_03_reboot_vm(self):
"""Test Reboot Virtual Machine
"""
# Validate the following
# 1. Should be able to login to the VM.
# 2. listVM command should return the deployed VM.
# State of this VM should be "Running"
cmd = rebootVirtualMachine.rebootVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.rebootVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.debug("Verify SSH Access for virtual machine: %s" % self.small_virtual_machine.id)
try:
self.small_virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
except Exception as e:
self.fail("SSH Access failed for %s: %s" % (self.small_virtual_machine.ipaddress, e))
self.assertEqual(
list_vm_response[0].state,
"Running",
@ -362,16 +416,19 @@ class TestVMLifeCycle(cloudstackTestCase):
def test_04_change_offering_medium(self):
"""Change Offering to a medium capacity
"""
# Validate the following
# 1. Log in to the Vm .We should see that the CPU and memory Info of
# this Vm matches the one specified for "Medium" service offering.
# 2. Using listVM command verify that this Vm
# has Medium service offering Id.
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.stopVirtualMachine(cmd)
#Sleep for 60 seconds to ensure the machine has stopped
time.sleep(60)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
cmd.serviceofferingid = self.services["service_offerings"]["medium"]["id"]
cmd.serviceofferingid = self.medium_offering.id
self.apiclient.changeServiceForVirtualMachine(cmd)
@ -379,11 +436,19 @@ class TestVMLifeCycle(cloudstackTestCase):
cmd.id = self.small_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
try:
ssh = self.small_virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
except Exception as e:
self.fail("SSH Access failed for %s: %s" % (self.small_virtual_machine.ipaddress, e))
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
try:
ssh = self.small_virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.small_virtual_machine.ipaddress, e)
)
cpuinfo = ssh.execute("cat /proc/cpuinfo")
@ -397,33 +462,38 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
cpu_cnt,
self.services["service_offerings"]["medium"]["cpunumber"],
self.medium_offering.cpunumber,
"Check CPU Count for medium offering"
)
self.assertEqual(
cpu_speed,
self.services["service_offerings"]["medium"]["cpuspeed"],
list_vm_response[0].cpuspeed,
self.medium_offering.cpuspeed,
"Check CPU Speed for medium offering"
)
self.assertEqual(
total_mem,
self.services["service_offerings"]["medium"]["memory"],
self.medium_offering.memory,
"Check Memory(kb) for medium offering"
)
def test_05_change_offering_small(self):
"""Change Offering to a small capacity
"""
# Validate the following
# 1. Log in to the Vm .We should see that the CPU and memory Info of
# this Vm matches the one specified for "Small" service offering.
# 2. Using listVM command verify that this Vm
# has Small service offering Id.
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.medium_virtual_machine.id
self.apiclient.stopVirtualMachine(cmd)
#Sleep before the changes are reflected
time.sleep(60)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.medium_virtual_machine.id
cmd.serviceofferingid = self.services["service_offerings"]["small"]["id"]
cmd.serviceofferingid = self.small_offering.id
self.apiclient.changeServiceForVirtualMachine(cmd)
@ -431,11 +501,19 @@ class TestVMLifeCycle(cloudstackTestCase):
cmd.id = self.medium_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
try:
ssh = self.medium_virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
except Exception as e:
self.fail("SSH Access failed for %s: %s" % (self.medium_virtual_machine.ipaddress, e))
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
try:
ssh = self.medium_virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.medium_virtual_machine.ipaddress, e)
)
cpuinfo = ssh.execute("cat /proc/cpuinfo")
@ -449,18 +527,18 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
cpu_cnt,
self.services["service_offerings"]["small"]["cpunumber"],
self.small_offering.cpunumber,
"Check CPU Count for small offering"
)
self.assertEqual(
cpu_speed,
self.services["service_offerings"]["small"]["cpuspeed"],
list_vm_response[0].cpuspeed,
self.small_offering.cpuspeed,
"Check CPU Speed for small offering"
)
self.assertEqual(
total_mem,
self.services["service_offerings"]["small"]["memory"],
self.small_offering.memory,
"Check Memory(kb) for small offering"
)
@ -468,19 +546,16 @@ class TestVMLifeCycle(cloudstackTestCase):
def test_06_destroy_vm(self):
"""Test destroy Virtual Machine
"""
# Validate the following
# 1. Should not be able to login to the VM.
# 2. listVM command should return this VM.State
# of this VM should be "Destroyed".
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.destroyVirtualMachine(cmd)
self.debug("Verify SSH Access for virtual machine: %s" % self.small_virtual_machine.id)
with self.assertRaises(Exception):
remoteSSHClient.remoteSSHClient(
self.small_virtual_machine.ipaddress,
self.small_virtual_machine.ssh_port,
self.small_virtual_machine.username,
self.small_virtual_machine.password
)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
@ -496,31 +571,18 @@ class TestVMLifeCycle(cloudstackTestCase):
"Destroyed",
"Check virtual machine is in destroyed state"
)
self.debug("Verify the database entry for virtual machine: %s" % self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
'Destroyed',
"Compare virtual machine state with database record"
)
return
def test_07_restore_vm(self):
"""Test recover Virtual Machine
"""
# Validate the following
# 1. listVM command should return this VM.
# State of this VM should be "Stopped".
# 2. We should be able to Start this VM successfully.
cmd = recoverVirtualMachine.recoverVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.recoverVirtualMachine(cmd)
@ -545,27 +607,21 @@ class TestVMLifeCycle(cloudstackTestCase):
"Check virtual machine is in Stopped state"
)
self.debug("Verify the database entry for virtual machine: %s" % self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
'Stopped',
"Compare virtual_machine state with database record"
)
return
def test_08_migrate_vm(self):
"""Test migrate VM
"""
# Validate the following
# 1. Should be able to login to the VM.
# 2. listVM command should return this VM.State of this VM
# should be "Running" and the host should be the host
# to which the VM was migrated to
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
cmd = migrateVirtualMachine.migrateVirtualMachineCmd()
cmd.hostid = self.services["hostid"]
cmd.virtualmachineid = self.small_virtual_machine.id
@ -599,63 +655,28 @@ class TestVMLifeCycle(cloudstackTestCase):
def test_09_expunge_vm(self):
"""Test destroy(expunge) Virtual Machine
"""
# Validate the following
# 1. listVM command should NOT return this VM any more.
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.destroyVirtualMachine(cmd)
# Wait for expunge.delay
cmd = listConfigurations.listConfigurationsCmd()
cmd.name = 'expunge.delay'
response = self.apiclient.listConfigurations(cmd)[0]
time.sleep(int(response.value) + 10)
# Wait for some time more than expunge.delay
time.sleep(int(response.value) * 2)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertEqual(
list_vm_response,
None,
"Check Expunged virtual machine is listVirtualMachines"
)
self.debug("Verify the database entry for virtual machine: %s" % self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" % self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
"Expunging",
"Check virtual machine state in VM_INSTANCES table"
)
self.debug("select instance_id from nics where instance_id = %s;" % self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select instance_id from nics where instance_id = %s;" % self.small_virtual_machine.id)
qresult = qresultset[0]
self.assertEqual(
len(qresult),
0,
"Check virtual_machine entry in NICS table"
)
self.debug("select instance_id from volumes where instance_id = %s;" % self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select instance_id from volumes where instance_id = %s;" % self.small_virtual_machine.id)
qresult = qresultset[0]
self.assertEqual(
len(qresult),
0,
"Check virtual machine entry in VOLUMES table"
)
list_vm_response,
None,
"Check Expunged virtual machine is listVirtualMachines"
)
return
def test_10_attachAndDetach_iso(self):
@ -667,7 +688,7 @@ class TestVMLifeCycle(cloudstackTestCase):
# 3. Log in to the VM.
# 4. The device should be available for use
# 5. Detach ISO
#6. Check the device is properly detached by logging into VM
# 6. Check the device is properly detached by logging into VM
iso = Iso.create(self.apiclient, self.services["iso"])
self.cleanup.append(iso)
@ -679,10 +700,14 @@ class TestVMLifeCycle(cloudstackTestCase):
cmd.virtualmachineid = self.virtual_machine.id
self.apiclient.attachIso(cmd)
ssh_client = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
ssh_client = self.virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
"mount -rt iso9660 %s %s" % (self.services["diskdevice"], self.services["mount_dir"]),
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount -rt iso9660 %s %s" \
% (self.services["diskdevice"], self.services["mount_dir"]),
]
for c in cmds:
res = ssh_client.execute(c)
@ -694,7 +719,11 @@ class TestVMLifeCycle(cloudstackTestCase):
#Disk /dev/xvdd: 4393 MB, 4393723904 bytes
actual_disk_size = res[0].split()[4]
self.assertEqual(str(iso.size), actual_disk_size, "Check size of the attached ISO")
self.assertEqual(
str(iso.size),
actual_disk_size,
"Check size of the attached ISO"
)
#Unmount ISO
command = "umount %s" % self.services["diskdevice"]
@ -708,5 +737,9 @@ class TestVMLifeCycle(cloudstackTestCase):
res = ssh_client.execute(c)
result = self.services["diskdevice"] in res[0].split()
self.assertEqual(result, False, "Check if ISO is detached from virtual machine")
self.assertEqual(
result,
False,
"Check if ISO is detached from virtual machine"
)
return

View File

@ -22,6 +22,13 @@ class Services:
def __init__(self):
self.services = {
"service_offering": {
"name": "Tiny Service Offering",
"displaytext": "Tiny service offering",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"volume_offerings": {
0: {
"offerings": 1,
@ -51,21 +58,19 @@ class Services:
"domainid": 1,
},
},
"customdiskofferingid": 16, #Custom disk offering should be availale
"customdiskofferingid": 52, #Custom disk offering should be available
"customdisksize": 2, # GBs
"volumeoffering": 3,
"serviceoffering": 1,
"template": 206,
"template": 256,
"zoneid": 1,
"username": "root", # Creds for SSH to VM
"password": "fr3sca",
"password": "password",
"ssh_port": 22,
"diskname": "TestDiskServ",
"hypervisor": 'XenServer',
"account": 'testuser', # Account for VM instance
"domainid": 1,
"ipaddressid": 4,
# Optional, If not given Public IP will be assigned for that account before NAT rule creation
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
@ -78,9 +83,18 @@ class TestCreateVolume(cloudstackTestCase):
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
cls.virtual_machine = VirtualMachine.create(cls.api_client, cls.services)
cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services)
cls._cleanup = [cls.nat_rule, cls.virtual_machine]
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
cls.virtual_machine = VirtualMachine.create(cls.api_client, cls.services, serviceofferingid = cls.service_offering.id)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
cls.virtual_machine.account,
cls.virtual_machine.zoneid,
cls.virtual_machine.domainid,
cls.services
)
cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services, ipaddressid = cls.public_ip.ipaddress.id)
cls._cleanup = [cls.nat_rule, cls.virtual_machine, cls.service_offering, cls.public_ip]
def setUp(self):
@ -146,10 +160,20 @@ class TestVolumes(cloudstackTestCase):
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
cls.virtual_machine = VirtualMachine.create(cls.api_client, cls.services)
cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services)
cls.volume = Volume.create(cls.api_client, self.services)
cls._cleanup = [cls.nat_rule, cls.virtual_machine, cls.volume]
cls.service_offering = ServiceOffering.create(cls.api_client, cls.services["service_offering"])
cls.virtual_machine = VirtualMachine.create(cls.api_client, cls.services, serviceofferingid = cls.service_offering.id)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
cls.virtual_machine.account,
cls.virtual_machine.zoneid,
cls.virtual_machine.domainid,
cls.services
)
cls.nat_rule = NATRule.create(cls.api_client, cls.virtual_machine, cls.services, ipaddressid = cls.public_ip.ipaddress.id)
cls.volume = Volume.create(cls.api_client, cls.services)
cls._cleanup = [cls.nat_rule, cls.virtual_machine, cls.volume, cls.public_ip, cls.service_offering]
@classmethod
def tearDownClass(cls):
@ -184,10 +208,6 @@ class TestVolumes(cloudstackTestCase):
self.assertEqual(qresult[0], self.virtual_machine.id, "Check if volume is assc. with virtual machine in Database")
#self.assertEqual(qresult[1], 0, "Check if device is valid in the database")
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = self.services["ipaddressid"]
public_ip = self.apiclient.listPublicIpAddresses(cmd)[0]
#Format the attached volume to a known fs
format_volume_to_ext3(self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress))
@ -217,7 +237,7 @@ class TestVolumes(cloudstackTestCase):
def test_05_detach_volume(self):
"""Detach a Volume attached to a VM
"""
self.virtual_machine.detach_volume(self.apiClient, self.virtual_machine)
self.virtual_machine.detach_volume(self.apiClient, self.volume)
#Sleep to ensure the current state will reflected in other calls
time.sleep(60)
cmd = listVolumes.listVolumesCmd()
@ -248,7 +268,8 @@ class TestVolumes(cloudstackTestCase):
#Attempt to download the volume and save contents locally
try:
response = urllib2.urlopen(urllib2.unquote(extract_vol.url))
formatted_url = urllib.unquote_plus(extract_vol.url)
response = urllib.urlopen(formatted_url)
fd, path = tempfile.mkstemp()
os.close(fd)
fd = open(path, 'wb')