1. Add support for project related tests.

2. Fix the classes to fetch HTTP COnnection from the Parent class
3. Minor fixes
This commit is contained in:
Chirag Jog 2012-04-02 10:08:59 -07:00
parent 7dfe2ae9c7
commit 6cfa562965
31 changed files with 7680 additions and 472 deletions

View File

@ -109,7 +109,7 @@ class TestDiskOfferings(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestDiskOfferings, cls).getClsTestClient().getApiClient()
cls.disk_offering_1 = DiskOffering.create(
cls.api_client,
cls.services["off"]
@ -124,7 +124,7 @@ class TestDiskOfferings(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestDiskOfferings, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)

View File

@ -78,9 +78,6 @@ class Services:
"password": "fr3sca",
},
},
"zoneid": '5894a264-12b0-4257-a316-06c831bcf8e2',
# Optional, if specified the mentioned zone will be
# used for tests
}
class TestHosts(cloudstackTestCase):

View File

@ -40,7 +40,7 @@ class Services:
"isextractable": True,
"isfeatured": True,
"ispublic": True,
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
},
"iso_2":
{
@ -51,7 +51,7 @@ class Services:
"isextractable": True,
"isfeatured": True,
"ispublic": True,
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"mode": 'HTTP_DOWNLOAD',
# Used in Extract template, value must be HTTP_DOWNLOAD
},
@ -64,7 +64,7 @@ class Services:
"passwordenabled": True,
"sleep": 60,
"timeout": 10,
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
# CentOS 5.3 (64 bit)
"mode": 'advanced'
# Networking mode: Basic or Advanced
@ -168,7 +168,7 @@ class TestISO(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestISO, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -216,7 +216,7 @@ class TestISO(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestISO, cls).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls._cleanup)

View File

@ -22,7 +22,7 @@ class Services:
def __init__(self):
self.services = {
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
# Cent OS 5.3 (64 bit)
"mode": 'advanced',
# Networking mode: Basic or advanced
@ -30,10 +30,23 @@ class Services:
# Time interval after which LB switches the requests
"sleep": 60,
"timeout":10,
"network_offering": {
"name": 'Test Network offering',
"displaytext": 'Test Network offering',
"guestiptype": 'Isolated',
"supportedservices": 'Dhcp,Dns,SourceNat,PortForwarding',
"traffictype": 'GUEST',
"availability": 'Optional',
"serviceProviderList" : {
"Dhcp": 'VirtualRouter',
"Dns": 'VirtualRouter',
"SourceNat": 'VirtualRouter',
"PortForwarding": 'VirtualRouter',
},
},
"network": {
"name": "Test Network",
"displaytext": "Test Network",
"networkoffering": '3d4e36f1-6b5f-40fc-bb63-fe9353419e91',
},
"service_offering": {
"name": "Tiny Instance",
@ -87,7 +100,7 @@ class TestPublicIP(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestPublicIP, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -107,6 +120,15 @@ class TestPublicIP(cloudstackTestCase):
domainid=cls.domain.id
)
cls.services["network"]["zoneid"] = cls.zone.id
cls.network_offering = NetworkOffering.create(
cls.api_client,
cls.services["network_offering"],
)
# Enable Network offering
cls.network_offering.update(cls.api_client, state='Enabled')
cls.services["network"]["networkoffering"] = cls.network_offering.id
cls.account_network = Network.create(
cls.api_client,
cls.services["network"],
@ -138,6 +160,7 @@ class TestPublicIP(cloudstackTestCase):
cls.user_network,
cls.account,
cls.user,
cls.network_offering
]
return
@ -256,7 +279,7 @@ class TestPortForwarding(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestPortForwarding, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
@ -301,7 +324,7 @@ class TestPortForwarding(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestPortForwarding, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
@ -536,7 +559,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestLoadBalancingRule, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)

View File

@ -46,9 +46,6 @@ class Services:
"hypervisor": 'XEN',
},
},
"zoneid": '5894a264-12b0-4257-a316-06c831bcf8e2',
# Optional, if specified the mentioned zone will be
# used for tests
}
class TestPrimaryStorageServices(cloudstackTestCase):

View File

@ -47,7 +47,7 @@ class Services:
"username": "testuser",
"password": "fr3sca",
},
"ostypeid":'144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid":'5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"sleep": 60,
"timeout": 10,
"mode": 'advanced', #Networking mode: Basic, Advanced
@ -59,7 +59,10 @@ class TestRouterServices(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(
TestRouterServices,
cls
).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -99,7 +102,10 @@ class TestRouterServices(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(
TestRouterServices,
cls
).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls.cleanup)
@ -136,7 +142,8 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
state='Up',
virtualmachineid=self.vm_1.id
)
self.assertEqual(
isinstance(hosts, list),
@ -197,7 +204,8 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
state='Up',
virtualmachineid=self.vm_1.id
)
self.assertEqual(
isinstance(hosts, list),
@ -382,7 +390,8 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
state='Up',
virtualmachineid=self.vm_1.id
)
self.assertEqual(
isinstance(hosts, list),
@ -742,6 +751,7 @@ class TestRouterServices(cloudstackTestCase):
cmd.id = vm.id
self.apiclient.stopVirtualMachine(cmd)
# Get network.gc.interval config value
config = list_configurations(
self.apiclient,
name='network.gc.interval'
@ -751,10 +761,23 @@ class TestRouterServices(cloudstackTestCase):
True,
"Check list response returns a valid list"
)
response = config[0]
gcinterval = config[0]
# Wait for network.gc.interval * 3 time to cleanup all the resources
time.sleep(int(response.value) * 3)
# Get network.gc.wait config value
config = list_configurations(
self.apiclient,
name='network.gc.wait'
)
self.assertEqual(
isinstance(config, list),
True,
"Check list response returns a valid list"
)
gcwait = config[0]
total_wait = int(gcinterval.value) + int(gcwait.value)
# Wait for wait_time * 2 time to cleanup all the resources
time.sleep(total_wait * 2)
timeout = self.services["timeout"]
while True:

View File

@ -46,7 +46,7 @@ class TestSecStorageServices(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestSecStorageServices, cls).getClsTestClient().getApiClient()
cls.services = Services().services
cls._cleanup = []
return

View File

@ -130,7 +130,7 @@ class TestServiceOfferings(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestServiceOfferings, cls).getClsTestClient().getApiClient()
cls.service_offering_1 = ServiceOffering.create(
cls.api_client,
cls.services["off"]
@ -145,7 +145,7 @@ class TestServiceOfferings(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestServiceOfferings, cls).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls._cleanup)

View File

@ -79,10 +79,10 @@ class Services:
{
"displaytext": 'Template from snapshot',
"name": 'Template from snapshot',
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"templatefilter": 'self',
},
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
# Cent OS 5.3 (64 bit)
"diskdevice": "/dev/xvdb", # Data Disk
"rootdisk": "/dev/xvda", # Root Disk
@ -110,7 +110,7 @@ class TestSnapshotRootDisk(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestSnapshotRootDisk, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -261,25 +261,26 @@ class TestSnapshotRootDisk(cloudstackTestCase):
True,
"Check list response returns a valid list"
)
uuids = []
for host in hosts:
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (host.name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
export_path = '/'.join(parse_url[3:])
# Export path: export/test
try:
# Login to VM to check snapshot present on sec disk
ssh_client = self.virtual_machine_with_disk.get_ssh_client()
try:
# Login to VM to check snapshot present on sec disk
ssh_client = self.virtual_machine_with_disk.get_ssh_client()
cmds = [
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
"mount %s/%s %s" % (
sec_storage_ip,
export_path,
self.services["mount_dir"]
@ -291,32 +292,37 @@ class TestSnapshotRootDisk(cloudstackTestCase):
),
]
for c in cmds:
result = ssh_client.execute(c)
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
except Exception:
self.fail("SSH failed for Virtual machine: %s" %
except Exception:
self.fail("SSH failed for Virtual machine: %s" %
self.virtual_machine_with_disk.ipaddress)
res = str(result)
uuids.append(result)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
try:
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
except Exception as e:
self.fail("SSH failed for Virtual machine: %s" %
self.virtual_machine_with_disk.ipaddress)
res = str(uuids)
# Check snapshot UUID in secondary storage and database
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
try:
for c in cmds:
result = ssh_client.execute(c)
except Exception as e:
self.fail("SSH failed for Virtual machine: %s" %
self.virtual_machine_with_disk.ipaddress)
return
@ -324,7 +330,7 @@ class TestSnapshots(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestSnapshots, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -490,25 +496,26 @@ class TestSnapshots(cloudstackTestCase):
True,
"Check list response returns a valid list"
)
uuids = []
for host in hosts:
# hosts[0].name = "nfs://192.168.100.21/export"
parse_url = (host.name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export']
# hosts[0].name = "nfs://192.168.100.21/export"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export
export_path = '/'.join(parse_url[3:])
# Export path: export
try:
# Login to VM to check snapshot present on sec disk
ssh_client = self.virtual_machine_with_disk.get_ssh_client()
try:
# Login to VM to check snapshot present on sec disk
ssh_client = self.virtual_machine_with_disk.get_ssh_client()
cmds = [
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
"mount %s/%s %s" % (
sec_storage_ip,
export_path,
self.services["mount_dir"]
@ -519,34 +526,36 @@ class TestSnapshots(cloudstackTestCase):
volume_id
),
]
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
self.virtual_machine_with_disk.ipaddress)
res = str(result)
uuids.append(result)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
try:
for c in cmds:
self.debug(c)
ssh_client.execute(c)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
self.virtual_machine_with_disk.ipaddress)
res = str(uuids)
# Check snapshot UUID in secondary storage and database
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
try:
for c in cmds:
ssh_client.execute(c)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
self.virtual_machine_with_disk.ipaddress)
return
def test_03_volume_from_snapshot(self):
@ -596,6 +605,7 @@ class TestSnapshots(cloudstackTestCase):
),
]
for c in cmds:
self.debug(c)
ssh_client.execute(c)
except Exception as e:
@ -608,6 +618,7 @@ class TestSnapshots(cloudstackTestCase):
try:
for c in cmds:
self.debug(c)
ssh_client.execute(c)
except Exception as e:
@ -683,7 +694,9 @@ class TestSnapshots(cloudstackTestCase):
]
for c in cmds:
ssh.execute(c)
self.debug(c)
result = ssh.execute(c)
self.debug(result)
returned_data_0 = ssh.execute("cat %s/%s/%s/%s" % (
self.services["mount_dir"],
@ -830,7 +843,8 @@ class TestSnapshots(cloudstackTestCase):
volumeid=volume[0].id,
intervaltype=\
self.services["recurring_snapshot"]["intervaltype"],
snapshottype='RECURRING'
snapshottype='RECURRING',
listall=True
)
if isinstance(snapshots, list):
@ -924,7 +938,8 @@ class TestSnapshots(cloudstackTestCase):
volumeid=volume[0].id,
intervaltype=\
self.services["recurring_snapshot"]["intervaltype"],
snapshottype='RECURRING'
snapshottype='RECURRING',
listall=True
)
if isinstance(snapshots, list):
@ -995,7 +1010,9 @@ class TestSnapshots(cloudstackTestCase):
]
for c in cmds:
ssh_client.execute(c)
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
except Exception as e:
self.fail("SSH failed for VM with IP address: %s" %
@ -1006,6 +1023,7 @@ class TestSnapshots(cloudstackTestCase):
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
self.debug(c)
ssh_client.execute(c)
volumes = list_volumes(
@ -1093,13 +1111,14 @@ class TestSnapshots(cloudstackTestCase):
self.services["sub_lvl_dir1"],
self.services["random_data"]
))
self.debug(returned_data_0)
returned_data_1 = ssh.execute("cat %s/%s/%s/%s" % (
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir2"],
self.services["random_data"]
))
self.debug(returned_data_1)
except Exception as e:
self.fail("SSH failed for VM with IP address: %s" %
new_virtual_machine.ipaddress)
@ -1120,6 +1139,7 @@ class TestSnapshots(cloudstackTestCase):
]
try:
for c in cmds:
self.debug(c)
ssh_client.execute(c)
except Exception as e:

View File

@ -66,7 +66,9 @@ class TestSSVMs(cloudstackTestCase):
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm'
systemvmtype='secondarystoragevm',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
@ -87,6 +89,9 @@ class TestSSVMs(cloudstackTestCase):
True,
"Check list response returns a valid list"
)
self.debug("Number of zones: %s" % len(list_zones_response))
self.debug("Number of SSVMs: %s" % len(list_ssvm_response))
# Number of Sec storage VMs = No of Zones
self.assertEqual(
len(list_ssvm_response),
@ -97,6 +102,7 @@ class TestSSVMs(cloudstackTestCase):
#public IP, link local IP and DNS
for ssvm in list_ssvm_response:
self.debug("SSVM state: %s" % ssvm.state)
self.assertEqual(
ssvm.state,
'Running',
@ -178,7 +184,9 @@ class TestSSVMs(cloudstackTestCase):
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy'
systemvmtype='consoleproxy',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
@ -200,6 +208,9 @@ class TestSSVMs(cloudstackTestCase):
"Check list response returns a valid list"
)
self.debug("Number of zones: %s" % len(list_zones_response))
self.debug("Number of CPVMs: %s" % len(list_cpvm_response))
self.assertEqual(
len(list_cpvm_response),
len(list_zones_response),
@ -208,6 +219,7 @@ class TestSSVMs(cloudstackTestCase):
#For each CPVM check private IP, public IP, link local IP and DNS
for cpvm in list_cpvm_response:
self.debug("CPVM state: %s" % cpvm.state)
self.assertEqual(
cpvm.state,
'Running',
@ -280,23 +292,11 @@ class TestSSVMs(cloudstackTestCase):
# 4. If no process is running/multiple process are running
# then the test is a failure
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
hostid=host.id
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
@ -305,7 +305,18 @@ class TestSSVMs(cloudstackTestCase):
)
ssvm = list_ssvm_response[0]
self.debug("Checking cloud process status")
hosts = list_hosts(
self.apiclient,
id=ssvm.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
self.debug("Running SSVM check script")
result = get_process_status(
host.ipaddress,
@ -316,6 +327,8 @@ class TestSSVMs(cloudstackTestCase):
"/usr/local/cloud/systemvm/ssvm-check.sh |grep -e ERROR -e WARNING -e FAIL"
)
res = str(result)
self.debug("SSVM script output: %s" % res)
self.assertEqual(
res.count("ERROR"),
1,
@ -338,6 +351,7 @@ class TestSSVMs(cloudstackTestCase):
"service cloud status"
)
res = str(result)
self.debug("Cloud Process status: %s" % res)
# cloud.com service (type=secstorage) is running: process id: 2346
self.assertEqual(
res.count("is running"),
@ -356,23 +370,11 @@ class TestSSVMs(cloudstackTestCase):
# 3. Service cloud status should report cloud agent status to be
# running
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
hostid=host.id
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
@ -381,11 +383,24 @@ class TestSSVMs(cloudstackTestCase):
)
cpvm = list_cpvm_response[0]
hosts = list_hosts(
self.apiclient,
id=cpvm.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
try:
telnet = telnetlib.Telnet(
str(self.apiclient.connection.mgtSvr),
'8250'
)
self.debug("Telnet management server (IP: %s)" %
self.apiclient.connection.mgtSvr)
except Exception as e:
self.fail(
"Telnet Access failed for %s: %s" % \
@ -403,6 +418,7 @@ class TestSSVMs(cloudstackTestCase):
"service cloud status"
)
res = str(result)
self.debug("Cloud Process status: %s" % res)
self.assertEqual(
res.count("is running"),
1,
@ -421,23 +437,11 @@ class TestSSVMs(cloudstackTestCase):
# test cases still passing
# 3. If either of the two above steps fail the test is a failure
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
hostid=host.id
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
@ -446,6 +450,18 @@ class TestSSVMs(cloudstackTestCase):
)
ssvm = list_ssvm_response[0]
hosts = list_hosts(
self.apiclient,
id=ssvm.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
self.debug("Stopping SSVM: %s" % ssvm.id)
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = ssvm.id
self.apiclient.stopSystemVm(cmd)
@ -474,6 +490,7 @@ class TestSSVMs(cloudstackTestCase):
"Check list response returns a valid list"
)
ssvm_response = list_ssvm_response[0]
self.debug("SSVM state after debug: %s" % ssvm_response.state)
self.assertEqual(
ssvm_response.state,
'Running',
@ -495,23 +512,11 @@ class TestSSVMs(cloudstackTestCase):
# two test cases still passing
# 3. If either of the two above steps fail the test is a failure
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
hostid=host.id
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
@ -520,6 +525,18 @@ class TestSSVMs(cloudstackTestCase):
)
cpvm = list_cpvm_response[0]
hosts = list_hosts(
self.apiclient,
id=cpvm.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
self.debug("Stopping CPVM: %s" % cpvm.id)
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = cpvm.id
self.apiclient.stopSystemVm(cmd)
@ -544,6 +561,8 @@ class TestSSVMs(cloudstackTestCase):
cpvm_response = list_cpvm_response[0]
self.debug("CPVM state after debug: %s" % cpvm_response.state)
self.assertEqual(
cpvm_response.state,
'Running',
@ -563,23 +582,11 @@ class TestSSVMs(cloudstackTestCase):
# before and after reboot
# 3. The cloud process should still be running within the SSVM
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
hostid=host.id
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
@ -590,10 +597,22 @@ class TestSSVMs(cloudstackTestCase):
ssvm_response = list_ssvm_response[0]
hosts = list_hosts(
self.apiclient,
id=ssvm_response.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
#Store the public & private IP values before reboot
old_public_ip = ssvm_response.publicip
old_private_ip = ssvm_response.privateip
self.debug("Rebooting SSVM: %s" % ssvm_response.id)
cmd = rebootSystemVm.rebootSystemVmCmd()
cmd.id = ssvm_response.id
self.apiclient.rebootSystemVm(cmd)
@ -617,7 +636,7 @@ class TestSSVMs(cloudstackTestCase):
timeout = timeout - 1
ssvm_response = list_ssvm_response[0]
self.debug("SSVM State: %s" % ssvm_response.state)
self.assertEqual(
'Running',
str(ssvm_response.state),
@ -648,23 +667,12 @@ class TestSSVMs(cloudstackTestCase):
# the same before and after reboot
# 3. the cloud process should still be running within the CPVM
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
hostid=host.id
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_cpvm_response, list),
@ -673,10 +681,23 @@ class TestSSVMs(cloudstackTestCase):
)
cpvm_response = list_cpvm_response[0]
hosts = list_hosts(
self.apiclient,
id=cpvm_response.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list response returns a valid list"
)
host = hosts[0]
#Store the public & private IP values before reboot
old_public_ip = cpvm_response.publicip
old_private_ip = cpvm_response.privateip
self.debug("Rebooting CPVM: %s" % cpvm_response.id)
cmd = rebootSystemVm.rebootSystemVmCmd()
cmd.id = cpvm_response.id
self.apiclient.rebootSystemVm(cmd)
@ -701,6 +722,7 @@ class TestSSVMs(cloudstackTestCase):
cpvm_response = list_cpvm_response[0]
self.debug("CPVM state: %s" % cpvm_response.state)
self.assertEqual(
'Running',
str(cpvm_response.state),
@ -735,8 +757,9 @@ class TestSSVMs(cloudstackTestCase):
list_ssvm_response = list_ssvms(
self.apiclient,
zoneid=self.zone.id,
systemvmtype='secondarystoragevm'
systemvmtype='secondarystoragevm',
state='Running',
zoneid=self.zone.id
)
self.assertEqual(
isinstance(list_ssvm_response, list),
@ -747,6 +770,7 @@ class TestSSVMs(cloudstackTestCase):
old_name = ssvm_response.name
self.debug("Destroying SSVM: %s" % ssvm_response.id)
cmd = destroySystemVm.destroySystemVmCmd()
cmd.id = ssvm_response.id
self.apiclient.destroySystemVm(cmd)
@ -826,6 +850,7 @@ class TestSSVMs(cloudstackTestCase):
old_name = cpvm_response.name
self.debug("Destroying CPVM: %s" % cpvm_response.id)
cmd = destroySystemVm.destroySystemVmCmd()
cmd.id = cpvm_response.id
self.apiclient.destroySystemVm(cmd)

View File

@ -59,12 +59,12 @@ class Services:
"template_1": {
"displaytext": "Cent OS Template",
"name": "Cent OS Template",
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
},
"template_2": {
"displaytext": "Public Template",
"name": "Public template",
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"isfeatured": True,
"ispublic": True,
"isextractable": True,
@ -78,7 +78,7 @@ class Services:
"isextractable": False,
"bootable": True,
"passwordenabled": True,
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"mode": 'advanced',
# Networking mode: Advanced, basic
"sleep": 30,
@ -108,7 +108,7 @@ class TestCreateTemplate(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestCreateTemplate, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -194,7 +194,7 @@ class TestCreateTemplate(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestCreateTemplate, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
@ -270,7 +270,7 @@ class TestTemplates(cloudstackTestCase):
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestTemplates, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -384,7 +384,7 @@ class TestTemplates(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestTemplates, cls).getClsTestClient().getApiClient()
#Cleanup created resources such as templates and VMs
cleanup_resources(cls.api_client, cls._cleanup)

View File

@ -95,7 +95,7 @@ class Services:
"name": "testISO",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"mode": 'HTTP_DOWNLOAD', # Downloading existing ISO
},
"diskdevice": '/dev/xvdd',
@ -103,12 +103,10 @@ class Services:
"mount_dir": "/mnt/tmp",
"sleep": 60,
"timeout": 10,
"hostid": 5,
#Migrate VM to hostid
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
# CentOS 5.3 (64-bit)
"mode":'advanced',
# Networking mode: Basic or Advanced
}
@ -218,7 +216,7 @@ class TestVMLifeCycle(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestVMLifeCycle, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
@ -286,7 +284,7 @@ class TestVMLifeCycle(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestVMLifeCycle, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
return
@ -680,7 +678,6 @@ class TestVMLifeCycle(cloudstackTestCase):
)
return
def test_07_restore_vm(self):
"""Test recover Virtual Machine
"""
@ -729,13 +726,36 @@ class TestVMLifeCycle(cloudstackTestCase):
# should be "Running" and the host should be the host
# to which the VM was migrated to
hosts = Host.list(
self.apiclient,
zoneid=self.medium_virtual_machine.zoneid,
type='Routing'
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check the number of hosts in the zone"
)
self.assertEqual(
len(hosts),
2,
"Atleast 2 hosts should be present in a zone for VM migration"
)
# Find the host of VM and also the new host to migrate VM.
if self.medium_virtual_machine.hostid == hosts[0].id:
host = hosts[1]
else:
host = hosts[0]
self.debug("Migrating VM-ID: %s to Host: %s" % (
self.medium_virtual_machine.id,
self.services["hostid"]
host.id
))
cmd = migrateVirtualMachine.migrateVirtualMachineCmd()
cmd.hostid = self.services["hostid"]
cmd.hostid = host.id
cmd.virtualmachineid = self.medium_virtual_machine.id
self.apiclient.migrateVirtualMachine(cmd)
@ -765,7 +785,7 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
vm_response.hostid,
self.services["hostid"],
host.id,
"Check destination hostID of migrated VM"
)
return

View File

@ -60,7 +60,7 @@ class Services:
"publicport": 22,
"protocol": 'TCP',
"diskdevice": "/dev/xvdb",
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"mode": 'advanced',
"sleep": 60,
"timeout": 10,
@ -71,7 +71,7 @@ class TestCreateVolume(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestCreateVolume, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
@ -187,7 +187,6 @@ class TestCreateVolume(cloudstackTestCase):
)
try:
ssh = self.virtual_machine.get_ssh_client()
ssh.execute("reboot")
except Exception as e:
@ -249,7 +248,7 @@ class TestCreateVolume(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestCreateVolume, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
@ -259,7 +258,7 @@ class TestVolumes(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestVolumes, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)

View File

@ -31,6 +31,11 @@ The following files contain these P1 cases:
4. test_account.py - Account related tests
5. test_resource_limits.py - Resource limits tests
6. test_security_groups.py - Security groups related tests
7. test_templates - templates related tests
8. test_volumes - Volumes related tests
9. test_blocker_bugs - Blocker bugs tests
7. test_templates.py - templates related tests
8. test_volumes.py - Volumes related tests
9. test_blocker_bugs.py - Blocker bugs tests
10. test_project_configs.py - Project global configuration related tests
11. test_project_limits.py - Project resource limits related tests
12. test_project_resources.py - Project resource creation related tests
13. test_project_usage.py - Project usage related tests
14. test_projects - Projects functionality tests

View File

@ -156,7 +156,7 @@ class TestAccounts(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestAccounts, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
@ -281,7 +281,7 @@ class TestRemoveUserFromAccount(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestRemoveUserFromAccount, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
@ -542,7 +542,7 @@ class TestNonRootAdminsPrivileges(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestNonRootAdminsPrivileges, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone settings
cls.zone = get_zone(cls.api_client, cls.services)
@ -639,7 +639,7 @@ class TestServiceOfferingSiblings(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestServiceOfferingSiblings, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Create Domains, accounts etc
@ -751,7 +751,7 @@ class TestServiceOfferingHierarchy(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestServiceOfferingHierarchy, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Create domain, service offerings etc
@ -875,7 +875,7 @@ class TesttemplateHierarchy(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TesttemplateHierarchy, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone settings
cls.zone = get_zone(cls.api_client, cls.services)
@ -1011,7 +1011,7 @@ class TestAddVmToSubDomain(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestAddVmToSubDomain, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Setup working Environment- Create domain, zone, pod cluster etc.

View File

@ -96,7 +96,7 @@ class TestSnapshots(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestSnapshots, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -402,7 +402,7 @@ class TestTemplate(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestTemplate, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -430,7 +430,7 @@ class TestTemplate(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestTemplate, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
@ -520,7 +520,7 @@ class TestNATRules(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestNATRules, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
@ -573,7 +573,7 @@ class TestNATRules(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestNATRules, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
@ -714,7 +714,7 @@ class TestRouters(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouters, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
@ -837,7 +837,7 @@ class TestRouterRestart(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterRestart, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -877,7 +877,7 @@ class TestRouterRestart(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterRestart, cls).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls.cleanup)
@ -965,7 +965,7 @@ class TestTemplates(cloudstackTestCase):
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestTemplates, cls).getClsTestClient().getApiClient()
# Get Zone, templates etc
cls.domain = get_domain(cls.api_client, cls.services)
@ -1023,7 +1023,7 @@ class TestTemplates(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestTemplates, cls).getClsTestClient().getApiClient()
#Cleanup created resources such as templates and VMs
cleanup_resources(cls.api_client, cls._cleanup)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,860 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" P1 tests for Resource limits
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
import datetime
class Services:
"""Test Resource Limits Services
"""
def __init__(self):
self.services = {
"domain": {
"name": "Domain",
},
"project": {
"name": "Project",
"displaytext": "Test project",
},
"account": {
"email": "administrator@clogeny.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "fr3sca",
},
"user": {
"email": "administrator@clogeny.com",
"firstname": "User",
"lastname": "User",
"username": "User",
# Random characters are appended for unique
# username
"password": "fr3sca",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"disk_offering": {
"displaytext": "Tiny Disk Offering",
"name": "Tiny Disk Offering",
"disksize": 1
},
"volume": {
"diskname": "Test Volume",
},
"server": {
"displayname": "TestVM",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"template": {
"displaytext": "Cent OS Template",
"name": "Cent OS Template",
"ostypeid": '471a4b5b-5523-448f-9608-7d6218995733',
"templatefilter": 'self',
},
"ostypeid": '471a4b5b-5523-448f-9608-7d6218995733',
# Cent OS 5.3 (64 bit)
"sleep": 60,
"timeout": 10,
"mode": 'advanced',
}
class TestProjectLimits(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(
TestProjectLimits,
cls
).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone
cls.zone = get_zone(cls.api_client, cls.services)
# Create domains, account etc.
cls.domain = Domain.create(
cls.api_client,
cls.services["domain"]
)
cls.admin = Account.create(
cls.api_client,
cls.services["account"],
admin=True,
domainid=cls.domain.id
)
cls.user = Account.create(
cls.api_client,
cls.services["user"],
domainid=cls.domain.id
)
cls._cleanup = [cls.account]
return
@classmethod
def tearDownClass(cls):
try:
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
#Clean up, terminate the created accounts, domains etc
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_project_limits(self):
""" Test project limits
"""
# Validate the following
# 1. Create a Project. Verify once projects are created, they inherit
# a default set of resource limits as configured by the Cloud Stack
# ROOT admin.
# 2. Reduce Project resources limits. Verify limits can be reduced by
# the Project Owner of each project and project limit applies to
# number of virtual instances, disk volumes, snapshots, IP address.
# Also, verify resource limits for the project are independent of
# account resource limits
# 3. Increase Projects Resources limits above domains limit. Verify
# project cant have more resources than domain level limit allows.
# 4. Create Resource more than its set limit for a project. Verify
# resource allocation should fail giving proper message
# Create project as a domain admin
project = Project.create(
self.apiclient,
self.services["project"],
account=self.admin.account.name,
domainid=self.admin.account.domainid
)
# Cleanup created project at end of test
self.cleanup.append(project)
self.debug("Created project with domain admin with ID: %s" %
project.id)
list_projects_reponse = Project.list(
self.apiclient,
id=project.id,
listall=True
)
self.assertEqual(
isinstance(list_projects_reponse, list),
True,
"Check for a valid list projects response"
)
list_project = list_projects_reponse[0]
self.assertNotEqual(
len(list_projects_reponse),
0,
"Check list project response returns a valid project"
)
self.assertEqual(
project.name,
list_project.name,
"Check project name from list response"
)
# Get the resource limits for ROOT domain
resource_limits = list_resource_limits(self.apiclient)
self.assertEqual(
isinstance(resource_limits, list),
True,
"List resource API should return a valid list"
)
self.assertNotEqual(
len(resource_limits),
0,
"List resource API response should not be empty"
)
# Reduce resource limits for project
# Resource: 0 - Instance. Number of instances a user can create.
# Resource: 1 - IP. Number of public IP addresses a user can own.
# Resource: 2 - Volume. Number of disk volumes a user can create.
# Resource: 3 - Snapshot. Number of snapshots a user can create.
# Resource: 4 - Template. Number of templates that a user can
# register/create
for resource in resource_limits:
update_resource_limit(
self.apiclient,
resource.resourcetype,
max=1,
projectid=project.id
)
self.debug(
"Updating resource (ID: %s) limit for project: %s" % (
resource,
project.id
))
resource_limits = list_resource_limits(
self.apiclient,
projectid=project.id
)
self.assertEqual(
isinstance(resource_limits, list),
True,
"List resource API should return a valid list"
)
self.assertNotEqual(
len(resource_limits),
0,
"List resource API response should not be empty"
)
for resource in resource_limits:
self.assertEqual(
resource.max,
1,
"Resource limit should be updated to 1"
)
# Get the resource limits for domain
resource_limits = list_resource_limits(
self.apiclient,
domainid=self.domain.id
)
self.assertEqual(
isinstance(resource_limits, list),
True,
"List resource API should return a valid list"
)
self.assertNotEqual(
len(resource_limits),
0,
"List resource API response should not be empty"
)
for resource in resource_limits:
# Update domain resource limits to 2
update_resource_limit(
self.apiclient,
resource.resourcetype,
domainid=self.domain.id,
max=2
)
with self.assertRaises(Exception):
self.debug(
"Attempting to update project: %s resource limit to: %s" % (
project.id,
max_value
))
# Update project resource limits to 3
update_resource_limit(
self.apiclient,
resource.resourcetype,
max=3,
projectid=project.id
)
return
@unittest.skip("No provision for updating resource limits from account through API")
def test_02_project_limits_normal_user(self):
""" Test project limits
"""
# Validate the following
# 1. Create a Project
# 2. Reduce the projects limits as a domain admin. Verify resource
# count is updated
# 3. Reduce the projects limits as a project user owner who is not a
# domain admin. Resource count should fail
# Create project as a domain admin
project = Project.create(
self.apiclient,
self.services["project"],
account=self.admin.account.name,
domainid=self.admin.account.domainid
)
# Cleanup created project at end of test
self.cleanup.append(project)
self.debug("Created project with domain admin with ID: %s" %
project.id)
list_projects_reponse = Project.list(
self.apiclient,
id=project.id,
listall=True
)
self.assertEqual(
isinstance(list_projects_reponse, list),
True,
"Check for a valid list projects response"
)
list_project = list_projects_reponse[0]
self.assertNotEqual(
len(list_projects_reponse),
0,
"Check list project response returns a valid project"
)
self.assertEqual(
project.name,
list_project.name,
"Check project name from list response"
)
# Get the resource limits for ROOT domain
resource_limits = list_resource_limits(self.apiclient)
self.assertEqual(
isinstance(resource_limits, list),
True,
"List resource API should return a valid list"
)
self.assertNotEqual(
len(resource_limits),
0,
"List resource API response should not be empty"
)
# Reduce resource limits for project
# Resource: 0 - Instance. Number of instances a user can create.
# Resource: 1 - IP. Number of public IP addresses a user can own.
# Resource: 2 - Volume. Number of disk volumes a user can create.
# Resource: 3 - Snapshot. Number of snapshots a user can create.
# Resource: 4 - Template. Number of templates that a user can
# register/create
for resource in resource_limits:
update_resource_limit(
self.apiclient,
resource.resourcetype,
max=1,
projectid=project.id
)
self.debug(
"Updating resource (ID: %s) limit for project: %s" % (
resource,
project.id
))
resource_limits = list_resource_limits(
self.apiclient,
projectid=project.id
)
self.assertEqual(
isinstance(resource_limits, list),
True,
"List resource API should return a valid list"
)
self.assertNotEqual(
len(resource_limits),
0,
"List resource API response should not be empty"
)
for resource in resource_limits:
self.assertEqual(
resource.max,
1,
"Resource limit should be updated to 1"
)
self.debug("Adding %s user to project: %s" % (
self.user.account.name,
project.name
))
# Add user to the project
project.addAccount(
self.apiclient,
self.user.account.name,
)
# Get the resource limits for domain
resource_limits = list_resource_limits(
self.apiclient,
domainid=self.domain.id
)
self.assertEqual(
isinstance(resource_limits, list),
True,
"List resource API should return a valid list"
)
self.assertNotEqual(
len(resource_limits),
0,
"List resource API response should not be empty"
)
for resource in resource_limits:
#with self.assertRaises(Exception):
self.debug(
"Attempting to update resource limit by user: %s" % (
self.user.account.name
))
# Update project resource limits to 3
update_resource_limit(
self.apiclient,
resource.resourcetype,
account=self.user.account.name,
domainid=self.user.account.domainid,
max=3,
projectid=project.id
)
return
class TestResourceLimitsDomain(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(TestResourceLimitsDomain, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
cls.template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["server"]["zoneid"] = cls.zone.id
# Create Domains, Account etc
cls.domain = Domain.create(
cls.api_client,
cls.services["domain"]
)
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
# Create project as a domain admin
cls.project = Project.create(
cls.api_client,
cls.services["project"],
account=cls.account.account.name,
domainid=cls.account.account.domainid
)
cls.services["account"] = cls.account.account.name
# Create Service offering and disk offerings etc
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
cls._cleanup = [
cls.project,
cls.service_offering,
cls.disk_offering,
cls.account,
cls.domain
]
return
@classmethod
def tearDownClass(cls):
try:
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
#Clean up, terminate the created instance, volumes and snapshots
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_03_vm_per_project(self):
"""Test VM limit per project
"""
# Validate the following
# 1. Set max VM per project to 2
# 2. Create account and start 2 VMs. Verify VM state is Up and Running
# 3. Try to create 3rd VM instance. The appropriate error or alert
# should be raised
self.debug(
"Updating instance resource limits for project: %s" %
self.project.id)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
0, # Instance
max=2,
projectid=self.project.id
)
self.debug("Deploying VM for project: %s" % self.project.id)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(
virtual_machine_1.state,
'Running',
"Check VM state is Running or not"
)
self.debug("Deploying VM for project: %s" % self.project.id)
virtual_machine_2 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id
)
self.cleanup.append(virtual_machine_2)
# Verify VM state
self.assertEqual(
virtual_machine_2.state,
'Running',
"Check VM state is Running or not"
)
# Exception should be raised for second instance
with self.assertRaises(Exception):
VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id
)
return
def test_04_publicip_per_project(self):
"""Test Public IP limit per project
"""
# Validate the following
# 1. set max no of IPs per project to 2.
# 2. Create an account in this domain
# 3. Create 1 VM in this domain
# 4. Acquire 1 IP in the domain. IP should be successfully acquired
# 5. Try to acquire 3rd IP in this domain. It should give the user an
# appropriate error and an alert should be generated.
self.debug(
"Updating public IP resource limits for project: %s" %
self.project.id)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
1, # Public Ip
max=2,
projectid=self.project.id
)
self.debug("Deploying VM for Project: %s" % self.project.id)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(
virtual_machine_1.state,
'Running',
"Check VM state is Running or not"
)
networks = Network.list(
self.apiclient,
projectid=self.project.id,
listall=True
)
self.assertEqual(
isinstance(networks, list),
True,
"Check list networks response returns a valid response"
)
self.assertNotEqual(
len(networks),
0,
"Check list networks response returns a valid network"
)
network = networks[0]
self.debug("Associating public IP for project: %s" %
self.project.id)
public_ip_1 = PublicIPAddress.create(
self.apiclient,
zoneid=virtual_machine_1.zoneid,
services=self.services["server"],
networkid=network.id,
projectid=self.project.id
)
self.cleanup.append(public_ip_1)
# Verify Public IP state
self.assertEqual(
public_ip_1.ipaddress.state in [
'Allocated',
'Allocating'
],
True,
"Check Public IP state is allocated or not"
)
# Exception should be raised for second Public IP
with self.assertRaises(Exception):
public_ip_2 = PublicIPAddress.create(
self.apiclient,
zoneid=virtual_machine_1.zoneid,
services=self.services["server"],
networkid=network.id,
projectid=self.project.id
)
return
def test_05_snapshots_per_project(self):
"""Test Snapshot limit per project
"""
# Validate the following
# 1. set max no of snapshots per project to 1.
# 2. Create one snapshot in the project. Snapshot should be
# successfully created
# 5. Try to create another snapshot in this project. It should give
# user an appropriate error and an alert should be generated.
self.debug(
"Updating snapshot resource limits for project: %s" %
self.project.id)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
3, # Snapshot
max=1,
projectid=self.project.id
)
self.debug("Deploying VM for account: %s" % self.account.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(
virtual_machine_1.state,
'Running',
"Check VM state is Running or not"
)
# Get the Root disk of VM
volumes = list_volumes(
self.apiclient,
projectid=self.project.id,
type='ROOT',
listall=True
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check for list volume response return valid data"
)
volume = volumes[0]
self.debug("Creating snapshot from volume: %s" % volumes[0].id)
# Create a snapshot from the ROOTDISK
snapshot_1 = Snapshot.create(self.apiclient,
volumes[0].id,
projectid=self.project.id
)
self.cleanup.append(snapshot_1)
# Verify Snapshot state
self.assertEqual(
snapshot_1.state in [
'BackedUp',
'CreatedOnPrimary'
],
True,
"Check Snapshot state is Running or not"
)
# Exception should be raised for second snapshot
with self.assertRaises(Exception):
Snapshot.create(self.apiclient,
volumes[0].id,
projectid=self.project.id
)
return
def test_06_volumes_per_project(self):
"""Test Volumes limit per project
"""
# Validate the following
# 1. set max no of volume per project to 1.
# 2. Create 1 VM in this project
# 4. Try to Create another VM in the project. It should give the user
# an appropriate error that Volume limit is exhausted and an alert
# should be generated.
self.debug(
"Updating volume resource limits for project: %s" %
self.project.id)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
2, # Volume
max=1,
projectid=self.project.id
)
self.debug("Deploying VM for project: %s" % self.project.id)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(
virtual_machine_1.state,
'Running',
"Check VM state is Running or not"
)
# Exception should be raised for second volume
with self.assertRaises(Exception):
Volume.create(
self.apiclient,
self.services["volume"],
zoneid=self.zone.id,
diskofferingid=self.disk_offering.id,
projectid=self.project.id
)
return
def test_07_templates_per_project(self):
"""Test Templates limit per project
"""
# Validate the following
# 1. set max no of templates per project to 1.
# 2. Create a template in this project. Both template should be in
# ready state
# 3. Try create 2nd template in the project. It should give the user
# appropriate error and an alert should be generated.
self.debug(
"Updating template resource limits for domain: %s" %
self.account.account.domainid)
# Set usage_vm=1 for Account 1
update_resource_limit(
self.apiclient,
4, # Template
max=1,
projectid=self.project.id
)
self.debug("Deploying VM for account: %s" % self.account.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(
virtual_machine_1.state,
'Running',
"Check VM state is Running or not"
)
virtual_machine_1.stop(self.apiclient)
# Get the Root disk of VM
volumes = list_volumes(
self.apiclient,
projectid=self.project.id,
type='ROOT',
listall=True
)
self.assertEqual(
isinstance(volumes, list),
True,
"Check for list volume response return valid data"
)
volume = volumes[0]
self.debug("Creating template from volume: %s" % volume.id)
# Create a template from the ROOTDISK
template_1 = Template.create(
self.apiclient,
self.services["template"],
volumeid=volume.id,
projectid=self.project.id
)
self.cleanup.append(template_1)
# Verify Template state
self.assertEqual(
template_1.isready,
True,
"Check Template is in ready state or not"
)
# Exception should be raised for second template
with self.assertRaises(Exception):
Template.create(
self.apiclient,
self.services["template"],
volumeid=volume.id,
projectid=self.project.id
)
return

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -72,7 +72,7 @@ class TestResourceLimitsAccount(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestResourceLimitsAccount, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -830,7 +830,7 @@ class TestResourceLimitsDomain(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestResourceLimitsDomain, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)

View File

@ -74,7 +74,7 @@ class Services:
"cidr": '55.55.0.0/11',
# Any network (For creating FW rule
},
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
# Used for Get_Template : CentOS 5.3 (64 bit)
"mode": 'advanced', # Networking mode: Advanced, basic
}
@ -85,7 +85,7 @@ class TestRouterServices(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterServices, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -133,7 +133,7 @@ class TestRouterServices(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterServices, cls).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls.cleanup)
@ -558,7 +558,7 @@ class TestRouterStopCreatePF(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterStopCreatePF, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -598,7 +598,7 @@ class TestRouterStopCreatePF(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterStopCreatePF, cls).getClsTestClient().getApiClient()
# Clean up, terminate the created resources
cleanup_resources(cls.api_client, cls.cleanup)
@ -758,7 +758,7 @@ class TestRouterStopCreateLB(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterStopCreateLB, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -798,7 +798,7 @@ class TestRouterStopCreateLB(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterStopCreateLB, cls).getClsTestClient().getApiClient()
#Clean up, terminate the created resources
cleanup_resources(cls.api_client, cls.cleanup)
@ -959,7 +959,7 @@ class TestRouterStopCreateFW(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterStopCreateFW, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -998,7 +998,7 @@ class TestRouterStopCreateFW(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestRouterStopCreateFW, cls).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls.cleanup)

View File

@ -107,7 +107,7 @@ class TestDefaultSecurityGroup(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestDefaultSecurityGroup, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -143,7 +143,7 @@ class TestDefaultSecurityGroup(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestDefaultSecurityGroup, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
@ -374,7 +374,7 @@ class TestAuthorizeIngressRule(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestAuthorizeIngressRule, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -408,7 +408,7 @@ class TestAuthorizeIngressRule(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestAuthorizeIngressRule, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
@ -508,7 +508,7 @@ class TestRevokeIngressRule(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestRevokeIngressRule, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -542,7 +542,7 @@ class TestRevokeIngressRule(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestRevokeIngressRule, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
@ -665,7 +665,7 @@ class TestDhcpOnlyRouter(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestDhcpOnlyRouter, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -707,7 +707,7 @@ class TestDhcpOnlyRouter(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestDhcpOnlyRouter, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
@ -740,7 +740,8 @@ class TestDhcpOnlyRouter(cloudstackTestCase):
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
state='Up',
virtualmachineid=self.virtual_machine.id
)
self.assertEqual(
isinstance(hosts, list),
@ -798,7 +799,7 @@ class TestdeployVMWithUserData(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestdeployVMWithUserData, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -833,7 +834,7 @@ class TestdeployVMWithUserData(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestdeployVMWithUserData, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
@ -1000,14 +1001,14 @@ class TestDeleteSecurityGroup(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestDeleteSecurityGroup, cls).getClsTestClient().getApiClient()
cls._cleanup = []
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestDeleteSecurityGroup, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
@ -1242,14 +1243,14 @@ class TestIngressRule(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestIngressRule, cls).getClsTestClient().getApiClient()
cls._cleanup = []
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestIngressRule, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)

View File

@ -91,7 +91,7 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestCreateVMsnapshotTemplate, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -307,32 +307,33 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
True,
"Check list response returns a valid list"
)
uuids = []
for host in hosts:
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (host.name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
export_path = '/'.join(parse_url[3:])
# Export path: export/test
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
try:
# Login to VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
try:
# Login to VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server"]["ipaddress"],
self.services["mgmt_server"]["port"],
self.services["mgmt_server"]["username"],
self.services["mgmt_server"]["password"],
)
cmds = [
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
"mount %s/%s %s" % (
sec_storage_ip,
export_path,
self.services["mount_dir"]
@ -343,34 +344,36 @@ class TestCreateVMsnapshotTemplate(cloudstackTestCase):
volume_id
),
]
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
except Exception as e:
self.fail("SSH failed for Management server: %s" %
except Exception as e:
self.fail("SSH failed for Management server: %s" %
self.services["mgmt_server"]["ipaddress"])
uuids.append(result)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
try:
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
res = str(result)
except Exception as e:
self.fail("SSH failed for Management server: %s" %
self.services["mgmt_server"]["ipaddress"])
res = str(uuids)
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
try:
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
except Exception as e:
self.fail("SSH failed for Management server: %s" %
self.services["mgmt_server"]["ipaddress"])
return
@ -378,7 +381,7 @@ class TestAccountSnapshotClean(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestAccountSnapshotClean, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -563,32 +566,33 @@ class TestAccountSnapshotClean(cloudstackTestCase):
True,
"Check list response returns a valid list"
)
uuids = []
for host in hosts:
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (host.name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
export_path = '/'.join(parse_url[3:])
# Export path: export/test
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
try:
# Login to Secondary storage VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
try:
# Login to Secondary storage VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server"]["ipaddress"],
self.services["mgmt_server"]["port"],
self.services["mgmt_server"]["username"],
self.services["mgmt_server"]["password"],
)
cmds = [
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
"mount %s/%s %s" % (
sec_storage_ip,
export_path,
self.services["mount_dir"]
@ -600,27 +604,29 @@ class TestAccountSnapshotClean(cloudstackTestCase):
),
]
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
res = str(result)
self.assertEqual(
uuids.append(result)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
except Exception:
self.fail("SSH failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
res = str(uuids)
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
except Exception:
self.fail("SSH failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
self.debug("Deleting account: %s" % self.account.account.name)
# Delete account
@ -645,44 +651,58 @@ class TestAccountSnapshotClean(cloudstackTestCase):
self.apiclient,
id=self.account.account.id
)
try:
cmds = [
"mount %s:/%s %s" % (
uuids = []
for host in hosts:
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (host.name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
try:
cmds = [
"mount %s/%s %s" % (
sec_storage_ip,
export_path,
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
"ls %s/snapshots/%s/%s" % (
self.services["mount_dir"],
account_id,
volume_id
),
]
]
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
res = str(result)
self.assertNotEqual(
uuids.append(result)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
except Exception:
self.fail("SSH failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
res = str(uuids)
self.assertNotEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
self.debug("command: %s" % c)
result = ssh_client.execute(c)
self.debug("Result: %s" % result)
except Exception:
self.fail("SSH failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
return
@ -690,7 +710,7 @@ class TestSnapshotDetachedDisk(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestSnapshotDetachedDisk, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -902,32 +922,34 @@ class TestSnapshotDetachedDisk(cloudstackTestCase):
True,
"Check list response returns a valid list"
)
uuids = []
for host in hosts:
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (host.name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
export_path = '/'.join(parse_url[3:])
# Export path: export/test
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
try:
# Login to Management server to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
try:
# Login to Management server to check snapshot present on
# sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server"]["ipaddress"],
self.services["mgmt_server"]["port"],
self.services["mgmt_server"]["username"],
self.services["mgmt_server"]["password"],
)
cmds = [
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
"mount %s/%s %s" % (
sec_storage_ip,
export_path,
self.services["mount_dir"]
@ -937,26 +959,28 @@ class TestSnapshotDetachedDisk(cloudstackTestCase):
account_id,
volume_id
),
]
]
for c in cmds:
result = ssh_client.execute(c)
res = str(result)
self.assertEqual(
for c in cmds:
result = ssh_client.execute(c)
uuids.append(result)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
except Exception as e:
self.fail("SSH failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
res = str(uuids)
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
except Exception as e:
self.fail("SSH failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
return
@ -964,7 +988,7 @@ class TestSnapshotLimit(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestSnapshotLimit, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -1150,29 +1174,30 @@ class TestSnapshotLimit(cloudstackTestCase):
True,
"Check list response returns a valid list"
)
uuids = []
for host in hosts:
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (host.name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# hosts[0].name = "nfs://192.168.100.21/export/test"
parse_url = (hosts[0].name).split('/')
# parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
# Split IP address and export path from name
sec_storage_ip = parse_url[2]
# Sec Storage IP: 192.168.100.21
export_path = '/'.join(parse_url[3:])
# Export path: export/test
try:
# Login to VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
export_path = '/'.join(parse_url[3:])
# Export path: export/test
try:
# Login to VM to check snapshot present on sec disk
ssh_client = remoteSSHClient.remoteSSHClient(
self.services["mgmt_server"]["ipaddress"],
self.services["mgmt_server"]["port"],
self.services["mgmt_server"]["username"],
self.services["mgmt_server"]["password"],
)
cmds = [
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
"mount %s/%s %s" % (
sec_storage_ip,
export_path,
self.services["mount_dir"]
@ -1184,25 +1209,28 @@ class TestSnapshotLimit(cloudstackTestCase):
),
]
for c in cmds:
result = ssh_client.execute(c)
for c in cmds:
result = ssh_client.execute(c)
res = str(result)
self.assertEqual(
uuids.append(result)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
except Exception as e:
raise Exception(
"SSH access failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
res = str(uuids)
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
except Exception as e:
raise Exception("SSH access failed for management server: %s" %
self.services["mgmt_server"]["ipaddress"])
return
@ -1210,7 +1238,7 @@ class TestSnapshotEvents(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestSnapshotEvents, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)

View File

@ -62,7 +62,7 @@ class Services:
0:{
"displaytext": "Public Template",
"name": "Public template",
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"url": "http://download.cloud.com/releases/2.0.0/UbuntuServer-10-04-64bit.vhd.bz2",
"hypervisor": 'XenServer',
"format" : 'VHD',
@ -74,12 +74,12 @@ class Services:
"template": {
"displaytext": "Cent OS Template",
"name": "Cent OS Template",
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"templatefilter": 'self',
},
"templatefilter": 'self',
"destzoneid": 2, # For Copy template (Destination zone)
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"sleep": 60,
"timeout": 10,
"mode": 'advanced', # Networking mode: Advanced, basic
@ -109,7 +109,7 @@ class TestCreateTemplate(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestCreateTemplate, cls).getClsTestClient().getApiClient()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -136,7 +136,7 @@ class TestCreateTemplate(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestCreateTemplate, cls).getClsTestClient().getApiClient()
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
@ -264,7 +264,7 @@ class TestTemplates(cloudstackTestCase):
def setUpClass(cls):
cls.services = Services().services
cls.api_client = fetch_api_client()
cls.api_client = super(TestTemplates, cls).getClsTestClient().getApiClient()
# Get Zone, templates etc
cls.domain = get_domain(cls.api_client, cls.services)
@ -335,7 +335,7 @@ class TestTemplates(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestTemplates, cls).getClsTestClient().getApiClient()
#Cleanup created resources such as templates and VMs
cleanup_resources(cls.api_client, cls._cleanup)

View File

@ -98,7 +98,7 @@ class TestVmUsage(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestVmUsage, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -281,7 +281,7 @@ class TestPublicIPUsage(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestPublicIPUsage, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -432,7 +432,7 @@ class TestVolumeUsage(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestVolumeUsage, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -605,7 +605,7 @@ class TestTemplateUsage(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestTemplateUsage, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -769,7 +769,7 @@ class TestISOUsage(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestISOUsage, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -905,7 +905,7 @@ class TestLBRuleUsage(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestLBRuleUsage, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -1066,7 +1066,7 @@ class TestSnapshotUsage(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestSnapshotUsage, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -1231,7 +1231,7 @@ class TestNatRuleUsage(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestNatRuleUsage, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
@ -1392,7 +1392,7 @@ class TestVpnUsage(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestVpnUsage, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)

View File

@ -65,10 +65,10 @@ class Services:
"name": "testISO",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
},
"sleep": 50,
"ostypeid": '144f66aa-7f74-4cfe-9799-80cc21439cb3',
"ostypeid": '5776c0d2-f331-42db-ba3a-29f1f8319bc9',
"mode": 'advanced',
}
@ -77,7 +77,7 @@ class TestAttachVolume(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestAttachVolume, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
@ -336,7 +336,7 @@ class TestAttachVolume(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestAttachVolume, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
@ -346,7 +346,7 @@ class TestAttachDetachVolume(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestAttachDetachVolume, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
@ -404,7 +404,7 @@ class TestAttachDetachVolume(cloudstackTestCase):
@classmethod
def tearDownClass(cls):
try:
cls.api_client = fetch_api_client()
cls.api_client = super(TestAttachDetachVolume, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
@ -590,7 +590,7 @@ class TestAttachVolumeISO(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestAttachVolumeISO, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
@ -781,7 +781,7 @@ class TestVolumes(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.api_client = super(TestVolumes, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)

View File

@ -162,7 +162,7 @@ class VirtualMachine:
@classmethod
def create(cls, apiclient, services, templateid=None, accountid=None,
domainid=None, networkids=None, serviceofferingid=None,
securitygroupids=None, mode='basic'):
securitygroupids=None, projectid=None, mode='basic'):
"""Create the instance"""
cmd = deployVirtualMachine.deployVirtualMachineCmd()
@ -204,6 +204,9 @@ class VirtualMachine:
if "userdata" in services:
cmd.userdata = base64.b64encode(services["userdata"])
if projectid:
cmd.projectid = projectid
virtual_machine = apiclient.deployVirtualMachine(cmd)
# VM should be in Running state after deploy
@ -325,7 +328,7 @@ class Volume:
@classmethod
def create(cls, apiclient, services, zoneid=None, account=None, domainid=None,
diskofferingid=None):
diskofferingid=None, projectid=None):
"""Create Volume"""
cmd = createVolume.createVolumeCmd()
cmd.name = services["diskname"]
@ -350,6 +353,8 @@ class Volume:
elif "domainid" in services:
cmd.domainid = services["domainid"]
if projectid:
cmd.projectid = projectid
return Volume(apiclient.createVolume(cmd).__dict__)
@classmethod
@ -414,7 +419,7 @@ class Snapshot:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, volume_id, account=None, domainid=None):
def create(cls, apiclient, volume_id, account=None, domainid=None, projectid=None):
"""Create Snapshot"""
cmd = createSnapshot.createSnapshotCmd()
cmd.volumeid = volume_id
@ -422,6 +427,8 @@ class Snapshot:
cmd.account = account
if domainid:
cmd.domainid = domainid
if projectid:
cmd.projectid = projectid
return Snapshot(apiclient.createSnapshot(cmd).__dict__)
def delete(self, apiclient):
@ -447,7 +454,7 @@ class Template:
@classmethod
def create(cls, apiclient, services, volumeid=None,
account=None, domainid=None):
account=None, domainid=None, projectid=None):
"""Create template from Volume"""
#Create template from Virtual machine and Volume ID
cmd = createTemplate.createTemplateCmd()
@ -469,6 +476,8 @@ class Template:
if domainid:
cmd.domainid = domainid
if projectid:
cmd.projectid = projectid
return Template(apiclient.createTemplate(cmd).__dict__)
@classmethod
@ -561,7 +570,15 @@ class Template:
time.sleep(interval)
timeout = timeout - 1
return
def updatePermissions(self, apiclient, **kwargs):
"""Updates the template permissions"""
cmd = updateTemplatePermissions.updateTemplatePermissionsCmd()
cmd.id = self.id
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.updateTemplatePermissions(cmd))
@classmethod
def list(cls, apiclient, **kwargs):
"""List all templates matching criteria"""
@ -578,7 +595,8 @@ class Iso:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, account=None, domainid=None):
def create(cls, apiclient, services, account=None, domainid=None,
projectid=None):
"""Create an ISO"""
#Create ISO from URL
cmd = registerIso.registerIsoCmd()
@ -599,6 +617,8 @@ class Iso:
cmd.account = account
if domainid:
cmd.domainid = domainid
if projectid:
cmd.projectid = projectid
# Register ISO
iso = apiclient.registerIso(cmd)
@ -656,17 +676,29 @@ class PublicIPAddress:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, accountid, zoneid=None, domainid=None,
services=None, networkid=None):
def create(cls, apiclient, accountid=None, zoneid=None, domainid=None,
services=None, networkid=None, projectid=None):
"""Associate Public IP address"""
cmd = associateIpAddress.associateIpAddressCmd()
cmd.account = accountid
cmd.zoneid = zoneid or services["zoneid"]
cmd.domainid = domainid or services["domainid"]
if accountid:
cmd.account = accountid
if zoneid:
cmd.zoneid = zoneid
elif "zoneid" in services:
services["zoneid"]
if domainid:
cmd.domainid = domainid
elif "domainid" in services:
services["domainid"]
if networkid:
cmd.networkid = networkid
if projectid:
cmd.projectid = projectid
return PublicIPAddress(apiclient.associateIpAddress(cmd).__dict__)
def delete(self, apiclient):
@ -691,7 +723,8 @@ class NATRule:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, virtual_machine, services, ipaddressid=None):
def create(cls, apiclient, virtual_machine, services, ipaddressid=None,
projectid=None):
"""Create Port forwarding rule"""
cmd = createPortForwardingRule.createPortForwardingRuleCmd()
@ -705,6 +738,9 @@ class NATRule:
cmd.protocol = services["protocol"]
cmd.virtualmachineid = virtual_machine.id
if projectid:
cmd.projectid = projectid
return NATRule(apiclient.createPortForwardingRule(cmd).__dict__)
def delete(self, apiclient):
@ -793,7 +829,7 @@ class FireWallRule:
@classmethod
def create(cls, apiclient, ipaddressid, protocol, cidrlist=None,
startport=None, endport=None):
startport=None, endport=None, projectid=None):
"""Create Firewall Rule"""
cmd = createFirewallRule.createFirewallRuleCmd()
cmd.ipaddressid = ipaddressid
@ -805,7 +841,10 @@ class FireWallRule:
if endport:
cmd.endport = endport
return NATRule(apiclient.createFirewallRule(cmd).__dict__)
if projectid:
cmd.projectid = projectid
return FireWallRule(apiclient.createFirewallRule(cmd).__dict__)
def delete(self, apiclient):
"""Delete Firewall rule"""
@ -867,7 +906,7 @@ class DiskOffering:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, custom=False):
def create(cls, apiclient, services, custom=False, domainid=None):
"""Create Disk offering"""
cmd = createDiskOffering.createDiskOfferingCmd()
cmd.displaytext = services["displaytext"]
@ -876,6 +915,10 @@ class DiskOffering:
cmd.customized = True
else:
cmd.disksize = services["disksize"]
if domainid:
cmd.domainid = domainid
return DiskOffering(apiclient.createDiskOffering(cmd).__dict__)
def delete(self, apiclient):
@ -894,6 +937,58 @@ class DiskOffering:
return(apiclient.listDiskOfferings(cmd))
class NetworkOffering:
"""Manage network offerings cycle"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, serviceProviderList=None, **kwargs):
"""Create network offering"""
cmd = createNetworkOffering.createNetworkOfferingCmd()
cmd.displaytext = services["displaytext"]
cmd.name = services["name"]
cmd.guestiptype = services["guestiptype"]
cmd.supportedservices = services["supportedservices"]
cmd.traffictype = services["traffictype"]
cmd.serviceProviderList = []
for service, provider in services["serviceProviderList"].items():
cmd.serviceProviderList.append({
'service' : service,
'provider': provider
})
[setattr(cmd, k, v) for k, v in kwargs.items()]
return NetworkOffering(apiclient.createNetworkOffering(cmd).__dict__)
def delete(self, apiclient):
"""Delete network offering"""
cmd = deleteNetworkOffering.deleteNetworkOfferingCmd()
cmd.id = self.id
apiclient.deleteNetworkOffering(cmd)
return
def update(self, apiclient, **kwargs):
"""Lists all available network offerings."""
cmd = updateNetworkOffering.updateNetworkOfferingCmd()
cmd.id = self.id
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.updateNetworkOffering(cmd))
@classmethod
def list(cls, apiclient, **kwargs):
"""Lists all available network offerings."""
cmd = listNetworkOfferings.listNetworkOfferingsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listNetworkOfferings(cmd))
class SnapshotPolicy:
"""Manage snapshot policies"""
@ -934,16 +1029,26 @@ class LoadBalancerRule:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, ipaddressid, accountid=None):
def create(cls, apiclient, services, ipaddressid, accountid=None,
projectid=None):
"""Create Load balancing Rule"""
cmd = createLoadBalancerRule.createLoadBalancerRuleCmd()
cmd.publicipid = ipaddressid or services["ipaddressid"]
cmd.account = accountid or services["account"]
if accountid:
cmd.account = accountid
elif "account" in services:
cmd.account = services["account"]
cmd.name = services["name"]
cmd.algorithm = services["alg"]
cmd.privateport = services["privateport"]
cmd.publicport = services["publicport"]
if projectid:
cmd.projectid = projectid
return LoadBalancerRule(apiclient.createLoadBalancerRule(cmd).__dict__)
def delete(self, apiclient):
@ -1162,17 +1267,42 @@ class Network:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, accountid=None, domainid=None):
def create(cls, apiclient, services, accountid=None, domainid=None,
networkofferingid=None, projectid=None, zoneid=None):
"""Create Network for account"""
cmd = createNetwork.createNetworkCmd()
cmd.name = services["name"]
cmd.displaytext = services["displaytext"]
cmd.networkofferingid = services["networkoffering"]
cmd.zoneid = services["zoneid"]
if networkofferingid:
cmd.networkofferingid = networkofferingid
elif "networkoffering" in services:
cmd.networkofferingid = services["networkoffering"]
if zoneid:
cmd.zoneid = zoneid
elif "zoneid" in services:
cmd.zoneid = services["zoneid"]
if "gateway" in services:
cmd.gateway = services["gateway"]
if "netmask" in services:
cmd.netmask = services["netmask"]
if "startip" in services:
cmd.startip = services["startip"]
if "endip" in services:
cmd.endip = services["endip"]
if "vlan" in services:
cmd.vlan = services["vlan"]
if "acltype" in services:
cmd.acltype = services["acltype"]
if accountid:
cmd.account = accountid
if domainid:
cmd.domainid = domainid
if projectid:
cmd.projectid = projectid
return Network(apiclient.createNetwork(cmd).__dict__)
@ -1199,7 +1329,8 @@ class Vpn:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, publicipid, account=None, domainid=None):
def create(cls, apiclient, publicipid, account=None, domainid=None,
projectid=None):
"""Create VPN for Public IP address"""
cmd = createRemoteAccessVpn.createRemoteAccessVpnCmd()
cmd.publicipid = publicipid
@ -1207,7 +1338,8 @@ class Vpn:
cmd.account = account
if domainid:
cmd.domainid = domainid
if projectid:
cmd.projectid = projectid
return Vpn(apiclient.createRemoteAccessVpn(cmd).__dict__)
def delete(self, apiclient):
@ -1225,7 +1357,8 @@ class VpnUser:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, username, password, account=None, domainid=None):
def create(cls, apiclient, username, password, account=None, domainid=None,
projectid=None):
"""Create VPN user"""
cmd = addVpnUser.addVpnUserCmd()
cmd.username = username
@ -1235,7 +1368,8 @@ class VpnUser:
cmd.account = account
if domainid:
cmd.domainid = domainid
if projectid:
cmd.projectid = projectid
return VpnUser(apiclient.addVpnUser(cmd).__dict__)
def delete(self, apiclient):
@ -1442,7 +1576,7 @@ class SecurityGroup:
@classmethod
def create(cls, apiclient, services, account=None, domainid=None,
description=None):
description=None, projectid=None):
"""Create security group"""
cmd = createSecurityGroup.createSecurityGroupCmd()
@ -1453,7 +1587,9 @@ class SecurityGroup:
cmd.domainid=domainid
if description:
cmd.description=description
if projectid:
cmd.projectid = projectid
return SecurityGroup(apiclient.createSecurityGroup(cmd).__dict__)
def delete(self, apiclient):
@ -1464,7 +1600,7 @@ class SecurityGroup:
apiclient.deleteSecurityGroup(cmd)
def authorize(self, apiclient, services,
account=None, domainid=None):
account=None, domainid=None, projectid=None):
"""Authorize Ingress Rule"""
cmd=authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd()
@ -1473,7 +1609,9 @@ class SecurityGroup:
cmd.domainid = domainid
if account:
cmd.account = account
if projectid:
cmd.projectid = projectid
cmd.securitygroupid=self.id
cmd.protocol=services["protocol"]
@ -1500,4 +1638,144 @@ class SecurityGroup:
cmd = listSecurityGroups.listSecurityGroupsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listSecurityGroups(cmd))
return(apiclient.listSecurityGroups(cmd))
class Project:
"""Manage Project life cycle"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, account=None, domainid=None):
"""Create project"""
cmd = createProject.createProjectCmd()
cmd.displaytext = services["displaytext"]
cmd.name = "-".join([services["name"], random_gen()])
if account:
cmd.account = account
if domainid:
cmd.domainid = domainid
return Project(apiclient.createProject(cmd).__dict__)
def delete(self, apiclient):
"""Delete Project"""
cmd = deleteProject.deleteProjectCmd()
cmd.id = self.id
apiclient.deleteProject(cmd)
def update(self, apiclient, **kwargs):
"""Updates the project"""
cmd = updateProject.updateProjectCmd()
cmd.id = self.id
[setattr(cmd, k, v) for k, v in kwargs.items()]
return apiclient.updateProject(cmd)
def activate(self, apiclient):
"""Activates the suspended project"""
cmd = activateProject.activateProjectCmd()
cmd.id = self.id
return apiclient.activateProject(cmd)
def suspend(self, apiclient):
"""Suspend the active project"""
cmd = suspendProject.suspendProjectCmd()
cmd.id = self.id
return apiclient.suspendProject(cmd)
def addAccount(self, apiclient, account=None, email=None):
"""Add account to project"""
cmd = addAccountToProject.addAccountToProjectCmd()
cmd.projectid = self.id
if account:
cmd.account = account
if email:
cmd.email = email
return apiclient.addAccountToProject(cmd)
def deleteAccount(self, apiclient, account):
"""Delete account from project"""
cmd = deleteAccountFromProject.deleteAccountFromProjectCmd()
cmd.projectid = self.id
cmd.account = account
return apiclient.deleteAccountFromProject(cmd)
@classmethod
def listAccounts(cls, apiclient, **kwargs):
"""Lists all accounts associated with projects."""
cmd = listProjectAccounts.listProjectAccountsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listProjectAccounts(cmd))
@classmethod
def list(cls, apiclient, **kwargs):
"""Lists all projects."""
cmd = listProjects.listProjectsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listProjects(cmd))
class ProjectInvitation:
"""Manage project invitations"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def update(cls, apiclient, projectid, accept, account=None, token=None):
"""Updates the project invitation for that account"""
cmd = updateProjectInvitation.updateProjectInvitationCmd()
cmd.projectid = projectid
cmd.accept = accept
if account:
cmd.account = account
if token:
cmd.token = token
return (apiclient.updateProjectInvitation(cmd).__dict__)
def delete(self, apiclient, id):
"""Deletes the project invitation"""
cmd = deleteProjectInvitation.deleteProjectInvitationCmd()
cmd.id = id
return apiclient.deleteProjectInvitation(cmd)
@classmethod
def list(cls, apiclient, **kwargs):
"""Lists project invitations"""
cmd = listProjectInvitations.listProjectInvitationsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listProjectInvitations(cmd))
class Configurations:
"""Manage Configuration"""
@classmethod
def update(cls, apiclient, name, value=None):
"""Updates the specified configuration"""
cmd = updateConfiguration.updateConfigurationCmd()
cmd.name = name
cmd.value = value
apiclient.updateConfiguration(cmd)
@classmethod
def list(cls, apiclient, **kwargs):
"""Lists configurations"""
cmd = listConfigurations.listConfigurationsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listConfigurations(cmd))

View File

@ -227,7 +227,7 @@ def download_builtin_templates(apiclient, zoneid, hypervisor, host, linklocalip,
return
def update_resource_limit(apiclient, resourcetype, account=None, domainid=None,
max=None):
max=None, projectid=None):
"""Updates the resource limit to 'max' for given account"""
cmd = updateResourceLimit.updateResourceLimitCmd()
@ -238,6 +238,8 @@ def update_resource_limit(apiclient, resourcetype, account=None, domainid=None,
cmd.domainid = domainid
if max:
cmd.max = max
if projectid:
cmd.projectid = projectid
apiclient.updateResourceLimit(cmd)
return
@ -451,4 +453,11 @@ def list_network_offerings(apiclient, **kwargs):
cmd = listNetworkOfferings.listNetworkOfferingsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listNetworkOfferings(cmd))
return(apiclient.listNetworkOfferings(cmd))
def list_resource_limits(apiclient, **kwargs):
"""Lists resource limits"""
cmd = listResourceLimits.listResourceLimitsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listResourceLimits(cmd))

View File

@ -15,6 +15,72 @@ import configGenerator
import logging
import string
import random
import imaplib
import email
import datetime
def restart_mgmt_server(server):
"""Restarts the management server"""
try:
# Get the SSH client
ssh = is_server_ssh_ready(
server["ipaddress"],
server["port"],
server["username"],
server["password"],
)
result = ssh.execute("/etc/init.d/cloud-management restart")
res = str(result)
# Server Stop - OK
# Server Start - OK
if res.count("OK") != 2:
raise ("ErrorInReboot!")
except Exception as e:
raise e
return
def fetch_latest_mail(services, from_mail):
"""Fetch mail"""
# Login to mail server to verify email
mail = imaplib.IMAP4_SSL(services["server"])
mail.login(
services["email"],
services["password"]
)
mail.list()
mail.select(services["folder"])
date = (datetime.date.today() - datetime.timedelta(1)).strftime("%d-%b-%Y")
result, data = mail.uid(
'search',
None,
'(SENTSINCE {date} HEADER FROM "{mail}")'.format(
date=date,
mail=from_mail
)
)
# Return False if email is not present
if data == []:
return False
latest_email_uid = data[0].split()[-1]
result, data = mail.uid('fetch', latest_email_uid, '(RFC822)')
raw_email = data[0][1]
email_message = email.message_from_string(raw_email)
result = get_first_text_block(email_message)
return result
def get_first_text_block(email_message_instance):
"""fetches first text block from the mail"""
maintype = email_message_instance.get_content_maintype()
if maintype == 'multipart':
for part in email_message_instance.get_payload():
if part.get_content_maintype() == 'text':
return part.get_payload()
elif maintype == 'text':
return email_message_instance.get_payload()
def random_gen(size=6, chars=string.ascii_uppercase + string.digits):
"""Generate Random Strings of variable length"""