Running marvin tests from the simulator using ant target

ant run-simulator - will start the simulator enviornment
ant run-simulator-tests - will fire simulator tests on above dev setup
This commit is contained in:
Prasanna Santhanam 2012-06-22 20:25:59 +05:30
parent 5e16130d98
commit 52f4c73b62
15 changed files with 5569 additions and 0 deletions

View File

@ -42,4 +42,16 @@ file except in compliance with the License. Citrix Systems, Inc. -->
</exec>
<echo message="distributable tarball at: ${marvin.dist.dir}/Marvin-*.tar.gz"/>
</target>
<target name="install-marvin" depends="package-marvin" description="installs marvin on the local machine">
<echo message="Uninstalling Marvin" />
<exec dir="${marvin.dist.dir}" executable="pip">
<arg line="uninstall -y marvin"/>
</exec>
<echo message="Installing Marvin" />
<exec dir="${marvin.dist.dir}" executable="pip">
<arg value="install" />
<arg value="Marvin-0.1.0.tar.gz" />
</exec>
</target>
</project>

View File

@ -33,6 +33,9 @@
<property name="agent-simulator.jar" value="cloud-agent-simulator.jar" />
<property name="testclient.jar" value="cloud-test.jar" />
<property name="simulator.setup.dir" value="${base.dir}/setup/db/" />
<property name="simulator.tests.dir" value="${base.dir}/test/" />
<!-- =================== Agent Simulator ==================== -->
<path id="agent-simulator.classpath">
@ -88,6 +91,28 @@
</target>
<!-- =================== Agent Simulator ==================== -->
<!-- Run Simulator Tests -->
<target name="setup-simulator-tests" depends="clean-all, install-marvin">
<mkdir dir="${simulator.setup.dir}/override">
</mkdir>
<copy overwrite="true" file="${simulator.setup.dir}/templates.simulator.sql" tofile="${simulator.setup.dir}/override/templates.sql" />
<mkdir dir="${build.dir}/override">
</mkdir>
<copy overwrite="true" file="${build.dir}/replace.properties" tofile="${build.dir}/override/replace.properties" />
<!-- Replace the COMPONENT-SPEC for components-simulator.xml -->
<copy overwrite="true" file="${simulator.tests.dir}/integration/smoke-simulator/simulator.properties" tofile="${build.dir}/override/replace.properties" />
</target>
<target name="run-simulator" depends="setup-simulator-tests, build-all-with-simulator, deploy-server, deploydb-simulator, debug">
</target>
<target name="run-simulator-tests">
<exec dir="test/" executable="bash">
<arg line="setup.sh -t integration/smoke-simulator/ -d localhost -m localhost" />
</exec>
</target>
<!-- -->
<!-- =================== QA Testing Client ==================== -->
<target name="-init-test" depends="-init">

View File

@ -0,0 +1,41 @@
Build Verification Testing (BVT) Cases
--------------------------------------
These test cases are the core functionality tests that ensure the application is stable and can be tested thoroughly.
These BVT cases definitions are located at : https://docs.google.com/a/cloud.com/spreadsheet/ccc?key=0Ak8acbfxQG8ndEppOGZSLV9mUF9idjVkTkZkajhTZkE&invite=CPij0K0L
Guidelines
----------
BVT test cases are being developed using Python's unittests2. Following are certain guidelines being followed
1. Tests exercised for the same resource should ideally be present under a single suite or file.
2. Time-consuming operations that create new cloud resources like server creation, volume creation etc
should not necessarily be exercised per unit test. The resources can be shared by creating them at
the class-level using setUpClass and shared across all instances during a single run.
3. Certain tests pertaining to NAT, Firewall and Load Balancing warrant fresh resources per test. Hence a call should be
taken by the stakeholders regarding sharing resources.
4. Ensure that the tearDown/tearDownClass functions clean up all the resources created during the test run.
For more information about unittests: http://docs.python.org/library/unittest.html
BVT Tests
----------
The following files contain these BVT cases:
1. test_vm_life_cycle.py - VM Life Cycle tests
2. test_volumes.py - Volumes related tests
3. test_snapshots.py - Snapshots related tests
4. test_disk_offerings.py - Disk Offerings related tests
5. test_service_offerings.py - Service Offerings related tests
6. test_hosts.py - Hosts and Clusters related tests
7. test_iso.py - ISO related tests
8. test_network.py - Network related tests
9. test_primary_storage.py - Primary storage related tests
10. test_secondary_storage.py - Secondary storage related tests
11. test_ssvm.py - SSVM & CPVM related tests
12. test_templates.py - Templates related tests
13. test_routers.py - Router related tests

View File

@ -0,0 +1,13 @@
# -*- encoding: utf-8 -*-
# Copyright 2012 Citrix Systems, Inc. Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. Citrix Systems, Inc.
# reserves all rights not expressly granted by the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Automatically generated by addcopyright.py at 04/03/2012

View File

@ -0,0 +1,10 @@
DBUSER=cloud
DBPW=cloud
MSLOG=vmops.log
APISERVERLOG=api.log
DBHOST=localhost
AGENTLOGDIR=logs
AGENTLOG=logs/agent.log
MSMNTDIR=/mnt
COMPONENTS-SPEC=components-simulator.xml
AWSAPILOG=awsapi.log

View File

@ -0,0 +1,64 @@
import marvin
import unittest
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from time import sleep as delay
class TestSetupSuccess(cloudstackTestCase):
"""
Test to verify if the cloudstack is ready to launch tests upon
1. Verify that system VMs are up and running in all zones
2. Verify that built-in templates are Ready in all zones
"""
@classmethod
def setUpClass(cls):
cls.apiClient = super(TestSetupSuccess, cls).getClsTestClient().getApiClient()
zones = listZones.listZonesCmd()
cls.zones_list = cls.apiClient.listZones(zones)
cls.retry = 50
def test_systemVmReady(self):
"""
system VMs need to be ready and Running for each zone in cloudstack
"""
for z in self.zones_list:
retry = self.retry
while retry != 0:
self.debug("looking for system VMs in zone: %s, %s"%(z.id, z.name))
sysvms = listSystemVms.listSystemVmsCmd()
sysvms.zoneid = z.id
sysvms.state = 'Running'
sysvms_list = self.apiClient.listSystemVms(sysvms)
if sysvms_list is not None and len(sysvms_list) == 2:
assert len(sysvms_list) == 2
self.debug("found %d system VMs running {%s}"%(len(sysvms_list), sysvms_list))
break
retry = retry - 1
delay(60) #wait a minute for retry
self.assertNotEqual(retry, 0, "system VMs not Running in zone %s"%z.name)
def test_templateBuiltInReady(self):
"""
built-in templates CentOS to be ready
"""
for z in self.zones_list:
retry = self.retry
while retry != 0:
self.debug("Looking for at least one ready builtin template")
templates = listTemplates.listTemplatesCmd()
templates.templatefilter = 'featured'
templates.listall = 'true'
templates_list = self.apiClient.listTemplates(templates)
if templates_list is not None:
builtins = [tmpl for tmpl in templates_list if tmpl.templatetype == 'BUILTIN' and tmpl.isready == True]
if len(builtins) > 0:
self.debug("Found %d builtins ready for use %s"%(len(builtins), builtins))
break
retry = retry - 1
delay(60) #wait a minute for retry
self.assertNotEqual(retry, 0, "builtIn templates not ready in zone %s"%z.name)
@classmethod
def tearDownClass(cls):
pass

View File

@ -0,0 +1,213 @@
# -*- encoding: utf-8 -*-
# Copyright 2012 Citrix Systems, Inc. Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. Citrix Systems, Inc.
# reserves all rights not expressly granted by the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Automatically generated by addcopyright.py at 04/03/2012
""" BVT tests for Disk offerings"""
#Import Local Modules
import marvin
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from integration.lib.utils import *
from integration.lib.base import *
from integration.lib.common import *
class Services:
"""Test Disk offerings Services
"""
def __init__(self):
self.services = {
"off": {
"name": "Disk offering",
"displaytext": "Disk offering",
"disksize": 1 # in GB
},
}
class TestCreateDiskOffering(cloudstackTestCase):
def setUp(self):
self.services = Services().services
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_create_disk_offering(self):
"""Test to create disk offering"""
# Validate the following:
# 1. createDiskOfferings should return valid info for new offering
# 2. The Cloud Database contains the valid information
disk_offering = DiskOffering.create(
self.apiclient,
self.services["off"]
)
self.cleanup.append(disk_offering)
self.debug("Created Disk offering with ID: %s" % disk_offering.id)
list_disk_response = list_disk_offering(
self.apiclient,
id=disk_offering.id
)
self.assertEqual(
isinstance(list_disk_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_disk_response),
0,
"Check Disk offering is created"
)
disk_response = list_disk_response[0]
self.assertEqual(
disk_response.displaytext,
self.services["off"]["displaytext"],
"Check server id in createServiceOffering"
)
self.assertEqual(
disk_response.name,
self.services["off"]["name"],
"Check name in createServiceOffering"
)
return
class TestDiskOfferings(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = super(TestDiskOfferings, cls).getClsTestClient().getApiClient()
cls.disk_offering_1 = DiskOffering.create(
cls.api_client,
cls.services["off"]
)
cls.disk_offering_2 = DiskOffering.create(
cls.api_client,
cls.services["off"]
)
cls._cleanup = [cls.disk_offering_1]
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = super(TestDiskOfferings, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_02_edit_disk_offering(self):
"""Test to update existing disk offering"""
# Validate the following:
# 1. updateDiskOffering should return
# a valid information for newly created offering
#Generate new name & displaytext from random data
random_displaytext = random_gen()
random_name = random_gen()
self.debug("Updating Disk offering with ID: %s" %
self.disk_offering_1.id)
cmd = updateDiskOffering.updateDiskOfferingCmd()
cmd.id = self.disk_offering_1.id
cmd.displaytext = random_displaytext
cmd.name = random_name
self.apiclient.updateDiskOffering(cmd)
list_disk_response = list_disk_offering(
self.apiclient,
id=self.disk_offering_1.id
)
self.assertEqual(
isinstance(list_disk_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_disk_response),
0,
"Check disk offering is updated"
)
disk_response = list_disk_response[0]
self.assertEqual(
disk_response.displaytext,
random_displaytext,
"Check service displaytext in updateServiceOffering"
)
self.assertEqual(
disk_response.name,
random_name,
"Check service name in updateServiceOffering"
)
return
def test_03_delete_disk_offering(self):
"""Test to delete disk offering"""
# Validate the following:
# 1. deleteDiskOffering should return
# a valid information for newly created offering
self.disk_offering_2.delete(self.apiclient)
self.debug("Deleted Disk offering with ID: %s" %
self.disk_offering_2.id)
list_disk_response = list_disk_offering(
self.apiclient,
id=self.disk_offering_2.id
)
self.assertEqual(
list_disk_response,
None,
"Check if disk offering exists in listDiskOfferings"
)
return

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,733 @@
# -*- encoding: utf-8 -*-
# Copyright 2012 Citrix Systems, Inc. Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. Citrix Systems, Inc.
# reserves all rights not expressly granted by the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Automatically generated by addcopyright.py at 04/03/2012
""" BVT tests for routers
"""
#Import Local Modules
import marvin
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from integration.lib.utils import *
from integration.lib.base import *
from integration.lib.common import *
#Import System modules
import time
class Services:
"""Test router Services
"""
def __init__(self):
self.services = {
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"virtual_machine":
{
"displayname": "Test VM",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "testuser",
"password": "password",
},
"ostypeid":'52e14b2f-dea6-46dc-94e1-fba3ee264fc8',
"sleep": 60,
"timeout": 10,
"mode": 'advanced', #Networking mode: Basic, Advanced
}
class TestRouterServices(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(
TestRouterServices,
cls
).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
#Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.vm_1 = VirtualMachine.create(
cls.api_client,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.account.name,
domainid=cls.account.account.domainid,
serviceofferingid=cls.service_offering.id
)
cls.cleanup = [
cls.vm_1,
cls.account,
cls.service_offering
]
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = super(
TestRouterServices,
cls
).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
return
def test_01_router_internal_basic(self):
"""Test router internal basic zone
"""
# Validate the following
# 1. Router only does dhcp
# 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP
# by checking status of dnsmasq process
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list host returns a valid list"
)
host = hosts[0]
self.debug("Router ID: %s, state: %s" % (router.id, router.state))
self.assertEqual(
router.state,
'Running',
"Check list router response for router state"
)
return
def test_02_router_internal_adv(self):
"""Test router internal advanced zone
"""
# Validate the following
# 1. Router does dhcp, dns, gateway, LB, PF, FW
# 2. verify that dhcp, dns ports are open on UDP
# 3. dnsmasq, haproxy processes should be running
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
self.debug("Router ID: %s, state: %s" % (router.id, router.state))
self.assertEqual(
router.state,
'Running',
"Check list router response for router state"
)
return
def test_03_restart_network_cleanup(self):
"""Test restart network
"""
# Validate the following
# 1. When cleanup = true, router is destroyed and a new one created
# 2. New router will have new publicIp and linkLocalIp and
# all it's services should resume
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
#Store old values before restart
old_linklocalip = router.linklocalip
timeout = 10
# Network should be in Implemented or Setup stage before restart
while True:
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(networks, list),
True,
"Check list response returns a valid list"
)
network = networks[0]
if network.state in ["Implemented", "Setup"]:
break
elif timeout == 0:
break
else:
time.sleep(self.services["sleep"])
timeout = timeout - 1
self.debug(
"Restarting network with ID: %s, Network state: %s" % (
network.id,
network.state
))
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
cmd.cleanup = True
self.apiclient.restartNetwork(cmd)
# Get router details after restart
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
self.assertNotEqual(
router.linklocalip,
old_linklocalip,
"Check link-local IP after restart"
)
return
def test_04_restart_network_wo_cleanup(self):
"""Test restart network without cleanup
"""
# Validate the following
# 1. When cleanup = false, router is restarted and
# all services inside the router are restarted
# 2. check 'uptime' to see if the actual restart happened
timeout = 10
# Network should be in Implemented or Setup stage before restart
while True:
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(networks, list),
True,
"Check list response returns a valid list"
)
network = networks[0]
if network.state in ["Implemented", "Setup"]:
break
elif timeout == 0:
break
else:
time.sleep(self.services["sleep"])
timeout = timeout - 1
self.debug(
"Restarting network with ID: %s, Network state: %s" % (
network.id,
network.state
))
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
cmd.cleanup = False
self.apiclient.restartNetwork(cmd)
# Get router details after restart
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
def test_05_router_basic(self):
"""Test router basic setup
"""
# Validate the following:
# 1. verify that listRouters returned a 'Running' router
# 2. router will have dns same as that seen in listZones
# 3. router will have a guestIP and a linkLocalIp"
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_router_response),
0,
"Check list router response"
)
for router in list_router_response:
self.assertEqual(
router.state,
'Running',
"Check list router response for router state"
)
zones = list_zones(
self.apiclient,
id=router.zoneid
)
self.assertEqual(
isinstance(zones, list),
True,
"Check list response returns a valid list"
)
zone = zones[0]
self.assertEqual(
router.dns1,
zone.dns1,
"Compare DNS1 of router and zone"
)
self.assertEqual(
router.dns2,
zone.dns2,
"Compare DNS2 of router and zone"
)
self.assertEqual(
hasattr(router, 'guestipaddress'),
True,
"Check whether router has guest IP field"
)
self.assertEqual(
hasattr(router, 'linklocalip'),
True,
"Check whether router has link local IP field"
)
return
def test_06_router_advanced(self):
"""Test router advanced setup
"""
# Validate the following
# 1. verify that listRouters returned a 'Running' router
# 2. router will have dns and gateway as in listZones, listVlanIpRanges
# 3. router will have guest,public and linklocal IPs
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_router_response),
0,
"Check list router response"
)
for router in list_router_response:
self.assertEqual(
router.state,
'Running',
"Check list router response for router state"
)
zones = list_zones(
self.apiclient,
id=router.zoneid
)
self.assertEqual(
isinstance(zones, list),
True,
"Check list response returns a valid list"
)
zone = zones[0]
self.assertEqual(
router.dns1,
zone.dns1,
"Compare DNS1 of router and zone"
)
self.assertEqual(
router.dns2,
zone.dns2,
"Compare DNS2 of router and zone"
)
self.assertEqual(
hasattr(router, 'guestipaddress'),
True,
"Check whether router has guest IP field"
)
self.assertEqual(
hasattr(router, 'linklocalip'),
True,
"Check whether router has link local IP field"
)
#Fetch corresponding ip ranges information from listVlanIpRanges
ipranges_response = list_vlan_ipranges(
self.apiclient,
zoneid=router.zoneid
)
self.assertEqual(
isinstance(ipranges_response, list),
True,
"Check list response returns a valid list"
)
iprange = ipranges_response[0]
self.assertEqual(
router.gateway,
iprange.gateway,
"Check gateway with that of corresponding IP range"
)
return
def test_07_stop_router(self):
"""Test stop router
"""
# Validate the following
# 1. listRouter should report the router for the account as stopped
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
self.debug("Stopping the router with ID: %s" % router.id)
#Stop the router
cmd = stopRouter.stopRouterCmd()
cmd.id = router.id
self.apiclient.stopRouter(cmd)
#List routers to check state of router
router_response = list_routers(
self.apiclient,
id=router.id
)
self.assertEqual(
isinstance(router_response, list),
True,
"Check list response returns a valid list"
)
#List router should have router in stopped state
self.assertEqual(
router_response[0].state,
'Stopped',
"Check list router response for router state"
)
return
def test_08_start_router(self):
"""Test start router
"""
# Validate the following
# 1. listRouter should report the router for the account as stopped
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
self.debug("Starting the router with ID: %s" % router.id)
#Start the router
cmd = startRouter.startRouterCmd()
cmd.id = router.id
self.apiclient.startRouter(cmd)
#List routers to check state of router
router_response = list_routers(
self.apiclient,
id=router.id
)
self.assertEqual(
isinstance(router_response, list),
True,
"Check list response returns a valid list"
)
#List router should have router in running state
self.assertEqual(
router_response[0].state,
'Running',
"Check list router response for router state"
)
return
def test_09_reboot_router(self):
"""Test reboot router
"""
# Validate the following
# 1. listRouter should report the router for the account as stopped
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
public_ip = router.publicip
self.debug("Rebooting the router with ID: %s" % router.id)
#Reboot the router
cmd = rebootRouter.rebootRouterCmd()
cmd.id = router.id
self.apiclient.rebootRouter(cmd)
#List routers to check state of router
router_response = list_routers(
self.apiclient,
id=router.id
)
self.assertEqual(
isinstance(router_response, list),
True,
"Check list response returns a valid list"
)
#List router should have router in running state and same public IP
self.assertEqual(
router_response[0].state,
'Running',
"Check list router response for router state"
)
self.assertEqual(
router_response[0].publicip,
public_ip,
"Check list router response for router public IP"
)
return
def test_10_network_gc(self):
"""Test network GC
"""
# Validate the following
# 1. stop All User VMs in the account
# 2. wait for network.gc.interval time"
# 3. After network.gc.interval, router should be stopped
# 4. ListRouters should return the router in Stopped state
list_vms = list_virtual_machines(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertEqual(
isinstance(list_vms, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vms),
0,
"Check length of list VM response"
)
for vm in list_vms:
self.debug("Stopping the VM with ID: %s" % vm.id)
# Stop all virtual machines associated with that account
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = vm.id
self.apiclient.stopVirtualMachine(cmd)
# Get network.gc.interval config value
config = list_configurations(
self.apiclient,
name='network.gc.interval'
)
self.assertEqual(
isinstance(config, list),
True,
"Check list response returns a valid list"
)
gcinterval = config[0]
# Get network.gc.wait config value
config = list_configurations(
self.apiclient,
name='network.gc.wait'
)
self.assertEqual(
isinstance(config, list),
True,
"Check list response returns a valid list"
)
gcwait = config[0]
total_wait = int(gcinterval.value) + int(gcwait.value)
# Wait for wait_time * 2 time to cleanup all the resources
time.sleep(total_wait * 2)
timeout = self.services["timeout"]
while True:
#Check status of network router
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
if isinstance(list_router_response, list):
break
elif timeout == 0:
raise Exception("List router call failed!")
time.sleep(5)
timeout = timeout -1
self.assertEqual(
isinstance(list_router_response, list),
True,
"Check list response returns a valid list"
)
router = list_router_response[0]
self.debug("Router state after network.gc.interval: %s" % router.state)
self.assertEqual(
router.state,
'Stopped',
"Check state of the router after stopping all VMs associated"
)
return

View File

@ -0,0 +1,238 @@
# -*- encoding: utf-8 -*-
# Copyright 2012 Citrix Systems, Inc. Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. Citrix Systems, Inc.
# reserves all rights not expressly granted by the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Automatically generated by addcopyright.py at 04/03/2012
""" BVT tests for Service offerings"""
#Import Local Modules
import marvin
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from integration.lib.utils import *
from integration.lib.base import *
from integration.lib.common import *
class Services:
"""Test Service offerings Services
"""
def __init__(self):
self.services = {
"off":
{
"name": "Service Offering",
"displaytext": "Service Offering",
"cpunumber": 1,
"cpuspeed": 100, # MHz
"memory": 64, # in MBs
},
}
class TestCreateServiceOffering(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
self.services = Services().services
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_create_service_offering(self):
"""Test to create service offering"""
# Validate the following:
# 1. createServiceOfferings should return a valid information for newly created offering
# 2. The Cloud Database contains the valid information
service_offering = ServiceOffering.create(
self.apiclient,
self.services["off"]
)
self.cleanup.append(service_offering)
self.debug("Created service offering with ID: %s" % service_offering.id)
list_service_response = list_service_offering(
self.apiclient,
id=service_offering.id
)
self.assertEqual(
isinstance(list_service_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_service_response),
0,
"Check Service offering is created"
)
service_response = list_service_response[0]
self.assertEqual(
list_service_response[0].cpunumber,
self.services["off"]["cpunumber"],
"Check server id in createServiceOffering"
)
self.assertEqual(
list_service_response[0].cpuspeed,
self.services["off"]["cpuspeed"],
"Check cpuspeed in createServiceOffering"
)
self.assertEqual(
list_service_response[0].displaytext,
self.services["off"]["displaytext"],
"Check server displaytext in createServiceOfferings"
)
self.assertEqual(
list_service_response[0].memory,
self.services["off"]["memory"],
"Check memory in createServiceOffering"
)
self.assertEqual(
list_service_response[0].name,
self.services["off"]["name"],
"Check name in createServiceOffering"
)
return
class TestServiceOfferings(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
def tearDown(self):
try:
self.dbclient.close()
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = super(TestServiceOfferings, cls).getClsTestClient().getApiClient()
cls.service_offering_1 = ServiceOffering.create(
cls.api_client,
cls.services["off"]
)
cls.service_offering_2 = ServiceOffering.create(
cls.api_client,
cls.services["off"]
)
cls._cleanup = [cls.service_offering_1]
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = super(TestServiceOfferings, cls).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_02_edit_service_offering(self):
"""Test to update existing service offering"""
# Validate the following:
# 1. updateServiceOffering should return
# a valid information for newly created offering
#Generate new name & displaytext from random data
random_displaytext = random_gen()
random_name = random_gen()
self.debug("Updating service offering with ID: %s" %
self.service_offering_1.id)
cmd = updateServiceOffering.updateServiceOfferingCmd()
#Add parameters for API call
cmd.id = self.service_offering_1.id
cmd.displaytext = random_displaytext
cmd.name = random_name
self.apiclient.updateServiceOffering(cmd)
list_service_response = list_service_offering(
self.apiclient,
id=self.service_offering_1.id
)
self.assertEqual(
isinstance(list_service_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_service_response),
0,
"Check Service offering is updated"
)
self.assertEqual(
list_service_response[0].displaytext,
random_displaytext,
"Check server displaytext in updateServiceOffering"
)
self.assertEqual(
list_service_response[0].name,
random_name,
"Check server name in updateServiceOffering"
)
return
def test_03_delete_service_offering(self):
"""Test to delete service offering"""
# Validate the following:
# 1. deleteServiceOffering should return
# a valid information for newly created offering
self.debug("Deleting service offering with ID: %s" %
self.service_offering_2.id)
self.service_offering_2.delete(self.apiclient)
list_service_response = list_service_offering(
self.apiclient,
id=self.service_offering_2.id
)
self.assertEqual(
list_service_response,
None,
"Check if service offering exists in listDiskOfferings"
)
return

View File

@ -0,0 +1,796 @@
# -*- encoding: utf-8 -*-
# Copyright 2012 Citrix Systems, Inc. Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. Citrix Systems, Inc.
# reserves all rights not expressly granted by the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Automatically generated by addcopyright.py at 04/03/2012
""" BVT tests for Snapshots
"""
#Import Local Modules
import marvin
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from integration.lib.utils import *
from integration.lib.base import *
from integration.lib.common import *
class Services:
"""Test Snapshots Services
"""
def __init__(self):
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "password",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"disk_offering": {
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"server_with_disk":
{
"displayname": "Test VM -With Disk",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"server_without_disk":
{
"displayname": "Test VM-No Disk",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"privateport": 22,
# For NAT rule creation
"publicport": 22,
"protocol": 'TCP',
},
"recurring_snapshot":
{
"intervaltype": 'HOURLY',
# Frequency of snapshots
"maxsnaps": 1, # Should be min 2
"schedule": 1,
"timezone": 'US/Arizona',
# Timezone Formats - http://cloud.mindtouch.us/CloudStack_Documentation/Developer's_Guide%3A_CloudStack
},
"templates":
{
"displaytext": 'Template from snapshot',
"name": 'Template from snapshot',
"ostypeid": '52e14b2f-dea6-46dc-94e1-fba3ee264fc8',
"templatefilter": 'self',
},
"ostypeid": 'adsfasdf',
# Cent OS 5.3 (64 bit)
"diskdevice": "/dev/xvdb", # Data Disk
"rootdisk": "/dev/xvda", # Root Disk
"diskname": "Test Disk",
"size": 1, # GBs
"mount_dir": "/mnt/tmp",
"sub_dir": "test",
"sub_lvl_dir1": "test1",
"sub_lvl_dir2": "test2",
"random_data": "random.data",
"username": "root",
"password": "password",
"ssh_port": 22,
"sleep": 60,
"timeout": 10,
"mode": 'advanced',
# Networking mode, Advanced, Basic
}
@unittest.skip("skip snapshot tests")
class TestSnapshotRootDisk(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(TestSnapshotRootDisk, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["domainid"] = cls.domain.id
cls.services["server_without_disk"]["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["zoneid"] = cls.zone.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = cls.virtual_machine_with_disk = \
VirtualMachine.create(
cls.api_client,
cls.services["server_without_disk"],
templateid=template.id,
accountid=cls.account.account.name,
domainid=cls.account.account.domainid,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls._cleanup = [
cls.service_offering,
cls.account,
]
return
@classmethod
def tearDownClass(cls):
try:
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
#Clean up, terminate the created instance, volumes and snapshots
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_snapshot_root_disk(self):
"""Test Snapshot Root Disk
"""
# Validate the following
# 1. listSnapshots should list the snapshot that was created.
volumes = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine_with_disk.id,
type='ROOT',
listall=True
)
snapshot = Snapshot.create(
self.apiclient,
volumes[0].id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("Snapshot created: ID - %s" % snapshot.id)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list item call"
)
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.debug(
"select backup_snap_id, account_id, volume_id from snapshots where uuid = '%s';" \
% str(snapshot.id)
)
qresultset = self.dbclient.execute(
"select backup_snap_id, account_id, volume_id from snapshots where uuid = '%s';" \
% str(snapshot.id)
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
snapshot_uuid = qresult[0] # backup_snap_id = snapshot UUID
account_id = qresult[1]
volume_id = qresult[2]
self.assertNotEqual(
str(snapshot_uuid),
'',
"Check if backup_snap_id is not null"
)
# Get the Secondary Storage details from list Hosts
hosts = list_hosts(
self.apiclient,
type='SecondaryStorage',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
return
@unittest.skip("skip snapshot tests")
class TestSnapshots(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(TestSnapshots, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["domainid"] = cls.domain.id
cls.services["server_with_disk"]["zoneid"] = cls.zone.id
cls.services["server_with_disk"]["diskoffering"] = cls.disk_offering.id
cls.services["server_without_disk"]["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["zoneid"] = cls.zone.id
cls.services["diskoffering"] = cls.disk_offering.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = cls.virtual_machine_with_disk = \
VirtualMachine.create(
cls.api_client,
cls.services["server_with_disk"],
templateid=template.id,
accountid=cls.account.account.name,
domainid=cls.account.account.domainid,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls.virtual_machine_without_disk = \
VirtualMachine.create(
cls.api_client,
cls.services["server_without_disk"],
templateid=template.id,
accountid=cls.account.account.name,
domainid=cls.account.account.domainid,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls._cleanup = [
cls.service_offering,
cls.disk_offering,
cls.account,
]
return
@classmethod
def tearDownClass(cls):
try:
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
#Clean up, terminate the created instance, volumes and snapshots
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_02_snapshot_data_disk(self):
"""Test Snapshot Data Disk
"""
volume = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine_with_disk.id,
type='DATADISK',
listall=True
)
self.assertEqual(
isinstance(volume, list),
True,
"Check list response returns a valid list"
)
self.debug("Creating a Snapshot from data volume: %s" % volume[0].id)
snapshot = Snapshot.create(
self.apiclient,
volume[0].id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list item call"
)
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.debug(
"select backup_snap_id, account_id, volume_id from snapshots where uuid = '%s';" \
% str(snapshot.id)
)
qresultset = self.dbclient.execute(
"select backup_snap_id, account_id, volume_id from snapshots where uuid = '%s';" \
% str(snapshot.id)
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
snapshot_uuid = qresult[0] # backup_snap_id = snapshot UUID
account_id = qresult[1]
volume_id = qresult[2]
self.assertNotEqual(
str(snapshot_uuid),
'',
"Check if backup_snap_id is not null"
)
return
def test_03_volume_from_snapshot(self):
"""Create volumes from snapshots
"""
#1. Login to machine; create temp/test directories on data volume
#2. Snapshot the Volume
#3. Create another Volume from snapshot
list_volume_response = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine.id,
type='DATADISK',
listall=True
)
volume_response = list_volume_response[0]
#Create snapshot from attached volume
snapshot = Snapshot.create(
self.apiclient,
volume_response.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("Created Snapshot from volume: %s" % volume_response.id)
#Create volume from snapshot
self.debug("Creating volume from snapshot: %s" % snapshot.id)
volume = Volume.create_from_snapshot(
self.apiclient,
snapshot.id,
self.services,
account=self.account.account.name,
domainid=self.account.account.domainid
)
volumes = list_volumes(
self.apiclient,
id=volume.id
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(volumes),
None,
"Check Volume list Length"
)
self.assertEqual(
volumes[0].id,
volume.id,
"Check Volume in the List Volumes"
)
#Attaching volume to new VM
new_virtual_machine = self.virtual_machine_without_disk
self.cleanup.append(new_virtual_machine)
cmd = attachVolume.attachVolumeCmd()
cmd.id = volume.id
cmd.virtualmachineid = new_virtual_machine.id
self.apiclient.attachVolume(cmd)
return
def test_04_delete_snapshot(self):
"""Test Delete Snapshot
"""
#1. Snapshot the Volume
#2. Delete the snapshot
#3. Verify snapshot is removed by calling List Snapshots API
volumes = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine.id,
type='DATADISK',
listall=True
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check list response returns a valid list"
)
snapshot = Snapshot.create(
self.apiclient,
volumes[0].id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
snapshot.delete(self.apiclient)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
snapshots,
None,
"Check if result exists in list item call"
)
return
def test_05_recurring_snapshot_root_disk(self):
"""Test Recurring Snapshot Root Disk
"""
#1. Create snapshot policy for root disk
#2. ListSnapshot policy should return newly created policy
#3. Verify only most recent number (maxsnaps) snapshots retailed
volume = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine_with_disk.id,
type='ROOT',
listall=True
)
self.assertEqual(
isinstance(volume, list),
True,
"Check list response returns a valid list"
)
recurring_snapshot = SnapshotPolicy.create(
self.apiclient,
volume[0].id,
self.services["recurring_snapshot"]
)
self.cleanup.append(recurring_snapshot)
#ListSnapshotPolicy should return newly created policy
list_snapshots_policy = list_snapshot_policy(
self.apiclient,
id=recurring_snapshot.id,
volumeid=volume[0].id
)
self.assertEqual(
isinstance(list_snapshots_policy, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_snapshots_policy,
None,
"Check if result exists in list item call"
)
snapshots_policy = list_snapshots_policy[0]
self.assertEqual(
snapshots_policy.id,
recurring_snapshot.id,
"Check recurring snapshot id in list resources call"
)
self.assertEqual(
snapshots_policy.maxsnaps,
self.services["recurring_snapshot"]["maxsnaps"],
"Check interval type in list resources call"
)
# Sleep for (maxsnaps+1) hours to verify
# only maxsnaps snapshots are retained
time.sleep(
(self.services["recurring_snapshot"]["maxsnaps"]) * 3600
)
timeout = self.services["timeout"]
while True:
snapshots = list_snapshots(
self.apiclient,
volumeid=volume[0].id,
intervaltype=\
self.services["recurring_snapshot"]["intervaltype"],
snapshottype='RECURRING',
listall=True
)
if isinstance(snapshots, list):
break
elif timeout == 0:
raise Exception("List snapshots API call failed.")
time.sleep(1)
timeout = timeout - 1
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertEqual(
len(snapshots),
self.services["recurring_snapshot"]["maxsnaps"],
"Check maximum number of recurring snapshots retained"
)
return
def test_06_recurring_snapshot_data_disk(self):
"""Test Recurring Snapshot data Disk
"""
#1. Create snapshot policy for data disk
#2. ListSnapshot policy should return newly created policy
#3. Verify only most recent number (maxsnaps) snapshots retailed
volume = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine_with_disk.id,
type='DATADISK',
listall=True
)
self.assertEqual(
isinstance(volume, list),
True,
"Check list response returns a valid list"
)
recurring_snapshot = SnapshotPolicy.create(
self.apiclient,
volume[0].id,
self.services["recurring_snapshot"]
)
self.cleanup.append(recurring_snapshot)
#ListSnapshotPolicy should return newly created policy
list_snapshots_policy = list_snapshot_policy(
self.apiclient,
id=recurring_snapshot.id,
volumeid=volume[0].id
)
self.assertEqual(
isinstance(list_snapshots_policy, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_snapshots_policy,
None,
"Check if result exists in list item call"
)
snapshots_policy = list_snapshots_policy[0]
self.assertEqual(
snapshots_policy.id,
recurring_snapshot.id,
"Check recurring snapshot id in list resources call"
)
self.assertEqual(
snapshots_policy.maxsnaps,
self.services["recurring_snapshot"]["maxsnaps"],
"Check interval type in list resources call"
)
# Sleep for (maxsnaps) hours to verify only maxsnaps snapshots are
# retained
time.sleep(
(self.services["recurring_snapshot"]["maxsnaps"]) * 3600
)
timeout = self.services["timeout"]
while True:
snapshots = list_snapshots(
self.apiclient,
volumeid=volume[0].id,
intervaltype=\
self.services["recurring_snapshot"]["intervaltype"],
snapshottype='RECURRING',
listall=True
)
if isinstance(snapshots, list):
break
elif timeout == 0:
raise Exception("List snapshots API call failed.")
time.sleep(1)
timeout = timeout - 1
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertEqual(
len(snapshots),
self.services["recurring_snapshot"]["maxsnaps"],
"Check maximum number of recurring snapshots retained"
)
return
def test_07_template_from_snapshot(self):
"""Create Template from snapshot
"""
#1. Login to machine; create temp/test directories on data volume
#2. Snapshot the Volume
#3. Create Template from snapshot
#4. Deploy Virtual machine using this template
#5. Login to newly created virtual machine
#6. Compare data
volumes = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine.id,
type='ROOT',
listall=True
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check list response returns a valid list"
)
volume = volumes[0]
#Create a snapshot of volume
snapshot = Snapshot.create(
self.apiclient,
volume.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.debug("Snapshot created from volume ID: %s" % volume.id)
# Generate template from the snapshot
template = Template.create_from_snapshot(
self.apiclient,
snapshot,
self.services["templates"]
)
self.cleanup.append(template)
self.debug("Template created from snapshot ID: %s" % snapshot.id)
# Verify created template
templates = list_templates(
self.apiclient,
templatefilter=\
self.services["templates"]["templatefilter"],
id=template.id
)
self.assertNotEqual(
templates,
None,
"Check if result exists in list item call"
)
self.assertEqual(
templates[0].id,
template.id,
"Check new template id in list resources call"
)
self.debug("Deploying new VM from template: %s" % template.id)
# Deploy new virtual machine using template
new_virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["server_without_disk"],
templateid=template.id,
accountid=self.account.account.name,
domainid=self.account.account.domainid,
serviceofferingid=self.service_offering.id,
mode=self.services["mode"]
)
self.cleanup.append(new_virtual_machine)
return

View File

@ -0,0 +1,758 @@
# -*- encoding: utf-8 -*-
# Copyright 2012 Citrix Systems, Inc. Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. Citrix Systems, Inc.
# reserves all rights not expressly granted by the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Automatically generated by addcopyright.py at 04/03/2012
""" BVT tests for SSVM
"""
#Import Local Modules
import marvin
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from integration.lib.utils import *
from integration.lib.base import *
from integration.lib.common import *
#Import System modules
import time
class Services:
"""Test SSVM Services
"""
def __init__(self):
self.services = {
"host": {
"username": 'root', # Credentials for SSH
"password": 'fr3sca',
"publicport": 22,
},
"sleep": 60,
"timeout": 10,
}
class TestSSVMs(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
self.services = Services().services
self.zone = get_zone(self.apiclient, self.services)
return
def tearDown(self):
try:
#Clean up, terminate the created templates
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_list_sec_storage_vm(self):
"""Test List secondary storage VMs
"""
# Validate the following:
# 1. listSystemVM (systemvmtype=secondarystoragevm)
# should return only ONE SSVM per zone
# 2. The returned SSVM should be in Running state
# 3. listSystemVM for secondarystoragevm should list publicip,
# privateip and link-localip
# 4. The gateway programmed on the ssvm by listSystemVm should be
# the same as the gateway returned by listVlanIpRanges
# 5. DNS entries must match those given for the zone
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
#Verify SSVM response
self.assertNotEqual(
len(list_ssvm_response),
0,
"Check list System VMs response"
)
list_zones_response = list_zones(self.apiclient)
self.assertEqual(
isinstance(list_zones_response, list),
True,
"Check list response returns a valid list"
)
self.debug("Number of zones: %s" % len(list_zones_response))
self.debug("Number of SSVMs: %s" % len(list_ssvm_response))
# Number of Sec storage VMs = No of Zones
self.assertEqual(
len(list_ssvm_response),
len(list_zones_response),
"Check number of SSVMs with number of zones"
)
#For each secondary storage VM check private IP,
#public IP, link local IP and DNS
for ssvm in list_ssvm_response:
self.debug("SSVM state: %s" % ssvm.state)
self.assertEqual(
ssvm.state,
'Running',
"Check whether state of SSVM is running"
)
self.assertEqual(
hasattr(ssvm, 'privateip'),
True,
"Check whether SSVM has private IP field"
)
self.assertEqual(
hasattr(ssvm, 'linklocalip'),
True,
"Check whether SSVM has link local IP field"
)
self.assertEqual(
hasattr(ssvm, 'publicip'),
True,
"Check whether SSVM has public IP field"
)
#Fetch corresponding ip ranges information from listVlanIpRanges
ipranges_response = list_vlan_ipranges(
self.apiclient,
zoneid=ssvm.zoneid
)
self.assertEqual(
isinstance(ipranges_response, list),
True,
"Check list response returns a valid list"
)
iprange = ipranges_response[0]
self.assertEqual(
ssvm.gateway,
iprange.gateway,
"Check gateway with that of corresponding ip range"
)
#Fetch corresponding zone information from listZones
zone_response = list_zones(
self.apiclient,
id=ssvm.zoneid
)
self.assertEqual(
isinstance(zone_response, list),
True,
"Check list response returns a valid list"
)
self.assertEqual(
ssvm.dns1,
zone_response[0].dns1,
"Check DNS1 with that of corresponding zone"
)
self.assertEqual(
ssvm.dns2,
zone_response[0].dns2,
"Check DNS2 with that of corresponding zone"
)
return
def test_02_list_cpvm_vm(self):
"""Test List console proxy VMs
"""
# Validate the following:
# 1. listSystemVM (systemvmtype=consoleproxy) should return
# at least ONE CPVM per zone
# 2. The returned ConsoleProxyVM should be in Running state
# 3. listSystemVM for console proxy should list publicip, privateip
# and link-localip
# 4. The gateway programmed on the console proxy should be the same
# as the gateway returned by listZones
# 5. DNS entries must match those given for the zone
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
True,
"Check list response returns a valid list"
)
#Verify CPVM response
self.assertNotEqual(
len(list_cpvm_response),
0,
"Check list System VMs response"
)
list_zones_response = list_zones(self.apiclient)
# Number of Console Proxy VMs = No of Zones
self.assertEqual(
isinstance(list_zones_response, list),
True,
"Check list response returns a valid list"
)
self.debug("Number of zones: %s" % len(list_zones_response))
self.debug("Number of CPVMs: %s" % len(list_cpvm_response))
self.assertEqual(
len(list_cpvm_response),
len(list_zones_response),
"Check number of CPVMs with number of zones"
)
#For each CPVM check private IP, public IP, link local IP and DNS
for cpvm in list_cpvm_response:
self.debug("CPVM state: %s" % cpvm.state)
self.assertEqual(
cpvm.state,
'Running',
"Check whether state of CPVM is running"
)
self.assertEqual(
hasattr(cpvm, 'privateip'),
True,
"Check whether CPVM has private IP field"
)
self.assertEqual(
hasattr(cpvm, 'linklocalip'),
True,
"Check whether CPVM has link local IP field"
)
self.assertEqual(
hasattr(cpvm, 'publicip'),
True,
"Check whether CPVM has public IP field"
)
#Fetch corresponding ip ranges information from listVlanIpRanges
ipranges_response = list_vlan_ipranges(
self.apiclient,
zoneid=cpvm.zoneid
)
self.assertEqual(
isinstance(ipranges_response, list),
True,
"Check list response returns a valid list"
)
iprange = ipranges_response[0]
self.assertEqual(
cpvm.gateway,
iprange.gateway,
"Check gateway with that of corresponding ip range"
)
#Fetch corresponding zone information from listZones
zone_response = list_zones(
self.apiclient,
id=cpvm.zoneid
)
self.assertEqual(
cpvm.dns1,
zone_response[0].dns1,
"Check DNS1 with that of corresponding zone"
)
self.assertEqual(
cpvm.dns2,
zone_response[0].dns2,
"Check DNS2 with that of corresponding zone"
)
return
def test_05_stop_ssvm(self):
"""Test stop SSVM
"""
# Validate the following
# 1. The SSVM should go to stop state
# 2. After a brief delay of say one minute, the SSVM should be
# restarted once again and return to Running state with previous two
# test cases still passing
# 3. If either of the two above steps fail the test is a failure
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
ssvm = list_ssvm_response[0]
hosts = list_hosts(
self.apiclient,
id=ssvm.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
self.debug("Stopping SSVM: %s" % ssvm.id)
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = ssvm.id
self.apiclient.stopSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
self.apiclient,
id=ssvm.id
)
if isinstance(list_ssvm_response, list):
if list_ssvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List SSVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
ssvm_response = list_ssvm_response[0]
self.debug("SSVM state after debug: %s" % ssvm_response.state)
self.assertEqual(
ssvm_response.state,
'Running',
"Check whether SSVM is running or not"
)
# Call above tests to ensure SSVM is properly running
self.test_01_list_sec_storage_vm()
return
def test_06_stop_cpvm(self):
"""Test stop CPVM
"""
# Validate the following
# 1. The CPVM should go to stop state
# 2. After a brief delay of say one minute, the SSVM should be
# restarted once again and return to Running state with previous
# two test cases still passing
# 3. If either of the two above steps fail the test is a failure
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
True,
"Check list response returns a valid list"
)
cpvm = list_cpvm_response[0]
hosts = list_hosts(
self.apiclient,
id=cpvm.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
self.debug("Stopping CPVM: %s" % cpvm.id)
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = cpvm.id
self.apiclient.stopSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_cpvm_response = list_ssvms(
self.apiclient,
id=cpvm.id
)
if isinstance(list_cpvm_response, list):
if list_cpvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List CPVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
cpvm_response = list_cpvm_response[0]
self.debug("CPVM state after debug: %s" % cpvm_response.state)
self.assertEqual(
cpvm_response.state,
'Running',
"Check whether CPVM is running or not"
)
# Call above tests to ensure CPVM is properly running
self.test_02_list_cpvm_vm()
return
def test_07_reboot_ssvm(self):
"""Test reboot SSVM
"""
# Validate the following
# 1. The SSVM should go to stop and return to Running state
# 2. SSVM's public-ip and private-ip must remain the same
# before and after reboot
# 3. The cloud process should still be running within the SSVM
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
ssvm_response = list_ssvm_response[0]
hosts = list_hosts(
self.apiclient,
id=ssvm_response.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
#Store the public & private IP values before reboot
old_public_ip = ssvm_response.publicip
old_private_ip = ssvm_response.privateip
self.debug("Rebooting SSVM: %s" % ssvm_response.id)
cmd = rebootSystemVm.rebootSystemVmCmd()
cmd.id = ssvm_response.id
self.apiclient.rebootSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
self.apiclient,
id=ssvm_response.id
)
if isinstance(list_ssvm_response, list):
if list_ssvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List SSVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
ssvm_response = list_ssvm_response[0]
self.debug("SSVM State: %s" % ssvm_response.state)
self.assertEqual(
'Running',
str(ssvm_response.state),
"Check whether CPVM is running or not"
)
self.assertEqual(
ssvm_response.publicip,
old_public_ip,
"Check Public IP after reboot with that of before reboot"
)
self.assertEqual(
ssvm_response.privateip,
old_private_ip,
"Check Private IP after reboot with that of before reboot"
)
return
def test_08_reboot_cpvm(self):
"""Test reboot CPVM
"""
# Validate the following
# 1. The CPVM should go to stop and return to Running state
# 2. CPVM's public-ip and private-ip must remain
# the same before and after reboot
# 3. the cloud process should still be running within the CPVM
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
True,
"Check list response returns a valid list"
)
cpvm_response = list_cpvm_response[0]
hosts = list_hosts(
self.apiclient,
id=cpvm_response.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
#Store the public & private IP values before reboot
old_public_ip = cpvm_response.publicip
old_private_ip = cpvm_response.privateip
self.debug("Rebooting CPVM: %s" % cpvm_response.id)
cmd = rebootSystemVm.rebootSystemVmCmd()
cmd.id = cpvm_response.id
self.apiclient.rebootSystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_cpvm_response = list_ssvms(
self.apiclient,
id=cpvm_response.id
)
if isinstance(list_cpvm_response, list):
if list_cpvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List CPVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
cpvm_response = list_cpvm_response[0]
self.debug("CPVM state: %s" % cpvm_response.state)
self.assertEqual(
'Running',
str(cpvm_response.state),
"Check whether CPVM is running or not"
)
self.assertEqual(
cpvm_response.publicip,
old_public_ip,
"Check Public IP after reboot with that of before reboot"
)
self.assertEqual(
cpvm_response.privateip,
old_private_ip,
"Check Private IP after reboot with that of before reboot"
)
return
def test_09_destroy_ssvm(self):
"""Test destroy SSVM
"""
# Validate the following
# 1. SSVM should be completely destroyed and a new one will spin up
# 2. listSystemVMs will show a different name for the
# systemVM from what it was before
# 3. new SSVM will have a public/private and link-local-ip
# 4. cloud process within SSVM must be up and running
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
ssvm_response = list_ssvm_response[0]
old_name = ssvm_response.name
self.debug("Destroying SSVM: %s" % ssvm_response.id)
cmd = destroySystemVm.destroySystemVmCmd()
cmd.id = ssvm_response.id
self.apiclient.destroySystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_ssvm_response = list_ssvms(
self.apiclient,
zoneid=self.zone.id,
systemvmtype='secondarystoragevm'
)
if isinstance(list_ssvm_response, list):
if list_ssvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List SSVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
ssvm_response = list_ssvm_response[0]
# Verify Name, Public IP, Private IP and Link local IP
# for newly created SSVM
self.assertNotEqual(
ssvm_response.name,
old_name,
"Check SSVM new name with name of destroyed SSVM"
)
self.assertEqual(
hasattr(ssvm_response, 'privateip'),
True,
"Check whether SSVM has private IP field"
)
self.assertEqual(
hasattr(ssvm_response, 'linklocalip'),
True,
"Check whether SSVM has link local IP field"
)
self.assertEqual(
hasattr(ssvm_response, 'publicip'),
True,
"Check whether SSVM has public IP field"
)
return
def test_10_destroy_cpvm(self):
"""Test destroy CPVM
"""
# Validate the following
# 1. CPVM should be completely destroyed and a new one will spin up
# 2. listSystemVMs will show a different name for the systemVM from
# what it was before
# 3. new CPVM will have a public/private and link-local-ip
# 4. cloud process within CPVM must be up and running
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
True,
"Check list response returns a valid list"
)
cpvm_response = list_cpvm_response[0]
old_name = cpvm_response.name
self.debug("Destroying CPVM: %s" % cpvm_response.id)
cmd = destroySystemVm.destroySystemVmCmd()
cmd.id = cpvm_response.id
self.apiclient.destroySystemVm(cmd)
# Sleep to ensure that VM is in proper state
time.sleep(self.services["sleep"])
timeout = self.services["timeout"]
while True:
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
zoneid=self.zone.id
)
if isinstance(list_cpvm_response, list):
if list_cpvm_response[0].state == 'Running':
break
elif timeout == 0:
raise Exception("List CPVM call failed!")
time.sleep(self.services["sleep"])
timeout = timeout - 1
cpvm_response = list_cpvm_response[0]
# Verify Name, Public IP, Private IP and Link local IP
# for newly created CPVM
self.assertNotEqual(
cpvm_response.name,
old_name,
"Check SSVM new name with name of destroyed CPVM"
)
self.assertEqual(
hasattr(cpvm_response, 'privateip'),
True,
"Check whether CPVM has private IP field"
)
self.assertEqual(
hasattr(cpvm_response, 'linklocalip'),
True,
"Check whether CPVM has link local IP field"
)
self.assertEqual(
hasattr(cpvm_response, 'publicip'),
True,
"Check whether CPVM has public IP field"
)
return

View File

@ -0,0 +1,755 @@
# -*- encoding: utf-8 -*-
# Copyright 2012 Citrix Systems, Inc. Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. Citrix Systems, Inc.
# reserves all rights not expressly granted by the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Automatically generated by addcopyright.py at 04/03/2012
""" BVT tests for Virtual Machine Life Cycle
"""
#Import Local Modules
import marvin
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from integration.lib.utils import *
from integration.lib.base import *
from integration.lib.common import *
#Import System modules
import time
class Services:
"""Test VM Life Cycle Services
"""
def __init__(self):
self.services = {
"disk_offering":{
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended in create account to
# ensure unique username generated each time
"password": "password",
},
"small":
# Create a small virtual machine instance with disk offering
{
"displayname": "testserver",
"username": "root", # VM creds for SSH
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"medium": # Create a medium virtual machine instance
{
"displayname": "testserver",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"service_offerings":
{
"tiny":
{
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"small":
{
# Small service offering ID to for change VM
# service offering from medium to small
"name": "Small Instance",
"displaytext": "Small Instance",
"cpunumber": 1,
"cpuspeed": 500,
"memory": 256
},
"medium":
{
# Medium service offering ID to for
# change VM service offering from small to medium
"name": "Medium Instance",
"displaytext": "Medium Instance",
"cpunumber": 1,
"cpuspeed": 1000,
"memory": 1024
}
},
"iso": # ISO settings for Attach/Detach ISO tests
{
"displaytext": "Test ISO",
"name": "testISO",
"url": "http://nfs1.lab.vmops.com/isos_32bit/dsl-4.4.10.iso",
# Source URL where ISO is located
"ostypeid": '52e14b2f-dea6-46dc-94e1-fba3ee264fc8',
"mode": 'HTTP_DOWNLOAD', # Downloading existing ISO
},
"template": {
"displaytext": "Cent OS Template",
"name": "Cent OS Template",
"passwordenabled": True,
},
"diskdevice": '/dev/xvdd',
# Disk device where ISO is attached to instance
"mount_dir": "/mnt/tmp",
"sleep": 60,
"timeout": 10,
#Migrate VM to hostid
"ostypeid": '52e14b2f-dea6-46dc-94e1-fba3ee264fc8',
# CentOS 5.3 (64-bit)
"mode":'advanced',
}
class TestDeployVM(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.services = Services().services
# Get Zone, Domain and templates
domain = get_domain(self.apiclient, self.services)
zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
zone.id,
self.services["ostypeid"]
)
# Set Zones and disk offerings
self.services["small"]["zoneid"] = zone.id
self.services["small"]["template"] = template.id
self.services["medium"]["zoneid"] = zone.id
self.services["medium"]["template"] = template.id
self.services["iso"]["zoneid"] = zone.id
# Create Account, VMs, NAT Rules etc
self.account = Account.create(
self.apiclient,
self.services["account"],
domainid=domain.id
)
self.service_offering = ServiceOffering.create(
self.apiclient,
self.services["service_offerings"]["tiny"]
)
# Cleanup
self.cleanup = [
self.service_offering,
self.account
]
def test_deploy_vm(self):
"""Test Deploy Virtual Machine
"""
# Validate the following:
# 2. listVirtualMachines returns accurate information
# 3. The Cloud Database contains the valid information
self.virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["small"],
accountid=self.account.account.name,
domainid=self.account.account.domainid,
serviceofferingid=self.service_offering.id
)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.virtual_machine.id
)
self.debug(
"Verify listVirtualMachines response for virtual machine: %s" \
% self.virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM available in List Virtual Machines"
)
vm_response = list_vm_response[0]
self.assertEqual(
vm_response.id,
self.virtual_machine.id,
"Check virtual machine id in listVirtualMachines"
)
self.assertEqual(
vm_response.displayname,
self.virtual_machine.displayname,
"Check virtual machine displayname in listVirtualMachines"
)
return
def tearDown(self):
try:
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
self.debug("Warning! Exception in tearDown: %s" % e)
class TestVMLifeCycle(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(TestVMLifeCycle, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
domain = get_domain(cls.api_client, cls.services)
zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
zone.id,
cls.services["ostypeid"]
)
# Set Zones and disk offerings
cls.services["small"]["zoneid"] = zone.id
cls.services["small"]["template"] = template.id
cls.services["medium"]["zoneid"] = zone.id
cls.services["medium"]["template"] = template.id
cls.services["iso"]["zoneid"] = zone.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=domain.id
)
cls.small_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offerings"]["small"]
)
cls.medium_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offerings"]["medium"]
)
#create small and large virtual machines
cls.small_virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["small"],
accountid=cls.account.account.name,
domainid=cls.account.account.domainid,
serviceofferingid=cls.small_offering.id,
mode=cls.services["mode"]
)
cls.medium_virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["medium"],
accountid=cls.account.account.name,
domainid=cls.account.account.domainid,
serviceofferingid=cls.medium_offering.id,
mode=cls.services["mode"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["small"],
accountid=cls.account.account.name,
domainid=cls.account.account.domainid,
serviceofferingid=cls.small_offering.id,
mode=cls.services["mode"]
)
cls._cleanup = [
cls.small_offering,
cls.medium_offering,
cls.account
]
@classmethod
def tearDownClass(cls):
cls.api_client = super(TestVMLifeCycle, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
def tearDown(self):
#Clean up, terminate the created ISOs
cleanup_resources(self.apiclient, self.cleanup)
return
def test_01_stop_vm(self):
"""Test Stop Virtual Machine
"""
# Validate the following
# 1. Should Not be able to login to the VM.
# 2. listVM command should return
# this VM.State of this VM should be ""Stopped"".
self.debug("Stopping VM - ID: %s" % self.virtual_machine.id)
self.small_virtual_machine.stop(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM available in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check virtual machine is in stopped state"
)
return
def test_02_start_vm(self):
"""Test Start Virtual Machine
"""
# Validate the following
# 1. listVM command should return this VM.State
# of this VM should be Running".
self.debug("Starting VM - ID: %s" % self.virtual_machine.id)
self.small_virtual_machine.start(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.debug(
"Verify listVirtualMachines response for virtual machine: %s" \
% self.small_virtual_machine.id
)
self.assertEqual(
list_vm_response[0].state,
"Running",
"Check virtual machine is in running state"
)
return
def test_03_reboot_vm(self):
"""Test Reboot Virtual Machine
"""
# Validate the following
# 1. Should be able to login to the VM.
# 2. listVM command should return the deployed VM.
# State of this VM should be "Running"
self.debug("Rebooting VM - ID: %s" % self.virtual_machine.id)
self.small_virtual_machine.reboot(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Running",
"Check virtual machine is in running state"
)
return
def test_04_change_offering_small(self):
"""Change Offering to a small capacity
"""
# Validate the following
# 1. Log in to the Vm .We should see that the CPU and memory Info of
# this Vm matches the one specified for "Small" service offering.
# 2. Using listVM command verify that this Vm
# has Small service offering Id.
self.debug("Stopping VM - ID: %s" % self.medium_virtual_machine.id)
self.medium_virtual_machine.stop(self.apiclient)
# Poll listVM to ensure VM is stopped properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in stopped state
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.medium_virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Stopped':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to stop VM (ID: %s) in change service offering" % vm.id)
timeout = timeout - 1
self.debug("Change Service offering VM - ID: %s" %
self.medium_virtual_machine.id)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.medium_virtual_machine.id
cmd.serviceofferingid = self.small_offering.id
self.apiclient.changeServiceForVirtualMachine(cmd)
self.debug("Starting VM - ID: %s" % self.medium_virtual_machine.id)
self.medium_virtual_machine.start(self.apiclient)
# Poll listVM to ensure VM is started properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in running state
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.medium_virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Running':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to start VM (ID: %s) after changing service offering" % vm.id)
timeout = timeout - 1
return
def test_05_change_offering_medium(self):
"""Change Offering to a medium capacity
"""
# Validate the following
# 1. Log in to the Vm .We should see that the CPU and memory Info of
# this Vm matches the one specified for "Medium" service offering.
# 2. Using listVM command verify that this Vm
# has Medium service offering Id.
self.debug("Stopping VM - ID: %s" % self.small_virtual_machine.id)
self.small_virtual_machine.stop(self.apiclient)
# Poll listVM to ensure VM is stopped properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in stopped state
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Stopped':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to stop VM (ID: %s) in change service offering" % vm.id)
timeout = timeout - 1
self.debug("Change service offering VM - ID: %s" %
self.small_virtual_machine.id)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
cmd.serviceofferingid = self.medium_offering.id
self.apiclient.changeServiceForVirtualMachine(cmd)
self.debug("Starting VM - ID: %s" % self.small_virtual_machine.id)
self.small_virtual_machine.start(self.apiclient)
# Poll listVM to ensure VM is started properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in running state
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Running':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to start VM (ID: %s) after changing service offering" % vm.id)
timeout = timeout - 1
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
return
def test_06_destroy_vm(self):
"""Test destroy Virtual Machine
"""
# Validate the following
# 1. Should not be able to login to the VM.
# 2. listVM command should return this VM.State
# of this VM should be "Destroyed".
self.debug("Destroy VM - ID: %s" % self.small_virtual_machine.id)
self.small_virtual_machine.delete(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Destroyed",
"Check virtual machine is in destroyed state"
)
return
def test_07_restore_vm(self):
"""Test recover Virtual Machine
"""
# Validate the following
# 1. listVM command should return this VM.
# State of this VM should be "Stopped".
# 2. We should be able to Start this VM successfully.
self.debug("Recovering VM - ID: %s" % self.small_virtual_machine.id)
cmd = recoverVirtualMachine.recoverVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.recoverVirtualMachine(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_vm_response),
0,
"Check VM avaliable in List Virtual Machines"
)
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check virtual machine is in Stopped state"
)
return
def test_08_migrate_vm(self):
"""Test migrate VM
"""
# Validate the following
# 1. Should be able to login to the VM.
# 2. listVM command should return this VM.State of this VM
# should be "Running" and the host should be the host
# to which the VM was migrated to
hosts = Host.list(
self.apiclient,
zoneid=self.medium_virtual_machine.zoneid,
type='Routing'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check the number of hosts in the zone"
)
self.assertEqual(
len(hosts),
2,
"Atleast 2 hosts should be present in a zone for VM migration"
)
# Find the host of VM and also the new host to migrate VM.
if self.medium_virtual_machine.hostid == hosts[0].id:
host = hosts[1]
else:
host = hosts[0]
self.debug("Migrating VM-ID: %s to Host: %s" % (
self.medium_virtual_machine.id,
host.id
))
cmd = migrateVirtualMachine.migrateVirtualMachineCmd()
cmd.hostid = host.id
cmd.virtualmachineid = self.medium_virtual_machine.id
self.apiclient.migrateVirtualMachine(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.medium_virtual_machine.id
)
self.assertEqual(
isinstance(list_vm_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_vm_response,
None,
"Check virtual machine is listVirtualMachines"
)
vm_response = list_vm_response[0]
self.assertEqual(
vm_response.id,
self.medium_virtual_machine.id,
"Check virtual machine ID of migrated VM"
)
self.assertEqual(
vm_response.hostid,
host.id,
"Check destination hostID of migrated VM"
)
return
def test_09_expunge_vm(self):
"""Test destroy(expunge) Virtual Machine
"""
# Validate the following
# 1. listVM command should NOT return this VM any more.
self.debug("Expunge VM-ID: %s" % self.small_virtual_machine.id)
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.destroyVirtualMachine(cmd)
config = list_configurations(
self.apiclient,
name='expunge.delay'
)
response = config[0]
# Wait for some time more than expunge.delay
time.sleep(int(response.value) * 2)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
list_vm_response,
None,
"Check Expunged virtual machine is listVirtualMachines"
)
return

View File

@ -0,0 +1,462 @@
# -*- encoding: utf-8 -*-
# Copyright 2012 Citrix Systems, Inc. Licensed under the
# Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. Citrix Systems, Inc.
# reserves all rights not expressly granted by the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Automatically generated by addcopyright.py at 04/03/2012
""" BVT tests for Volumes
"""
#Import Local Modules
import marvin
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from integration.lib.utils import *
from integration.lib.base import *
from integration.lib.common import *
#Import System modules
import os
import urllib
import time
class Services:
"""Test Volume Services
"""
def __init__(self):
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "password",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"disk_offering": {
"displaytext": "Small",
"name": "Small",
"disksize": 1
},
"volume_offerings": {
0: {
"diskname": "TestDiskServ",
},
},
"customdisksize": 1, # GBs
"username": "root", # Creds for SSH to VM
"password": "password",
"ssh_port": 22,
"diskname": "TestDiskServ",
"hypervisor": 'XenServer',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
"diskdevice": "/dev/xvdb",
"ostypeid": '52e14b2f-dea6-46dc-94e1-fba3ee264fc8',
"mode": 'advanced',
"sleep": 60,
"timeout": 10,
}
class TestCreateVolume(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(TestCreateVolume, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
cls.custom_disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"],
custom=True
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["domainid"] = cls.domain.id
cls.services["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["customdiskofferingid"] = cls.custom_disk_offering.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services,
accountid=cls.account.account.name,
domainid=cls.account.account.domainid,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls._cleanup = [
cls.service_offering,
cls.disk_offering,
cls.custom_disk_offering,
cls.account
]
def setUp(self):
self.apiClient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
def test_01_create_volume(self):
"""Test Volume creation for all Disk Offerings (incl. custom)
"""
# Validate the following
# 1. Create volumes from the different sizes
# 2. Verify the size of volume with actual size allocated
self.volumes = []
for k, v in self.services["volume_offerings"].items():
volume = Volume.create(
self.apiClient,
v,
zoneid=self.zone.id,
account=self.account.account.name,
domainid=self.account.account.domainid,
diskofferingid=self.disk_offering.id
)
self.debug("Created a volume with ID: %s" % volume.id)
self.volumes.append(volume)
volume = Volume.create_custom_disk(
self.apiClient,
self.services,
account=self.account.account.name,
domainid=self.account.account.domainid,
)
self.debug("Created a volume with custom offering: %s" % volume.id)
self.volumes.append(volume)
#Attach a volume with different disk offerings
#and check the memory allocated to each of them
for volume in self.volumes:
list_volume_response = list_volumes(
self.apiClient,
id=volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
self.debug(
"Attaching volume (ID: %s) to VM (ID: %s)" % (
volume.id,
self.virtual_machine.id
))
self.virtual_machine.attach_volume(
self.apiClient,
volume
)
# Poll listVM to ensure VM is started properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in running state
list_vm_response = list_virtual_machines(
self.apiClient,
id=self.virtual_machine.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Running':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to start VM (ID: %s) " % vm.id)
timeout = timeout - 1
self.virtual_machine.detach_volume(self.apiClient, volume)
def tearDown(self):
#Clean up, terminate the created volumes
cleanup_resources(self.apiClient, self.cleanup)
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = super(TestCreateVolume, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
class TestVolumes(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(TestVolumes, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["domainid"] = cls.domain.id
cls.services["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["diskofferingid"] = cls.disk_offering.id
# Create VMs, VMs etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services,
accountid=cls.account.account.name,
domainid=cls.account.account.domainid,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls.volume = Volume.create(
cls.api_client,
cls.services,
account=cls.account.account.name,
domainid=cls.account.account.domainid
)
cls._cleanup = [
cls.service_offering,
cls.disk_offering,
cls.account
]
@classmethod
def tearDownClass(cls):
try:
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
def setUp(self):
self.apiClient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
def test_02_attach_volume(self):
"""Attach a created Volume to a Running VM
"""
# Validate the following
# 1. shows list of volumes
# 2. "Attach Disk" pop-up box will display with list of instances
# 3. disk should be attached to instance successfully
self.debug(
"Attaching volume (ID: %s) to VM (ID: %s)" % (
self.volume.id,
self.virtual_machine.id
))
self.virtual_machine.attach_volume(self.apiClient, self.volume)
list_volume_response = list_volumes(
self.apiClient,
id=self.volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
volume = list_volume_response[0]
self.assertNotEqual(
volume.virtualmachineid,
None,
"Check if volume state (attached) is reflected"
)
return
def test_03_download_attached_volume(self):
"""Download a Volume attached to a VM
"""
# Validate the following
# 1. download volume will fail with proper error message
# "Failed - Invalid state of the volume with ID:
# It should be either detached or the VM should be in stopped state
self.debug("Extract attached Volume ID: %s" % self.volume.id)
cmd = extractVolume.extractVolumeCmd()
cmd.id = self.volume.id
cmd.mode = "HTTP_DOWNLOAD"
cmd.zoneid = self.services["zoneid"]
# A proper exception should be raised;
# downloading attach VM is not allowed
with self.assertRaises(Exception):
self.apiClient.extractVolume(cmd)
def test_04_delete_attached_volume(self):
"""Delete a Volume attached to a VM
"""
# Validate the following
# 1. delete volume will fail with proper error message
# "Failed - Invalid state of the volume with ID:
# It should be either detached or the VM should be in stopped state
self.debug("Trying to delete attached Volume ID: %s" %
self.volume.id)
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
#Proper exception should be raised; deleting attach VM is not allowed
#with self.assertRaises(Exception):
result = self.apiClient.deleteVolume(cmd)
self.assertEqual(
result,
None,
"Check for delete download error while volume is attached"
)
def test_05_detach_volume(self):
"""Detach a Volume attached to a VM
"""
# Validate the following
# Data disk should be detached from instance and detached data disk
# details should be updated properly
self.debug(
"Detaching volume (ID: %s) from VM (ID: %s)" % (
self.volume.id,
self.virtual_machine.id
))
self.virtual_machine.detach_volume(self.apiClient, self.volume)
#Sleep to ensure the current state will reflected in other calls
time.sleep(self.services["sleep"])
list_volume_response = list_volumes(
self.apiClient,
id=self.volume.id
)
self.assertEqual(
isinstance(list_volume_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
volume = list_volume_response[0]
self.assertEqual(
volume.virtualmachineid,
None,
"Check if volume state (detached) is reflected"
)
return
def test_06_download_detached_volume(self):
"""Download a Volume unattached to an VM
"""
# Validate the following
# 1. able to download the volume when its not attached to instance
self.debug("Extract detached Volume ID: %s" % self.volume.id)
cmd = extractVolume.extractVolumeCmd()
cmd.id = self.volume.id
cmd.mode = "HTTP_DOWNLOAD"
cmd.zoneid = self.services["zoneid"]
extract_vol = self.apiClient.extractVolume(cmd)
def test_07_delete_detached_volume(self):
"""Delete a Volume unattached to an VM
"""
# Validate the following
# 1. volume should be deleted successfully and listVolume should not
# contain the deleted volume details.
# 2. "Delete Volume" menu item not shown under "Actions" menu.
# (UI should not allow to delete the volume when it is attached
# to instance by hiding the menu Item)
self.debug("Delete Volume ID: %s" % self.volume.id)
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.volume.id
self.apiClient.deleteVolume(cmd)
list_volume_response = list_volumes(
self.apiClient,
id=self.volume.id,
type='DATADISK'
)
self.assertEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
return

69
test/setup.sh Executable file
View File

@ -0,0 +1,69 @@
#!/bin/bash
usage() {
printf "Usage: %s:\n
[-t path to tests ] \n
[-m mgmt-server ] \n
[-c config-file ] \n
[-d db node url ]\n" $(basename $0) >&2
}
failed() {
exit $1
}
#defaults
FMT=$(date +"%d_%I_%Y_%s")
MGMT_SVR="localhost"
BASEDIR="/root/cloudstack-oss/test"
TESTDIR="/root/cloudstack-oss/test/integration/smoke-simulator/"
CONFIG="/root/cloudstack-oss/test/integration/smoke-simulator/simulator-smoke.cfg"
DB_SVR="localhost"
while getopts 't:d:m:' OPTION
do
case $OPTION in
d) dflag=1
DB_SVR="$OPTARG"
;;
t) tflag=1
TESTDIR="$OPTARG"
;;
m) mflag=1
MGMT_SVR="$OPTARG"
;;
?) usage
failed 2
;;
esac
done
ostypeid=$(mysql -uroot -Dcloud -h$MGMT_SVR -s -N -r -e"select uuid from guest_os where id=11")
$(mysql -uroot -Dcloud -h$MGMT_SVR -s -N -r -e"update configuration set value='8096' where name='integration.api.port'")
pushd $BASEDIR
for file in `find $TESTDIR -name *.py -type f`
do
old_ostypeid=$(grep ostypeid $file | head -1 | cut -d: -f2 | tr -d " ,'")
if [[ $old_ostypeid != "" ]]
then
echo "replacing:" $old_ostypeid, "with:" $ostypeid,"in " $file
sed -i "s/$old_ostypeid/$ostypeid/g" $file
sed -i "s/http:\/\/iso.linuxquestions.org\/download\/504\/1819\/http\/gd4.tuwien.ac.at\/dsl-4.4.10.iso/http:\/\/nfs1.lab.vmops.com\/isos_32bit\/dsl-4.4.10.iso/g" $file
sed -i "s/fr3sca/password/g" $file
fi
done
version_tuple=$(python -c 'import sys; print(sys.version_info[:2])')
if [[ $version_tuple == "(2, 7)" ]]
then
python -m marvin.deployAndRun -c $CONFIG -t /tmp/t.log -r /tmp/r.log -d /tmp
sleep 60
python -m marvin.deployAndRun -c $CONFIG -t /tmp/t.log -r /tmp/r.log -f $TESTDIR/testSetupSuccess.py -l
cat /tmp/r.log
python -m marvin.deployAndRun -c $CONFIG -t /tmp/t.log -r /tmp/r.log -d $TESTDIR -l
echo "Done"
else
echo "Python version 2.7 not detected on system. Aborting"
fi
popd