Created BVT Tests for VM Life Cycle and Volumes

1. README - Overview of the BVT
2. base.py - Resource classes
3. settings.py - Test data
4. test_vm_life_cycle.py - VM Life Cycle Tests
5. test_volumes.py - Volumes Tests
6. utils.py - Utility functions
This commit is contained in:
Chirag Jog 2011-12-24 05:24:25 -08:00
parent 02241facd4
commit f1e9f7f1cf
7 changed files with 983 additions and 0 deletions

View File

@ -0,0 +1,29 @@
Build Verification Testing (BVT) Cases
--------------------------------------
These test cases are the core functionality tests that ensure the application is stable and can be tested thoroughly.
These BVT cases definitions are located at : https://docs.google.com/a/cloud.com/spreadsheet/ccc?key=0Ak8acbfxQG8ndEppOGZSLV9mUF9idjVkTkZkajhTZkE&invite=CPij0K0L
Guidelines
----------
BVT test cases are being developed using Python's unittests2. Following are certain guidelines being followed
1. Tests exercised for the same resource should ideally be present under a single suite or file.
2. Time-consuming operations that create new cloud resources like server creation, volume creation etc
should not necessarily be exercised per unit test. The resources can be shared by creating them at
the class-level using setUpClass and shared across all instances during a single run.
3. Certain tests pertaining to NAT, Firewall and Load Balancing warrant fresh resources per test. Hence a call should be
taken by the stakeholders regarding sharing resources.
4. Ensure that the tearDown/tearDownClass functions clean up all the resources created during the test run.
For more information about unittests: http://docs.python.org/library/unittest.html
BVT Tests
----------
The following files contain these BVT cases:
1. test_vm_life_cycle.py contains all the VM Life Cycle tests
2. test_volumes.py contains all the Volumes related tests

View File

@ -0,0 +1,93 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
#
""" Base class for all Cloudstack resources - Server, Volume, Snapshot etc
"""
from utils import is_server_ssh_ready, random_gen
from cloudstackAPI import *
class Server:
"""Manage server lifecycle
"""
def __init__(self, items, services):
self.__dict__.update(items)
self.username = services["username"]
self.password = services["password"]
self.ssh_port = services["ssh_port"]
self.ssh_client = None
#extract out the ipaddress
self.ipaddress = self.nic[0].ipaddress
@classmethod
def create(cls, apiclient, services):
cmd = deployVirtualMachine.deployVirtualMachineCmd()
cmd.serviceofferingid = services["serviceoffering"]
cmd.templateid = services["template"]
cmd.zoneid = services["zoneid"]
if "diskoffering" in services:
cmd.diskofferingid = services["diskoffering"]
return Server(apiclient.deployVirtualMachine(cmd).__dict__, services)
def get_ssh_client(self):
self.ssh_client = self.ssh_client or is_server_ssh_ready(
self.ipaddress,
self.ssh_port,
self.username,
self.password
)
return self.ssh_client
def delete(self, apiclient):
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.id
apiclient.destroyVirtualMachine(cmd)
class Volume:
"""Manage Volume Lifecycle
"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services):
cmd = createVolume.createVolumeCmd()
cmd.name = services["diskname"]
cmd.diskofferingid = services["volumeoffering"]
cmd.zoneid = services["zoneid"]
return Volume(apiclient.createVolume(cmd).__dict__)
@classmethod
def create_from_snapshot(cls, apiclient, snapshot_id, services):
cmd = createVolume.createVolumeCmd()
cmd.name = "-".join([services["diskname"], random_gen()])
cmd.snapshotid = snapshot_id
cmd.zoneid = services["zoneid"]
return Volume(apiclient.createVolume(cmd).__dict__)
def delete(self, apiclient):
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.id
apiclient.deleteVolume(cmd)
class Snapshot:
"""Manage Snapshot Lifecycle
"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, volume_id):
cmd = createSnapshot.createSnapshotCmd()
cmd.volumeid = volume_id
return Snapshot(apiclient.createSnapshot(cmd).__dict__)
def delete(self, apiclient):
cmd = deleteSnapshot.deleteSnapshotCmd()
cmd.id = self.id
apiclient.deleteSnapshot(cmd)

View File

@ -0,0 +1,85 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
#
"""Test Information Services
"""
TEST_VM_LIFE_CYCLE_SERVICES = {
"small" :
{
"template" : 7,
"zoneid" : 2,
"serviceoffering" : 1,
"diskoffering" : 3,
"displayname" : "testserver",
"username" : "root",
"password" : "password",
"ssh_port" : 22,
},
"medium" :
{
"template" : 7,
"zoneid" : 2,
"serviceoffering" : 2,
"displayname" : "testserver",
"username" : "root",
"password" : "password",
"ssh_port" : 22,
},
"service_offerings" :
{ "small" :
{
"id": 1,
"cpunumber" : 1,
"cpuspeed" : 500,
"memory" : 524288
},
"medium" :
{
"id" : 2,
"cpunumber" : 1,
"cpuspeed" : 1000,
"memory" : 1048576
}
}
}
TEST_SNAPSHOT_SERVICES = {
"diskdevice" : "/dev/sda",
"offerings" : 1,
"template" : 7,
"zoneid" : 2,
"diskoffering" : 3,
"username" : "root",
"password" : "password",
"ssh_port" : 22,
"diskname" : "TestDiskServ",
"server2": {
"offerings" : 1,
"template" : 7,
"zoneid" : 2,
"username" : "root",
"password" : "password",
"ssh_port" : 22,
}
}
TEST_VOLUME_SERVICES = {
"offerings" : 1,
"volumeoffering" : 3,
"diskname" : "TestDiskServ",
"template" : 7,
"zoneid" : 2,
"username" : "root",
"password" : "password",
"ssh_port" : 22,
}

View File

@ -0,0 +1,526 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
#
""" BVT tests for Virtual Machine Life Cycle
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
import remoteSSHClient
from utils import *
from base import *
#Import System modules
import time
services = TEST_VM_LIFE_CYCLE_SERVICES
class TestDeployVM(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
def test_deploy_vm(self):
"""Test Deploy Virtual Machine
"""
self.server = Server.create(self.apiclient, services["small"])
#Validate the following:
# 1. Server is accessible via SSH
# 2. listVirtualMachines returns accurate information
# 3. The Cloud Database contains the valid information
self.debug("Verify SSH Access for server: %s" %self.server.id)
try:
self.server.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.server.ipaddress, e))
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.server.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.debug("Verify listVirtualMachines response for server: %s" %self.server.id)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].id,
self.server.id,
"Check server id in listVirtualMachines"
)
self.assertEqual(
list_vm_response[0].displayname,
self.server.displayname,
"Check server displayname in listVirtualMachines"
)
self.debug("Verify the database entry for server: %s" %self.server.id)
self.debug("select id, state, private_ip_address from vm_instance where id = %s;" %self.server.id)
qresultset = self.dbclient.execute("select id, state, private_ip_address from vm_instance where id = %s;" %self.server.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
self.server.id,
"Compare server id with database record"
)
self.assertEqual(
qresult[1],
'Running',
"Check server state in the database"
)
self.assertEqual(
qresult[2],
self.server.ipaddress,
"Check IP Address in the database"
)
return
def tearDown(self):
try:
self.server.delete(self.apiclient)
except Exception as e:
self.debug("Warning! Exception in tearDown: %s" %e)
class TestVMLifeCycle(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
#create small and large servers
cls.small_server = Server.create(
cls.api_client,
TEST_VM_LIFE_CYCLE_SERVICES["small"]
)
cls.medium_server = Server.create(
cls.api_client,
TEST_VM_LIFE_CYCLE_SERVICES["medium"]
)
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
def test_01_stop_vm(self):
"""Test Stop Virtual Machine
"""
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.small_server.id
self.api_client.stopVirtualMachine(cmd)
#Wait before server has be successfully stopped
self.debug("Verify SSH Access for server: %s,%s" %(self.small_server.id, self.small_server.ipaddress))
time.sleep(30)
with self.assertRaises(Exception):
remoteSSHClient.remoteSSHClient(
self.small_server.ipaddress,
self.small_server.ssh_port,
self.small_server.username,
self.small_server.password
)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
list_vm_response = self.api_client.listVirtualMachines(cmd)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check server is in stopped state"
)
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
"Stopped",
"Compare server state with database record"
)
return
def test_02_start_vm(self):
"""Test Start Virtual Machine
"""
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.small_server.id
self.apiclient.startVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.debug("Verify listVirtualMachines response for server: %s" %self.small_server.id)
self.assertEqual(
list_vm_response[0].state,
"Running",
"Check server is in running state"
)
self.debug("Verify SSH Access for server: %s" %self.small_server.id)
try:
self.small_server.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.small_server.ipaddress, e))
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
'Running',
"Compare server state with database record"
)
return
def test_03_reboot_vm(self):
"""Test Reboot Virtual Machine
"""
cmd = rebootVirtualMachine.rebootVirtualMachineCmd()
cmd.id = self.small_server.id
self.apiclient.rebootVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.debug("Verify SSH Access for server: %s" %self.small_server.id)
try:
self.small_server.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.small_server.ipaddress, e))
self.assertEqual(
list_vm_response[0].state,
"Running",
"Check server is in running state"
)
return
def test_04_change_offering_medium(self):
"""Change Offering to a medium capacity
"""
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.small_server.id
self.apiclient.stopVirtualMachine(cmd)
#Sleep for 60 seconds to ensure the machine has stopped
time.sleep(60)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.small_server.id
cmd.serviceofferingid = services["service_offerings"]["medium"]["id"]
self.apiclient.changeServiceForVirtualMachine(cmd)
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.small_server.id
self.apiclient.startVirtualMachine(cmd)
try:
ssh = self.small_server.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.small_server.ipaddress, e))
cpuinfo = ssh.execute("cat /proc/cpuinfo")
cpu_cnt = len([i for i in cpuinfo if "processor" in i])
#'cpu MHz\t\t: 2660.499'
cpu_speed = [i for i in cpuinfo if "cpu MHz" in i][0].split()[3]
meminfo = ssh.execute("cat /proc/meminfo")
#MemTotal: 1017464 kB
total_mem = [i for i in meminfo if "MemTotal" in i][0].split()[1]
self.assertEqual(
cpu_cnt,
services["service_offerings"]["medium"]["cpunumber"],
"Check CPU Count for medium offering"
)
self.assertEqual(
cpu_speed,
services["service_offerings"]["medium"]["cpuspeed"],
"Check CPU Speed for medium offering"
)
self.assertEqual(
total_mem,
services["service_offerings"]["medium"]["memory"],
"Check Memory(kb) for medium offering"
)
def test_05_change_offering_small(self):
"""Change Offering to a small capacity
"""
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.medium_server.id
self.apiclient.stopVirtualMachine(cmd)
#Sleep before the changes are reflected
time.sleep(60)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.medium_server.id
cmd.serviceofferingid = services["service_offerings"]["small"]["id"]
self.apiclient.changeServiceForVirtualMachine(cmd)
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.medium_server.id
self.apiclient.startVirtualMachine(cmd)
try:
ssh = self.medium_server.get_ssh_client()
except Exception as e:
self.fail("SSH Access failed for %s: %s" %(self.medium_server.ipaddress, e))
cpuinfo = ssh.execute("cat /proc/cpuinfo")
cpu_cnt = len([i for i in cpuinfo if "processor" in i])
#'cpu MHz\t\t: 2660.499'
cpu_speed = [i for i in cpuinfo if "cpu MHz" in i ][0].split()[3]
meminfo = ssh.execute("cat /proc/meminfo")
#MemTotal: 1017464 kB
total_mem = [i for i in meminfo if "MemTotal" in i][0].split()[1]
self.assertEqual(
cpu_cnt,
services["service_offerings"]["small"]["cpunumber"],
"Check CPU Count for small offering"
)
self.assertEqual(
cpu_speed,
services["service_offerings"]["small"]["cpuspeed"],
"Check CPU Speed for small offering"
)
self.assertEqual(
total_mem,
services["service_offerings"]["small"]["memory"],
"Check Memory(kb) for small offering"
)
def test_06_destroy_vm(self):
"""Test destroy Virtual Machine
"""
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.small_server.id
self.apiclient.destroyVirtualMachine(cmd)
self.debug("Verify SSH Access for server: %s" %self.small_server.id)
with self.assertRaises(Exception):
remoteSSHClient.remoteSSHClient(
self.small_server.ipaddress,
self.small_server.ssh_port,
self.small_server.username,
self.small_server.password
)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Destroyed",
"Check server is in destroyed state"
)
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
'Destroyed',
"Compare server state with database record"
)
return
def test_07_restore_vm(self):
"""Test recover Virtual Machine
"""
cmd = recoverVirtualMachine.recoverVirtualMachineCmd()
cmd.id = self.small_server.id
self.apiclient.recoverVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check server is in Stopped state"
)
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
'Stopped',
"Compare server state with database record"
)
return
def test_08_expunge_vm(self):
"""Test destroy(expunge) Virtual Machine
"""
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.small_server.id
self.apiclient.destroyVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_server.id
timeout = 50
while True :
list_vm_response = self.apiclient.listVirtualMachines(cmd)
if not list_vm_response:
break;
else:
if timeout == 0:
break
time.sleep(100)
timeout = timeout - 1
self.assertEqual(
list_vm_response,
None,
"Check Expunged server is listVirtualMachines"
)
self.debug("Verify the database entry for server: %s" %self.small_server.id)
self.debug("select state from vm_instance where id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select state from vm_instance where id = %s;" %self.small_server.id)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
self.assertEqual(
qresult[0],
"Expunging",
"Check server state in VM_INSTANCES table"
)
self.debug("select instance_id from nics where instance_id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select instance_id from nics where instance_id = %s;" %self.small_server.id)
qresult = qresultset[0]
self.assertEqual(
len(qresult),
0,
"Check server entry in NICS table"
)
self.debug("select instance_id from volumes where instance_id = %s;" %self.small_server.id)
qresultset = self.dbclient.execute("select instance_id from volumes where instance_id = %s;" %self.small_server.id)
qresult = qresultset[0]
self.assertEqual(
len(qresult),
0,
"Check server entry in VOLUMES table"
)
return

View File

@ -0,0 +1,176 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
#
""" BVT tests for Volumes
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from settings import *
from utils import fetch_api_client, format_volume_to_ext3
from base import Server, Volume
#Import System modules
import os
import urllib2
import time
import tempfile
services = TEST_VOLUME_SERVICES
class TestCreateVolume(cloudstackTestCase):
def setUp(self):
self.apiClient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
def test_01_create_volume(self):
"""Test Volume creation
"""
self.volume = Volume.create(self.apiClient, services)
cmd = listVolumes.listVolumesCmd()
cmd.id = self.volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes")
qresultset = self.dbclient.execute("select id from volumes where id = %s" %self.volume.id)
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
def tearDown(self):
self.volume.delete(self.apiClient)
class TestVolumes(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.server = Server.create(cls.api_client, services)
cls.volume = Volume.create(cls.api_client, services)
@classmethod
def tearDownClass(cls):
try:
cls.server.delete(cls.api_client)
cls.volume.delete(cls.api_client)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" %e)
def setUp(self):
self.apiClient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
def test_02_attach_volume(self):
"""Attach a created Volume to a Running VM
"""
cmd = attachVolume.attachVolumeCmd()
cmd.id = self.volume.id
cmd.virtualmachineid = self.server.id
self.apiClient.attachVolume(cmd)
#Sleep to ensure the current state will reflected in other calls
time.sleep(60)
cmd = listVolumes.listVolumesCmd()
cmd.id = self.volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes")
volume = list_volume_response[0]
self.assertNotEqual(volume.virtualmachineid, None, "Check if volume state (attached) is reflected")
qresultset = self.dbclient.execute("select instance_id, device_id from volumes where id = %s" %self.volume.id)
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
qresult = qresultset[0]
self.assertEqual(qresult[0], self.server.id, "Check if volume is assc. with server in Database")
#self.assertEqual(qresult[1], 0, "Check if device is valid in the database")
#Format the attached volume to a known fs
format_volume_to_ext3(self.server.get_ssh_client())
def test_03_download_attached_volume(self):
"""Download a Volume attached to a VM
"""
cmd = extractVolume.extractVolumeCmd()
cmd.id = self.volume.id
cmd.mode = "HTTP_DOWNLOAD"
cmd.zoneid = services["zoneid"]
#A proper exception should be raised; downloading attach VM is not allowed
with self.assertRaises(Exception):
self.apiClient.deleteVolume(cmd)
def test_04_delete_attached_volume(self):
"""Delete a Volume attached to a VM
"""
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
#A proper exception should be raised; deleting attach VM is not allowed
with self.assertRaises(Exception):
self.apiClient.deleteVolume(cmd)
def test_05_detach_volume(self):
"""Detach a Volume attached to a VM
"""
cmd = detachVolume.detachVolumeCmd()
cmd.id = self.volume.id
self.apiClient.detachVolume(cmd)
#Sleep to ensure the current state will reflected in other calls
time.sleep(60)
cmd = listVolumes.listVolumesCmd()
cmd.id = self.volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
self.assertNotEqual(list_volume_response, None, "Check if volume exists in ListVolumes")
volume = list_volume_response[0]
self.assertEqual(volume.virtualmachineid, None, "Check if volume state (detached) is reflected")
qresultset = self.dbclient.execute("select instance_id, device_id from volumes where id = %s" %self.volume.id)
self.assertNotEqual(len(qresultset), 0, "Check if volume exists in Database")
qresult = qresultset[0]
self.assertEqual(qresult[0], None, "Check if volume is unassc. with server in Database")
self.assertEqual(qresult[1], None, "Check if no device is valid in the database")
def test_06_download_detached_volume(self):
"""Download a Volume unattached to an VM
"""
cmd = extractVolume.extractVolumeCmd()
cmd.id = self.volume.id
cmd.mode = "HTTP_DOWNLOAD"
cmd.zoneid = services["zoneid"]
extract_vol = self.apiClient.extractVolume(cmd)
#Attempt to download the volume and save contents locally
try:
response = urllib2.urlopen(urllib2.unquote(extract_vol.url))
fd, path = tempfile.mkstemp()
os.close(fd)
fd = open(path, 'wb')
fd.write(response.read())
fd.close()
except Exception as e:
print e
self.fail("Extract Volume Failed with invalid URL %s (vol id: %s)" %(extract_vol.url, self.volume.id))
def test_07_delete_detached_volume(self):
"""Delete a Volume unattached to an VM
"""
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
self.apiClient.deleteVolume(cmd)
time.sleep(60)
cmd = listVolumes.listVolumesCmd()
cmd.id = self.volume.id
cmd.type = 'DATADISK'
list_volume_response = self.apiClient.listVolumes(cmd)
self.assertEqual(list_volume_response, None, "Check if volume exists in ListVolumes")

View File

@ -0,0 +1,74 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2011 Citrix. All rights reserved.
#
"""Utilities functions
"""
import time
import remoteSSHClient
from cloudstackAPI import *
import cloudstackConnection
#from cloudstackConnection import cloudConnection
import configGenerator
import logging
import string
import random
def random_gen(size=6, chars=string.ascii_uppercase + string.digits):
"""Generate Random Strings of variable length"""
return ''.join(random.choice(chars) for x in range(size))
def cleanup_resources(api_client, resources):
"""Delete resources"""
for k,v in resources.items():
for obj in v:
obj.delete(api_client)
def is_server_ssh_ready(ipaddress, port, username, password, retries=50):
"""Return ssh handle else wait till sshd is running"""
loop_cnt = retries
while True:
try:
ssh = remoteSSHClient.remoteSSHClient(
ipaddress,
port,
username,
password
)
except Exception as e:
if loop_cnt == 0:
raise e
loop_cnt = loop_cnt - 1
time.sleep(60)
else:
return ssh
def format_volume_to_ext3(ssh_client, device="/dev/sda" ):
"""Format attached storage to ext3 fs"""
cmds = [
"echo -e 'n\np\n1\n\n\nw' | fdisk %s" %device,
"mkfs.ext3 %s1" %device,
]
for c in cmds:
print ssh_client.execute(c)
def fetch_api_client(config_file='datacenterCfg'):
"""Fetch the Cloudstack API Client"""
config = configGenerator.get_setup_config(config_file)
mgt = config.mgtSvr[0]
testClientLogger = logging.getLogger("testClient")
asyncTimeout=3600
return cloudstackAPIClient.CloudStackAPIClient(
cloudstackConnection.cloudConnection(
mgt.mgtSvrIp,
mgt.port,
mgt.apiKey,
mgt.securityKey,
asyncTimeout,
testClientLogger
)
)