The main features being added are:

1. Add support for Snapshot tests
2. Add Support for Disk offerings tests
3. Add Support for Service offerings tests
4. Add Support to test Volume creation for all available disk offerings
5. Rename Server as VirtualMachine
This commit is contained in:
Chirag Jog 2011-12-29 06:01:22 -08:00
parent 537d56227b
commit ae0896ccf8
8 changed files with 1052 additions and 146 deletions

View File

@ -3,14 +3,14 @@
# Copyright (c) 2011 Citrix. All rights reserved.
#
""" Base class for all Cloudstack resources - Server, Volume, Snapshot etc
""" Base class for all Cloudstack resources - Virtual machine, Volume, Snapshot etc
"""
from utils import is_server_ssh_ready, random_gen
from cloudstackAPI import *
class Server:
"""Manage server lifecycle
class VirtualMachine:
"""Manage virtual machine lifecycle
"""
def __init__(self, items, services):
self.__dict__.update(items)
@ -29,9 +29,16 @@ class Server:
cmd.zoneid = services["zoneid"]
if "diskoffering" in services:
cmd.diskofferingid = services["diskoffering"]
return Server(apiclient.deployVirtualMachine(cmd).__dict__, services)
return VirtualMachine(apiclient.deployVirtualMachine(cmd).__dict__, services)
def get_ssh_client(self):
def get_ssh_client(self, reconnect=False):
if reconnect:
self.ssh_client = is_server_ssh_ready(
self.ipaddress,
self.ssh_port,
self.username,
self.password
)
self.ssh_client = self.ssh_client or is_server_ssh_ready(
self.ipaddress,
self.ssh_port,
@ -45,6 +52,17 @@ class Server:
cmd.id = self.id
apiclient.destroyVirtualMachine(cmd)
def attach_volume(self, apiclient, volume):
cmd = attachVolume.attachVolumeCmd()
cmd.id = volume.id
cmd.virtualmachineid = self.id
return apiclient.attachVolume(cmd)
def detach_volume(self, apiclient, volume):
cmd = detachVolume.detachVolumeCmd()
cmd.id = volume.id
return apiclient.detachVolume(cmd)
class Volume:
"""Manage Volume Lifecycle
@ -60,13 +78,23 @@ class Volume:
cmd.zoneid = services["zoneid"]
return Volume(apiclient.createVolume(cmd).__dict__)
@classmethod
def create_custom_disk(cls, apiclient, services):
cmd = createVolume.createVolumeCmd()
cmd.name = services["diskname"]
cmd.diskofferingid = services["customdiskofferingid"]
cmd.size = services["customdisksize"]
cmd.zoneid = services["zoneid"]
return Volume(apiclient.createVolume(cmd).__dict__)
@classmethod
def create_from_snapshot(cls, apiclient, snapshot_id, services):
cmd = createVolume.createVolumeCmd()
cmd.name = "-".join([services["diskname"], random_gen()])
cmd.snapshotid = snapshot_id
cmd.zoneid = services["zoneid"]
cmd.size = services["size"]
return Volume(apiclient.createVolume(cmd).__dict__)
def delete(self, apiclient):
@ -90,3 +118,4 @@ class Snapshot:
cmd = deleteSnapshot.deleteSnapshotCmd()
cmd.id = self.id
apiclient.deleteSnapshot(cmd)

View File

@ -2,14 +2,14 @@
#
# Copyright (c) 2011 Citrix. All rights reserved.
#
"""Data for the BVT tests
"""Test Information Services
"""
TEST_VM_LIFE_CYCLE_SERVICES = {
"small" :
{
"template" : 7,
"zoneid" : 2,
"zoneid" : 1,
"serviceoffering" : 1,
"diskoffering" : 3,
"displayname" : "testserver",
@ -22,7 +22,7 @@ TEST_VM_LIFE_CYCLE_SERVICES = {
"medium" :
{
"template" : 7,
"zoneid" : 2,
"zoneid" : 1,
"serviceoffering" : 2,
"displayname" : "testserver",
"username" : "root",
@ -46,40 +46,147 @@ TEST_VM_LIFE_CYCLE_SERVICES = {
"memory" : 1048576
}
}
}
TEST_SNAPSHOT_SERVICES = {
"server_with_disk" :
{
"template" : 7,
"zoneid" : 1,
"serviceoffering" : 1,
"diskoffering" : 3,
"displayname" : "testserver",
"username" : "root",
"password" : "password",
"ssh_port" : 22,
},
"server_without_disk" :
{
"template" : 7,
"zoneid" : 1,
"serviceoffering" : 2,
"displayname" : "testserver",
"username" : "root",
"password" : "password",
"ssh_port" : 22,
},
"recurring_snapshot" :
{
"intervaltype" : 'HOURLY',
"maxsnaps" : 8,
"schedule" : 1,
"timezone" : 'US/Arizona',
},
"template":
{
"displaytext": "Test template from snapshot",
"name" : "template_from_snapshot",
"ostypeid": 12,
"templatefilter": 'featured',
},
"small_instance":
{
"zoneid": 1,
"serviceofferingid": 2,
},
"diskdevice" : "/dev/sda",
"offerings" : 1,
"template" : 7,
"zoneid" : 2,
"zoneid" : 1,
"diskoffering" : 3,
"diskname" : "TestDiskServ",
"size" : 1, #GBs
}
TEST_VOLUME_SERVICES = {
"volume_offerings" : {
0: {
"offerings" : 1,
"volumeoffering" : 3,
"diskname" : "TestDiskServ",
"zoneid" : 1,
"diskofferingid": 3,
},
1: {
"offerings" : 1,
"volumeoffering" : 4,
"diskname" : "TestDiskServ",
"zoneid" : 1,
"diskofferingid": 3,
},
2: {
"offerings" : 1,
"volumeoffering" : 5,
"diskname" : "TestDiskServ",
"zoneid" : 1,
"diskofferingid": 3,
},
},
"customdiskofferingid":22,
"customdisksize" : 7, #GBs
"serviceoffering" : 1,
"template" : 7,
"zoneid" : 1,
"username" : "root",
"password" : "password",
"ssh_port" : 22,
"diskname" : "TestDiskServ",
"server2": {
"offerings" : 1,
"template" : 7,
"zoneid" : 2,
}
TEST_SERVICE_OFFERING = {
"off_1" :
{
"id": 32,
"name":"small_service_offering",
"displaytext" : "Small service offering",
"cpunumber":1,
"cpuspeed": 200,
"memory": 200,
"username" : "root",
"password" : "password",
"ssh_port" : 22,
},
"off_2" :
{
"id":33,
"name":"medium_service_offering",
"displaytext" : "Medium service offering",
"cpunumber":1,
"cpuspeed": 200,
"memory": 200,
"username" : "root",
"password" : "password",
"ssh_port" : 22,
}
}
TEST_VOLUME_SERVICES = {
"offerings" : 1,
"volumeoffering" : 3,
"diskname" : "TestDiskServ",
"template" : 7,
"zoneid" : 2,
"username" : "root",
"password" : "password",
"ssh_port" : 22,
}
TEST_DISK_OFFERING = {
"off_1" :
{
"id": 31,
"name":"small_disk_offering",
"displaytext" : "Small disk offering",
"username" : "root",
"password" : "password",
"ssh_port" : 22,
"disksize": 1
},
"off_2" :
{
"id":32,
"name":"medium_disk_offering",
"displaytext" : "Medium disk offering",
"username" : "root",
"password" : "password",
"ssh_port" : 22,
"disksize": 1
}
}

View File

@ -0,0 +1,213 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
#
""" BVT tests for Disk offerings"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
from utils import *
from base import *
services = TEST_DISK_OFFERING
class TestDiskOfferings(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cmd =createDiskOffering.createDiskOfferingCmd()
cmd.displaytext = services["off_1"]["displaytext"]
cmd.name = services["off_1"]["name"]
cmd.disksize=services["off_1"]["disksize"]
cls.small_disk_offering = cls.api_client.createDiskOffering(cmd)
cmd = createDiskOffering.createDiskOfferingCmd()
cmd.displaytext = services["off_2"]["displaytext"]
cmd.name = services["off_2"]["name"]
cmd.disksize=services["off_2"]["disksize"]
cls.medium_disk_offering = cls.api_client.createDiskOffering(cmd)
return
@classmethod
def tearDownClass(cls):
cls.api_client = fetch_api_client()
cmd = deleteDiskOffering.deleteDiskOfferingCmd()
cmd.id = cls.small_disk_offering.id
cls.api_client.deleteDiskOffering(cmd)
return
def test_01_create_disk_offering(self):
"""Test to create disk offering"""
# Validate the following:
# 1. createDiskOfferings should return a valid information for newly created offering
# 2. The Cloud Database contains the valid information
cmd = createDiskOffering.createDiskOfferingCmd()
#Add the required parameters for creating new service offering
cmd.displaytext = services["off_1"]["displaytext"]
cmd.name = services["off_1"]["name"]
cmd.disksize=services["off_1"]["disksize"]
self.tmp_disk_offering = self.apiclient.createDiskOffering(cmd)
cmd = listDiskOfferings.listDiskOfferingsCmd()
cmd.name = services["off_1"]["name"]
list_disk_response = self.apiclient.listDiskOfferings(cmd)
self.assertNotEqual(
len(list_disk_response),
0,
"Check Disk offering is created"
)
self.assertEqual(
list_disk_response[0].displaytext,
services["off_1"]["displaytext"],
"Check server id in createServiceOffering"
)
self.assertEqual(
list_disk_response[0].name,
services["off_1"]["name"],
"Check name in createServiceOffering"
)
qresultset = self.dbclient.execute(
"select display_text, id from disk_offering where id = %s;"
%list_disk_response[0].id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
services["off_1"]["displaytext"],
"Compare display text with database record"
)
self.assertEqual(
qresult[1],
list_disk_response[0].id,
"Check ID in the database"
)
cmd = deleteDiskOffering.deleteDiskOfferingCmd()
cmd.id = self.tmp_disk_offering.id
self.apiclient.deleteDiskOffering(cmd)
return
def test_02_edit_disk_offering(self):
"""Test to update existing disk offering"""
# Validate the following:
# 1. updateDiskOffering should return a valid information for newly created offering
cmd = updateDiskOffering.updateDiskOfferingCmd()
cmd.id= self.small_disk_offering.id
cmd.displaytext = services["off_1"]["displaytext"]
cmd.name = services["off_1"]["name"]
self.apiclient.updateDiskOffering(cmd)
cmd = listDiskOfferings.listDiskOfferingsCmd()
cmd.id = self.small_disk_offering.id
list_disk_response = self.apiclient.listDiskOfferings(cmd)
self.assertNotEqual(
len(list_disk_response),
0,
"Check disk offering is updated"
)
self.assertEqual(
list_disk_response[0].displaytext,
services["off_1"]["displaytext"],
"Check service displaytext in updateServiceOffering"
)
self.assertEqual(
list_disk_response[0].name,
services["off_1"]["name"],
"Check service name in updateServiceOffering"
)
qresultset = self.dbclient.execute(
"select display_text, id from disk_offering where id = %s;"
%self.small_disk_offering.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
services["off_1"]["displaytext"],
"Compare displaytext with database record"
)
self.assertEqual(
qresult[1],
self.small_disk_offering.id,
"Check name in the database"
)
return
def test_03_delete_disk_offering(self):
"""Test to delete disk offering"""
# Validate the following:
# 1. deleteDiskOffering should return a valid information for newly created offering
cmd = deleteDiskOffering.deleteDiskOfferingCmd()
cmd.id = self.medium_disk_offering.id
self.apiclient.deleteDiskOffering(cmd)
cmd = listDiskOfferings.listDiskOfferingsCmd()
cmd.id = self.medium_disk_offering.id
list_disk_response = self.apiclient.listDiskOfferings(cmd)
self.assertEqual(
list_disk_response,
None,
"Check if disk offering exists in listDiskOfferings"
)
qresultset = self.dbclient.execute(
"select display_text, name from disk_offering where id = %s;"
% str(self.medium_disk_offering.id)
)
self.assertEqual(
len(qresultset),
1,
"Check DB Query result set"
)
return
def tearDown(self):
self.dbclient.close()
return

View File

@ -0,0 +1,242 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
#
""" BVT tests for Service offerings"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
import remoteSSHClient
from utils import *
from base import *
services = TEST_SERVICE_OFFERING
class TestServiceOfferings(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cmd = createServiceOffering.createServiceOfferingCmd()
cmd.cpunumber = services["off_1"]["cpunumber"]
cmd.cpuspeed = services["off_1"]["cpuspeed"]
cmd.displaytext = services["off_1"]["displaytext"]
cmd.memory = services["off_1"]["memory"]
cmd.name = services["off_1"]["name"]
cls.small_service_offering = cls.api_client.createServiceOffering(cmd)
cmd = createServiceOffering.createServiceOfferingCmd()
cmd.cpunumber = services["off_2"]["cpunumber"]
cmd.cpuspeed = services["off_2"]["cpuspeed"]
cmd.displaytext = services["off_2"]["displaytext"]
cmd.memory = services["off_2"]["memory"]
cmd.name = services["off_2"]["name"]
cls.medium_service_offering = cls.api_client.createServiceOffering(cmd)
return
@classmethod
def tearDownClass(cls):
cls.api_client = fetch_api_client()
cmd = deleteServiceOffering.deleteServiceOfferingCmd()
cmd.id = cls.small_service_offering.id
cls.api_client.deleteServiceOffering(cmd)
return
def test_01_create_service_offering(self):
"""Test to create service offering"""
# Validate the following:
# 1. createServiceOfferings should return a valid information for newly created offering
# 2. The Cloud Database contains the valid information
cmd = createServiceOffering.createServiceOfferingCmd()
#Add the required parameters for creating new service offering
cmd.cpunumber = services["off_1"]["cpunumber"]
cmd.cpuspeed = services["off_1"]["cpuspeed"]
cmd.displaytext = services["off_1"]["displaytext"]
cmd.memory = services["off_1"]["memory"]
cmd.name = services["off_1"]["name"]
self.tmp_service_offering = self.apiclient.createServiceOffering(cmd)
cmd = listServiceOfferings.listServiceOfferingsCmd()
cmd.name = services["off_1"]["name"]
list_service_response = self.apiclient.listServiceOfferings(cmd)
self.assertNotEqual(
len(list_service_response),
0,
"Check Service offering is created"
)
self.assertEqual(
list_service_response[0].cpunumber,
services["off_1"]["cpunumber"],
"Check server id in createServiceOffering"
)
self.assertEqual(
list_service_response[0].cpuspeed,
services["off_1"]["cpuspeed"],
"Check cpuspeed in createServiceOffering"
)
self.assertEqual(
list_service_response[0].displaytext,
services["off_1"]["displaytext"],
"Check server displaytext in createServiceOfferings"
)
self.assertEqual(
list_service_response[0].memory,
services["off_1"]["memory"],
"Check memory in createServiceOffering"
)
self.assertEqual(
list_service_response[0].name,
services["off_1"]["name"],
"Check name in createServiceOffering"
)
qresultset = self.dbclient.execute(
"select cpu, speed, ram_size from service_offering where id = %s;"
% self.tmp_service_offering.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
services["off_1"]["cpunumber"],
"Check number of CPUs allocated to service offering in the database"
)
self.assertEqual(
qresult[1],
services["off_1"]["cpuspeed"],
"Check number of CPUs allocated to service offering in the database"
)
self.assertEqual(
qresult[2],
services["off_1"]["memory"],
"Check number of CPUs allocated to service offering in the database"
)
cmd = deleteServiceOffering.deleteServiceOfferingCmd()
cmd.id = self.tmp_service_offering.id
self.api_client.deleteServiceOffering(cmd)
return
def test_02_edit_service_offering(self):
"""Test to update existing service offering"""
# Validate the following:
# 1. updateServiceOffering should return a valid information for newly created offering
random_displaytext = random_gen()
random_name = random_gen()
cmd = updateServiceOffering.updateServiceOfferingCmd()
cmd.id= self.small_service_offering.id
cmd.displaytext = random_displaytext
cmd.name = random_name
self.apiclient.updateServiceOffering(cmd)
cmd = listServiceOfferings.listServiceOfferingsCmd()
cmd.id = self.small_service_offering.id
list_service_response = self.apiclient.listServiceOfferings(cmd)
self.assertNotEqual(
len(list_service_response),
0,
"Check Service offering is updated"
)
self.assertEqual(
list_service_response[0].displaytext,
random_displaytext,
"Check server displaytext in updateServiceOffering"
)
self.assertEqual(
list_service_response[0].name,
random_name,
"Check server name in updateServiceOffering"
)
qresultset = self.dbclient.execute(
"select id, display_text, name from disk_offering where type='Service' and id = %s;"
% self.small_service_offering.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
self.small_service_offering.id,
"Check service offering ID in the database"
)
self.assertEqual(
qresult[1],
services["off_1"]["displaytext"],
"Check service offering ID in the database"
)
self.assertEqual(
qresult[2],
services["off_1"]["name"],
"Check service offering ID in the database"
)
return
def test_03_delete_service_offering(self):
"""Test to delete service offering"""
# Validate the following:
# 1. deleteServiceOffering should return a valid information for newly created offering
cmd = deleteServiceOffering.deleteServiceOfferingCmd()
#Add the required parameters required to call for API
cmd.id = self.medium_service_offering.id
self.apiclient.deleteServiceOffering(cmd)
cmd = listServiceOfferings.listServiceOfferingsCmd()
cmd.id = self.medium_service_offering.id
list_service_response = self.apiclient.listServiceOfferings(cmd)
self.assertEqual(
list_service_response,
None,
"Check if service offering exists in listDiskOfferings"
)
qresultset = self.dbclient.execute(
"select id from service_offering where id = %s;"
% self.medium_service_offering.id
)
self.assertEqual(
len(qresultset),
1,
"Check DB Query result set"
)
return
def tearDown(self):
self.dbclient.close()
return

View File

@ -0,0 +1,284 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
#
""" BVT tests for Snapshots
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
from utils import *
from base import *
#Import System modules
services = TEST_SNAPSHOT_SERVICES
MOUNT_DIR = "/mnt/tmp"
SUB_DIR = "test"
SUB_LVL_DIR1 = "test1"
SUB_LVL_DIR2 = "test2"
RANDOM_DATA = "random.data"
class TestSnapshots(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.virtual_machine = cls.virtual_machine_with_disk = VirtualMachine.create(cls.api_client, services["server_with_disk"])
cls.virtual_machine_without_disk = VirtualMachine.create(cls.api_client, services["server_without_disk"])
@classmethod
def tearDownClass(cls):
# cls.virtual_machine.delete(cls.api_client)
# cls.virtual_machine_without_disk.delete(cls.api_client)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = { 'virtual_machine': [],
'snapshot' : [],
'volume' : []
}
def tearDown(self):
#Clean up, terminate the created instance, volumes and snapshots
# cleanup_resources(self.apiclient, self.cleanup)
return
# def test_01_snapshot_root_disk(self):
# """Test Snapshot Root Disk
# """
# cmd = listVolumes.listVolumesCmd()
# cmd.virtualmachineid = self.virtual_machine_with_disk.id
# cmd.type = 'ROOT'
#
# volumes = self.apiclient.listVolumes(cmd)
# snapshot = Snapshot.create(self.apiclient, volumes[0].id)
#
# cmd = listSnapshots.listSnapshotsCmd()
# cmd.id = snapshot.id
# list_snapshots = self.apiclient.listSnapshots(cmd)
#
# self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call")
#
# self.assertEqual(
# list_snapshots[0].id,
# snapshot.id,
# "Check resource id in list resources call"
# )
#
# def test_02_snapshot_data_disk(self):
# """Test Snapshot Data Disk
# """
#
# cmd = listVolumes.listVolumesCmd()
# cmd.virtualmachineid = self.virtual_machine_with_disk.id
# cmd.type = 'DATADISK'
#
# volume = self.apiclient.listVolumes(cmd)
# snapshot = Snapshot.create(self.apiclient, volume[0].id)
#
# cmd = listSnapshots.listSnapshotsCmd()
# cmd.id = snapshot.id
# list_snapshots = self.apiclient.listSnapshots(cmd)
#
# self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call")
#
# self.assertEqual(
# list_snapshots[0].id,
# snapshot.id,
# "Check resource id in list resources call"
# )
#
def test_03_volume_from_snapshot(self):
"""Create volumes from snapshots
"""
#1. Login to machine; create temp/test directories on data volume
#2. Snapshot the Volume
#3. Create another Volume from snapshot
#4. Mount/Attach volume to another server
#5. Compare data
random_data_0 = random_gen(100)
random_data_1 = random_gen(100)
ssh_client = self.virtual_machine.get_ssh_client()
format_volume_to_ext3(ssh_client, services["diskdevice"])
cmds = [ "mkdir -p %s" %MOUNT_DIR,
"mount %s1 %s" %(services["diskdevice"], MOUNT_DIR),
"pushd %s" %MOUNT_DIR,
"mkdir -p %s/{%s,%s} " %(SUB_DIR, SUB_LVL_DIR1, SUB_LVL_DIR2),
"echo %s > %s/%s/%s" %(random_data_0, SUB_DIR, SUB_LVL_DIR1, RANDOM_DATA),
"echo %s > %s/%s/%s" %(random_data_1, SUB_DIR, SUB_LVL_DIR1, RANDOM_DATA)
]
for c in cmds:
self.debug(ssh_client.execute(c))
cmd = listVolumes.listVolumesCmd()
cmd.hostid = self.virtual_machine.id
cmd.type = 'DATADISK'
list_volume_response = self.apiclient.listVolumes(cmd)
volume = list_volume_response[0]
snapshot = Snapshot.create(self.apiclient, volume.id)
self.cleanup['snapshot'].append(snapshot)
volume = Volume.create_from_snapshot(self.apiclient, snapshot.id, services)
self.cleanup['volume'].append(volume)
cmd = listVolumes.listVolumesCmd()
cmd.id = volume.id
list_volumes = self.apiclient.listVolumes(cmd)
assertNotEqual(
len(list_volumes),
None,
"Check Volume list Length"
)
assertEqual (
list_volumes[0].id,
volume.id,
"Check Volume in the List Volumes"
)
new_virtual_machine = self.virtual_machine_without_disk
self.cleanup['virtual_machine'].append(new_virtual_machine)
cmd = attachVolume.attachVolumeCmd()
cmd.id = self.volume.id
cmd.virtualmachineid = deploy_new_virtual_machine.id
volume = self.apiclient.attachVolume(cmd)
ssh = virtual_machine.get_ssh_client()
cmds = [
"mkdir %s" %MOUNT_DIR,
"mount %s1 %s" %(services["diskdevice"], MOUNT_DIR)
]
for c in cmds:
self.debug(ssh.execute(c))
returned_data_0 = ssh.execute("cat %s/%s/%s/%s" %(MOUNT_DIR, SUB_DIR, SUB_LVL_DIR1, RANDOM_DATA))
returned_data_1 = ssh.execute("cat %s/%s/%s/%s" %(MOUNT_DIR, SUB_DIR, SUB_LVL_DIR2, RANDOM_DATA))
self.assertEqual(random_data_0, returned_data_0, "Verify newly attached volume contents with existing one")
self.assertEqual(random_data_1, returned_data_1, "Verify newly attached volume contents with existing one")
return
# def test_04_delete_snapshot(self):
# """Test Delete Snapshot
# """
#
# cmd = listVolumes.listVolumesCmd()
# cmd.hostid = self.virtual_machine.id
# cmd.type = 'DATADISK'
# list_volumes = self.apiclient.listVolumes(cmd)
#
# cmd = listSnapshots.listSnapshotsCmd()
# cmd.id = list_volumes[0].id
# list_snapshots = self.apiclient.listSnapshots(cmd)
#
# snapshot = Snapshot.create(self.apiclient,list_volumes[0].id)
# snapshot.delete(self.apiclient)
# #Sleep to ensure all database records are updated
# time.sleep(60)
# cmd = listSnapshots.listSnapshotsCmd()
# cmd.id = snapshot.id
# list_snapshots = self.apiclient.listSnapshots(cmd)
#
# self.assertEqual(list_snapshots, None, "Check if result exists in list item call")
#
# def test_05_recurring_snapshot_root_disk(self):
# """Test Recurring Snapshot Root Disk
# """
#
# cmd = listVolumes.listVolumesCmd()
# cmd.virtualmachineid = self.virtual_machine_with_disk.id
# cmd.type = 'ROOT'
#
# volume = self.apiclient.listVolumes(cmd)
#
# cmd = createSnapshotPolicy.createSnapshotPolicyCmd()
# cmd.intervaltype=services["recurring_snapshot"]["intervaltype"]
# cmd.maxsnaps=services["recurring_snapshot"]["maxsnaps"]
# cmd.schedule=services["recurring_snapshot"]["schedule"]
# cmd.timezone=services["recurring_snapshot"]["timezone"]
# cmd.volumeid=volume[0].id
# recurring_snapshot = self.apiclient.createSnapshotPolicy(cmd)
# cmd = listSnapshotPolicies.listSnapshotPoliciesCmd()
# cmd.id = recurring_snapshot.id
# cmd.volumeid=volume[0].id
# list_snapshots = self.apiclient.listSnapshotPolicies(cmd)
#
# self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call")
#
# self.assertEqual(
# list_snapshots[0].id,
# recurring_snapshot.id,
# "Check recurring snapshot id in list resources call"
# )
# self.assertEqual(
# list_snapshots[0].maxsnaps,
# services["recurring_snapshot"]["maxsnaps"],
# "Check interval type in list resources call"
# )
#
# #Sleep for 9 hours to check only 8 snapshots are retained
# time.sleep(32400)
# cmd = listSnapshots.listSnapshotsCmd()
# cmd.volumeid=volume.id
# cmd.intervaltype = services["recurring_snapshot"]["intervaltype"]
# cmd.snapshottype = 'RECURRING'
#
# list_snapshots = self.apiclient.listSnapshots(cmd)
#
# self.assertEqual(len(list_snapshots),8, "Check maximum number of recurring snapshots retained")
#
# def test_06_recurring_snapshot_data_disk(self):
# """Test Recurring Snapshot data Disk
# """
#
# cmd = listVolumes.listVolumesCmd()
# cmd.virtualmachineid = self.virtual_machine_with_disk.id
# cmd.type = 'DATADISK'
#
# volume = self.apiclient.listVolumes(cmd)
#
# cmd = createSnapshotPolicy.createSnapshotPolicyCmd()
# cmd.intervaltype=services["recurring_snapshot"]["intervaltype"]
# cmd.maxsnaps=services["recurring_snapshot"]["maxsnaps"]
# cmd.schedule=services["recurring_snapshot"]["schedule"]
# cmd.timezone=services["recurring_snapshot"]["timezone"]
# cmd.volumeid=volume[0].id
# recurring_snapshot = self.apiclient.createSnapshotPolicy(cmd)
#
# cmd = listSnapshotPolicies.listSnapshotPoliciesCmd()
# cmd.id = recurring_snapshot.id
# cmd.volumeid=volume[0].id
# list_snapshots = self.apiclient.listSnapshotPolicies(cmd)
#
# self.assertNotEqual(list_snapshots, None, "Check if result exists in list item call")
#
# self.assertEqual(
# list_snapshots[0].id,
# recurring_snapshot.id,
# "Check recurring snapshot id in list resources call"
# )
# self.assertEqual(
# list_snapshots[0].maxsnaps,
# services["recurring_snapshot"]["maxsnaps"],
# "Check interval type in list resources call"
# )
#
# #Sleep for 9 hours to check only 8 snapshots are retained
# time.sleep(32400)
# cmd = listSnapshots.listSnapshotsCmd()
# cmd.volumeid=volume.id
# cmd.intervaltype = services["recurring_snapshot"]["intervaltype"]
# cmd.snapshottype = 'RECURRING'
#
# list_snapshots = self.apiclient.listSnapshots(cmd)
#
# self.assertEqual(len(list_snapshots),8, "Check maximum number of recurring snapshots retained")

View File

@ -28,22 +28,22 @@ class TestDeployVM(cloudstackTestCase):
"""Test Deploy Virtual Machine
"""
self.server = Server.create(self.apiclient, services["small"])
self.virtual_machine = VirtualMachine.create(self.apiclient, services["small"])
#Validate the following:
# 1. Server is accessible via SSH
# 1. Virtual Machine is accessible via SSH
# 2. listVirtualMachines returns accurate information
# 3. The Cloud Database contains the valid information
self.debug("Verify SSH Access for server: %s" %self.server.id)
self.debug("Verify SSH Access for virtual machine: %s" %self.virtual_machine.id)
try:
self.server.get_ssh_client()
self.virtual_machine.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.server.ipaddress, e))
self.fail("SSH Access failed for %s: %s" %(self.virtual_machine.ipaddress, e))
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.server.id
cmd.id = self.virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.debug("Verify listVirtualMachines response for server: %s" %self.server.id)
self.debug("Verify listVirtualMachines response for virtual machine: %s" %self.virtual_machine.id)
self.assertNotEqual(
len(list_vm_response),
@ -53,20 +53,20 @@ class TestDeployVM(cloudstackTestCase):
self.assertEqual(
list_vm_response[0].id,
self.server.id,
"Check server id in listVirtualMachines"
self.virtual_machine.id,
"Check virtual machine id in listVirtualMachines"
)
self.assertEqual(
list_vm_response[0].displayname,
self.server.displayname,
"Check server displayname in listVirtualMachines"
self.virtual_machine.displayname,
"Check virtual machine displayname in listVirtualMachines"
)
self.debug("Verify the database entry for server: %s" %self.server.id)
self.debug("Verify the database entry for virtual machine: %s" %self.virtual_machine.id)
self.debug("select id, state, private_ip_address from vm_instance where id = %s;" %self.server.id)
qresultset = self.dbclient.execute("select id, state, private_ip_address from vm_instance where id = %s;" %self.server.id)
self.debug("select id, state, private_ip_address from vm_instance where id = %s;" %self.virtual_machine.id)
qresultset = self.dbclient.execute("select id, state, private_ip_address from vm_instance where id = %s;" %self.virtual_machine.id)
self.assertNotEqual(
len(qresultset),
@ -78,24 +78,24 @@ class TestDeployVM(cloudstackTestCase):
self.assertEqual(
qresult[0],
self.server.id,
"Compare server id with database record"
self.virtual_machine.id,
"Compare virtual machine id with database record"
)
self.assertEqual(
qresult[1],
'Running',
"Check server state in the database"
"Check virtual machine state in the database"
)
self.assertEqual(
qresult[2],
self.server.ipaddress,
self.virtual_machine.ipaddress,
"Check IP Address in the database"
)
return
def tearDown(self):
try:
self.server.delete(self.apiclient)
self.virtual_machine.delete(self.apiclient)
except Exception as e:
self.debug("Warning! Exception in tearDown: %s" %e)
@ -108,11 +108,11 @@ class TestVMLifeCycle(cloudstackTestCase):
cls.api_client = fetch_api_client()
#create small and large servers
cls.small_server = Server.create(
cls.small_virtual_machine = VirtualMachine.create(
cls.api_client,
TEST_VM_LIFE_CYCLE_SERVICES["small"]
)
cls.medium_server = Server.create(
cls.medium_virtual_machine = VirtualMachine.create(
cls.api_client,
TEST_VM_LIFE_CYCLE_SERVICES["medium"]
)
@ -124,22 +124,22 @@ class TestVMLifeCycle(cloudstackTestCase):
"""Test Stop Virtual Machine
"""
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
self.api_client.stopVirtualMachine(cmd)
#Wait before server has be successfully stopped
self.debug("Verify SSH Access for server: %s,%s" %(self.small_server.id, self.small_server.ipaddress))
self.debug("Verify SSH Access for virtual machine: %s,%s" %(self.small_virtual_machine.id, self.small_virtual_machine.ipaddress))
time.sleep(30)
with self.assertRaises(Exception):
remoteSSHClient.remoteSSHClient(
self.small_server.ipaddress,
self.small_server.ssh_port,
self.small_server.username,
self.small_server.password
self.small_virtual_machine.ipaddress,
self.small_virtual_machine.ssh_port,
self.small_virtual_machine.username,
self.small_virtual_machine.password
)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
list_vm_response = self.api_client.listVirtualMachines(cmd)
self.assertNotEqual(
len(list_vm_response),
@ -150,13 +150,13 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check server is in stopped state"
"Check virtual machine is in stopped state"
)
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("Verify the database entry for virtual machine: %s" %self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
@ -170,7 +170,7 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
qresult[0],
"Stopped",
"Compare server state with database record"
"Compare virtual machine state with database record"
)
return
@ -178,11 +178,11 @@ class TestVMLifeCycle(cloudstackTestCase):
"""Test Start Virtual Machine
"""
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
@ -191,22 +191,22 @@ class TestVMLifeCycle(cloudstackTestCase):
"Check VM avaliable in List Virtual Machines"
)
self.debug("Verify listVirtualMachines response for server: %s" %self.small_server.id)
self.debug("Verify listVirtualMachines response for virtual machine: %s" %self.small_virtual_machine.id)
self.assertEqual(
list_vm_response[0].state,
"Running",
"Check server is in running state"
"Check virtual machine is in running state"
)
self.debug("Verify SSH Access for server: %s" %self.small_server.id)
self.debug("Verify SSH Access for virtual machine: %s" %self.small_virtual_machine.id)
try:
self.small_server.get_ssh_client()
self.small_virtual_machine.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.small_server.ipaddress, e))
self.fail("SSH Access failed for %s: %s" %(self.small_virtual_machine.ipaddress, e))
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.debug("Verify the database entry for virtual machine: %s" %self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
@ -219,7 +219,7 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
qresult[0],
'Running',
"Compare server state with database record"
"Compare virtual machine state with database record"
)
return
@ -227,11 +227,11 @@ class TestVMLifeCycle(cloudstackTestCase):
"""Test Reboot Virtual Machine
"""
cmd = rebootVirtualMachine.rebootVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
self.apiclient.rebootVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
@ -241,17 +241,17 @@ class TestVMLifeCycle(cloudstackTestCase):
)
self.debug("Verify SSH Access for server: %s" %self.small_server.id)
self.debug("Verify SSH Access for virtual machine: %s" %self.small_virtual_machine.id)
try:
self.small_server.get_ssh_client()
self.small_virtual_machine.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.small_server.ipaddress, e))
self.fail("SSH Access failed for %s: %s" %(self.small_virtual_machine.ipaddress, e))
self.assertEqual(
list_vm_response[0].state,
"Running",
"Check server is in running state"
"Check virtual machine is in running state"
)
return
@ -260,25 +260,25 @@ class TestVMLifeCycle(cloudstackTestCase):
"""
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
self.apiclient.stopVirtualMachine(cmd)
#Sleep for 60 seconds to ensure the machine has stopped
time.sleep(60)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
cmd.serviceofferingid = services["service_offerings"]["medium"]["id"]
self.apiclient.changeServiceForVirtualMachine(cmd)
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
try:
ssh = self.small_server.get_ssh_client()
ssh = self.small_virtual_machine.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.small_server.ipaddress, e))
self.fail("SSH Access failed for %s: %s" %(self.small_virtual_machine.ipaddress, e))
cpuinfo = ssh.execute("cat /proc/cpuinfo")
@ -312,25 +312,25 @@ class TestVMLifeCycle(cloudstackTestCase):
"""Change Offering to a small capacity
"""
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.medium_server.id
cmd.id = self.medium_virtual_machine.id
self.apiclient.stopVirtualMachine(cmd)
#Sleep before the changes are reflected
time.sleep(60)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.medium_server.id
cmd.id = self.medium_virtual_machine.id
cmd.serviceofferingid = services["service_offerings"]["small"]["id"]
self.apiclient.changeServiceForVirtualMachine(cmd)
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.medium_server.id
cmd.id = self.medium_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
try:
ssh = self.medium_server.get_ssh_client()
ssh = self.medium_virtual_machine.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.medium_server.ipaddress, e))
self.fail("SSH Access failed for %s: %s" %(self.medium_virtual_machine.ipaddress, e))
cpuinfo = ssh.execute("cat /proc/cpuinfo")
@ -365,20 +365,20 @@ class TestVMLifeCycle(cloudstackTestCase):
"""Test destroy Virtual Machine
"""
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
self.apiclient.destroyVirtualMachine(cmd)
self.debug("Verify SSH Access for server: %s" %self.small_server.id)
self.debug("Verify SSH Access for virtual machine: %s" %self.small_virtual_machine.id)
with self.assertRaises(Exception):
remoteSSHClient.remoteSSHClient(
self.small_server.ipaddress,
self.small_server.ssh_port,
self.small_server.username,
self.small_server.password
self.small_virtual_machine.ipaddress,
self.small_virtual_machine.ssh_port,
self.small_virtual_machine.username,
self.small_virtual_machine.password
)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
@ -390,13 +390,13 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
list_vm_response[0].state,
"Destroyed",
"Check server is in destroyed state"
"Check virtual machine is in destroyed state"
)
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("Verify the database entry for virtual machine: %s" %self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
@ -409,7 +409,7 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
qresult[0],
'Destroyed',
"Compare server state with database record"
"Compare virtual machine state with database record"
)
return
@ -418,15 +418,15 @@ class TestVMLifeCycle(cloudstackTestCase):
"""Test recover Virtual Machine
"""
cmd = recoverVirtualMachine.recoverVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
self.apiclient.recoverVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
@ -438,12 +438,12 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check server is in Stopped state"
"Check virtual machine is in Stopped state"
)
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.debug("Verify the database entry for virtual machine: %s" %self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
@ -455,7 +455,7 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
qresult[0],
'Stopped',
"Compare server state with database record"
"Compare virtual_machine state with database record"
)
return
@ -463,11 +463,11 @@ class TestVMLifeCycle(cloudstackTestCase):
"""Test destroy(expunge) Virtual Machine
"""
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
self.apiclient.destroyVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
cmd.id = self.small_virtual_machine.id
timeout = 50
while True :
@ -483,12 +483,12 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
list_vm_response,
None,
"Check Expunged server is listVirtualMachines"
"Check Expunged virtual machine is listVirtualMachines"
)
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.debug("Verify the database entry for virtual machine: %s" %self.small_virtual_machine.id)
self.debug("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_virtual_machine.id)
self.assertNotEqual(
len(qresultset),
0,
@ -499,28 +499,28 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
qresult[0],
"Expunging",
"Check server state in VM_INSTANCES table"
"Check virtual machine state in VM_INSTANCES table"
)
self.debug("select instance_id from nics where instance_id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select instance_id from nics where instance_id = %s;" %self.small_server.id)
self.debug("select instance_id from nics where instance_id = %s;" %self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select instance_id from nics where instance_id = %s;" %self.small_virtual_machine.id)
qresult = qresultset[0]
self.assertEqual(
len(qresult),
0,
"Check server entry in NICS table"
"Check virtual_machine entry in NICS table"
)
self.debug("select instance_id from volumes where instance_id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select instance_id from volumes where instance_id = %s;" %self.small_server.id)
self.debug("select instance_id from volumes where instance_id = %s;" %self.small_virtual_machine.id)
qresultset = self.dbclient.execute("select instance_id from volumes where instance_id = %s;" %self.small_virtual_machine.id)
qresult = qresultset[0]
self.assertEqual(
len(qresult),
0,
"Check server entry in VOLUMES table"
"Check virtual machine entry in VOLUMES table"
)
return

View File

@ -9,7 +9,7 @@ from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
from utils import fetch_api_client, format_volume_to_ext3
from base import Server, Volume
from base import VirtualMachine, Volume
#Import System modules
import os
import urllib2
@ -20,38 +20,74 @@ services = TEST_VOLUME_SERVICES
class TestCreateVolume(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.virtual_machine = VirtualMachine.create(cls.api_client, services)
def setUp(self):
self.apiClient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
def test_01_create_volume(self):
"""Test Volume creation
"""Test Volume creation for all Disk Offerings (incl. custom)
"""
self.volume = Volume.create(self.apiClient, services)
cmd = listVolumes.listVolumesCmd()
cmd.id = self.volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
self.volumes = []
for k,v in services["volume_offerings"].items():
self.volumes.append(Volume.create(self.apiClient, v))
self.volumes.append(Volume.create_custom_disk(self.apiClient, services))
#Attach a volume with different disk offerings and check the memory allocated to each of them
for volume in self.volumes:
cmd = listVolumes.listVolumesCmd()
cmd.id = volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes")
qresultset = self.dbclient.execute("select id from volumes where id = %s" %volume.id)
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
attached_volume = self.virtual_machine.attach_volume(self.apiClient,volume)
ssh = self.virtual_machine.get_ssh_client()
#ssh.execute("shutdown -r now")
#Sleep to ensure the machine is rebooted properly
time.sleep(120)
ssh = self.virtual_machine.get_ssh_client(True)
res = ssh.execute("fdisk -l|grep /dev/sda|head -1")
#Disk /dev/sda: 21.5 GB, 21474836480 bytes
actual_disk_size = res[0].split()[4]
self.assertEqual(str(list_volume_response[0].size), actual_disk_size, "Check if promised disk size actually available")
self.virtual_machine.detach_volume(self.apiClient,volume)
self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes")
qresultset = self.dbclient.execute("select id from volumes where id = %s" %self.volume.id)
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
def tearDown(self):
self.volume.delete(self.apiClient)
for volume in self.volumes:
volume.delete(self.apiClient)
@classmethod
def tearDownClass(cls):
try:
cls.virtual_machine.delete(cls.api_client)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" %e)
class TestVolumes(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.server = Server.create(cls.api_client, services)
cls.virtual_machine = VirtualMachine.create(cls.api_client, services)
cls.volume = Volume.create(cls.api_client, services)
@classmethod
def tearDownClass(cls):
try:
cls.server.delete(cls.api_client)
cls.virtual_machine.delete(cls.api_client)
cls.volume.delete(cls.api_client)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" %e)
@ -63,10 +99,7 @@ class TestVolumes(cloudstackTestCase):
def test_02_attach_volume(self):
"""Attach a created Volume to a Running VM
"""
cmd = attachVolume.attachVolumeCmd()
cmd.id = self.volume.id
cmd.virtualmachineid = self.server.id
self.apiClient.attachVolume(cmd)
self.virtual_machine.attach_volume(self.volume)
#Sleep to ensure the current state will reflected in other calls
time.sleep(60)
@ -82,11 +115,11 @@ class TestVolumes(cloudstackTestCase):
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
qresult = qresultset[0]
self.assertEqual(qresult[0], self.server.id, "Check if volume is assc. with server in Database")
self.assertEqual(qresult[0], self.virtual_machine.id, "Check if volume is assc. with virtual machine in Database")
#self.assertEqual(qresult[1], 0, "Check if device is valid in the database")
#Format the attached volume to a known fs
format_volume_to_ext3(self.server.get_ssh_client())
format_volume_to_ext3(self.virtual_machine.get_ssh_client())
def test_03_download_attached_volume(self):
"""Download a Volume attached to a VM
@ -114,10 +147,7 @@ class TestVolumes(cloudstackTestCase):
def test_05_detach_volume(self):
"""Detach a Volume attached to a VM
"""
cmd = detachVolume.detachVolumeCmd()
cmd.id = self.volume.id
self.apiClient.detachVolume(cmd)
self.virtual_machine.detach_volume(self.virtual_machine)
#Sleep to ensure the current state will reflected in other calls
time.sleep(60)
cmd = listVolumes.listVolumesCmd()
@ -132,7 +162,7 @@ class TestVolumes(cloudstackTestCase):
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
qresult = qresultset[0]
self.assertEqual(qresult[0], None, "Check if volume is unassc. with server in Database")
self.assertEqual(qresult[0], None, "Check if volume is unassc. with virtual machine in Database")
self.assertEqual(qresult[1], None, "Check if no device is valid in the database")

View File

@ -10,6 +10,7 @@ import time
import remoteSSHClient
from cloudstackAPI import *
import cloudstackConnection
#from cloudstackConnection import cloudConnection
import configGenerator
import logging
import string