Extracted common functionality in libs.

Added P1-testcases.
Abstracted functionality to behave different for BASIC and ADVANCED networking mode.
This commit is contained in:
Chirag Jog 2012-02-08 03:07:59 -08:00
parent c023db10cc
commit a046161b56
21 changed files with 6058 additions and 1300 deletions

View File

@ -8,8 +8,9 @@
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
class Services:
"""Test Disk offerings Services
@ -56,10 +57,10 @@ class TestCreateDiskOffering(cloudstackTestCase):
)
self.cleanup.append(disk_offering)
cmd = listDiskOfferings.listDiskOfferingsCmd()
cmd.id = disk_offering.id
list_disk_response = self.apiclient.listDiskOfferings(cmd)
list_disk_response = list_disk_offering(
self.apiclient,
id=disk_offering.id
)
self.assertNotEqual(
len(list_disk_response),
0,
@ -140,9 +141,10 @@ class TestDiskOfferings(cloudstackTestCase):
self.apiclient.updateDiskOffering(cmd)
cmd = listDiskOfferings.listDiskOfferingsCmd()
cmd.id = self.disk_offering_1.id
list_disk_response = self.apiclient.listDiskOfferings(cmd)
list_disk_response = list_disk_offering(
self.apiclient,
id=self.disk_offering_1.id
)
self.assertNotEqual(
len(list_disk_response),
@ -171,13 +173,12 @@ class TestDiskOfferings(cloudstackTestCase):
# 1. deleteDiskOffering should return
# a valid information for newly created offering
cmd = deleteDiskOffering.deleteDiskOfferingCmd()
cmd.id = self.disk_offering_2.id
self.apiclient.deleteDiskOffering(cmd)
self.disk_offering_2.delete(self.apiclient)
cmd = listDiskOfferings.listDiskOfferingsCmd()
cmd.id = self.disk_offering_2.id
list_disk_response = self.apiclient.listDiskOfferings(cmd)
list_disk_response = list_disk_offering(
self.apiclient,
id=self.disk_offering_2.id
)
self.assertEqual(
list_disk_response,

View File

@ -7,8 +7,9 @@
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
@ -51,7 +52,6 @@ class Services:
"xenserver": {
# Must be name of corresponding Hypervisor type
# in cluster in small letters
"clusterid": 16,
"hypervisor": 'XenServer',
# Hypervisor type
"clustertype": 'ExternalManaged',
@ -61,7 +61,6 @@ class Services:
"password": "fr3sca",
},
"kvm": {
"clusterid": 35,
"hypervisor": 'KVM',
# Hypervisor type
"clustertype": 'CloudManaged',
@ -71,7 +70,6 @@ class Services:
"password": "fr3sca",
},
"vmware": {
"clusterid": 16,
"hypervisor": 'VMware',
# Hypervisor type
"clustertype": 'ExternalManaged',
@ -80,7 +78,10 @@ class Services:
"username": "administrator",
"password": "fr3sca",
},
}
},
"zoneid": 2,
# Optional, if specified the mentioned zone will be
# used for tests
}
class TestHosts(cloudstackTestCase):
@ -93,7 +94,7 @@ class TestHosts(cloudstackTestCase):
self.cleanup = []
# Get Zone and pod
self.zone = get_zone(self.apiclient)
self.zone = get_zone(self.apiclient, self.services)
self.pod = get_pod(self.apiclient, self.zone.id)
self.services["clusters"][0]["zoneid"] = self.zone.id
@ -150,9 +151,10 @@ class TestHosts(cloudstackTestCase):
)
#If host is externally managed host is already added with cluster
cmd = listHosts.listHostsCmd()
cmd.clusterid = cluster.id
response = self.apiclient.listHosts(cmd)
response = list_hosts(
self.apiclient,
clusterid=cluster.id
)
if not response:
hypervisor_type = str(cluster.hypervisortype.lower())
@ -166,9 +168,10 @@ class TestHosts(cloudstackTestCase):
self.cleanup.append(host)
self.cleanup.append(cluster)
cmd = listHosts.listHostsCmd()
cmd.clusterid = cluster.id
list_hosts_response = self.apiclient.listHosts(cmd)
list_hosts_response = list_hosts(
self.apiclient,
clusterid=cluster.id
)
self.assertNotEqual(
len(list_hosts_response),
@ -184,9 +187,10 @@ class TestHosts(cloudstackTestCase):
"Check if state of host is Up or not"
)
#Verify List Cluster Response has newly added cluster
cmd = listClusters.listClustersCmd()
cmd.id = cluster.id
list_cluster_response = self.apiclient.listClusters(cmd)
list_cluster_response = list_clusters(
self.apiclient,
id=cluster.id
)
self.assertNotEqual(
len(list_cluster_response),

View File

@ -7,8 +7,9 @@
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
import urllib
from random import random
#Import System modules
@ -30,8 +31,8 @@ class Services:
},
"iso_1":
{
"displaytext": "Test ISO type 1",
"name": "testISOType_1",
"displaytext": "Test ISO 1",
"name": "ISO 1",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"isextractable": True,
@ -41,8 +42,8 @@ class Services:
},
"iso_2":
{
"displaytext": "Test ISO type 2",
"name": "testISOType_2",
"displaytext": "Test ISO 2",
"name": "ISO 2",
"url": "http://iso.linuxquestions.org/download/504/1819/http/gd4.tuwien.ac.at/dsl-4.4.10.iso",
# Source URL where ISO is located
"isextractable": True,
@ -52,7 +53,7 @@ class Services:
"mode": 'HTTP_DOWNLOAD',
# Used in Extract template, value must be HTTP_DOWNLOAD
},
"destzoneid": 2,
"destzoneid": 5,
# Copy ISO from one zone to another (Destination Zone)
"isfeatured": True,
"ispublic": True,
@ -61,6 +62,10 @@ class Services:
"passwordenabled": True,
"ostypeid": 12,
"domainid": 1,
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced'
}
@ -71,7 +76,7 @@ class TestCreateIso(cloudstackTestCase):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
# Get Zone, Domain and templates
self.zone = get_zone(self.apiclient)
self.zone = get_zone(self.apiclient, self.services)
self.services["iso_2"]["zoneid"] = self.zone.id
self.cleanup = []
return
@ -102,9 +107,10 @@ class TestCreateIso(cloudstackTestCase):
iso.download(self.apiclient)
self.cleanup.append(iso)
cmd = listIsos.listIsosCmd()
cmd.id = iso.id
list_iso_response = self.apiclient.listIsos(cmd)
list_iso_response = list_isos(
self.apiclient,
id=iso.id
)
iso_response = list_iso_response[0]
@ -140,7 +146,7 @@ class TestISO(cloudstackTestCase):
cls.api_client = fetch_api_client()
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.services)
cls.services["iso_1"]["zoneid"] = cls.zone.id
cls.services["iso_2"]["zoneid"] = cls.zone.id
cls.services["sourcezoneid"] = cls.zone.id
@ -209,9 +215,10 @@ class TestISO(cloudstackTestCase):
self.apiclient.updateIso(cmd)
#Check whether attributes are updated in ISO using listIsos
cmd = listIsos.listIsosCmd()
cmd.id = self.iso_1.id
list_iso_response = self.apiclient.listIsos(cmd)
list_iso_response = list_isos(
self.apiclient,
id=self.iso_1.id
)
self.assertNotEqual(
len(list_iso_response),
@ -254,9 +261,10 @@ class TestISO(cloudstackTestCase):
self.iso_1.delete(cls.api_client)
#ListIsos to verify deleted ISO is properly deleted
cmd = listIsos.listIsosCmd()
cmd.id = self.iso_1.id
list_iso_response = self.apiclient.listIsos(cmd)
list_iso_response = list_isos(
self.apiclient,
id=self.iso_1.id
)
self.assertEqual(
list_iso_response,
@ -324,11 +332,12 @@ class TestISO(cloudstackTestCase):
self.apiclient.updateIsoPermissions(cmd)
#Verify ListIsos have updated permissions for the ISO for normal user
cmd = listIsos.listIsosCmd()
cmd.id = self.iso_2.id
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
list_iso_response = self.apiclient.listIsos(cmd)
list_iso_response = list_isos(
self.apiclient,
id=self.iso_2.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
iso_response = list_iso_response[0]
@ -360,13 +369,14 @@ class TestISO(cloudstackTestCase):
cmd = copyIso.copyIsoCmd()
cmd.id = self.iso_2.id
cmd.destzoneid = self.services["destzoneid"]
cmd.sourcezoneid = self.services["sourcezoneid"]
cmd.sourcezoneid = self.zone.id
self.apiclient.copyIso(cmd)
#Verify ISO is copied to another zone using ListIsos
cmd = listIsos.listIsosCmd()
cmd.id = self.iso_2.id
list_iso_response = self.apiclient.listIsos(cmd)
list_iso_response = list_isos(
self.apiclient,
id=self.iso_2.id
)
iso_response = list_iso_response[0]

View File

@ -9,8 +9,9 @@
from cloudstackTestCase import *
from cloudstackAPI import *
import remoteSSHClient
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
@ -23,6 +24,10 @@ class Services:
self.services = {
"ostypeid": 12,
# Cent OS 5.3 (64 bit)
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced', # Networking mode: Basic or advanced
"network": {
"name": "Test Network",
"displaytext": "Test Network",
@ -38,23 +43,23 @@ class Services:
# In MBs
},
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
"password": "password",
},
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
"password": "password",
},
"server":
{
"displayname": "Small Instance",
"username": "root",
"password": "password",
"hypervisor": 'XenServer',
"domainid": 1,
"privateport": 22,
"publicport": 22,
"ssh_port": 22,
"protocol": 'TCP',
"displayname": "Small Instance",
"username": "root",
"password": "password",
"hypervisor": 'XenServer',
"domainid": 1,
"privateport": 22,
"publicport": 22,
"ssh_port": 22,
"protocol": 'TCP',
},
"natrule":
{
@ -84,7 +89,7 @@ class TestPublicIP(cloudstackTestCase):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.services)
# Create Accounts & networks
cls.account = Account.create(
@ -155,10 +160,10 @@ class TestPublicIP(cloudstackTestCase):
self.zone.id,
self.account.account.domainid
)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = ip_address.ipaddress.id
list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=ip_address.ipaddress.id
)
#listPublicIpAddresses should return newly created public IP
self.assertNotEqual(
@ -176,11 +181,10 @@ class TestPublicIP(cloudstackTestCase):
# Validate the following:
# 1.listPublicIpAddresses should no more return the released address
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = ip_address.ipaddress.id
list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=ip_address.ipaddress.id
)
self.assertEqual(
list_pub_ip_addr_resp,
None,
@ -202,11 +206,12 @@ class TestPublicIP(cloudstackTestCase):
self.zone.id,
self.user.account.domainid
)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = ip_address.ipaddress.id
#listPublicIpAddresses should return newly created public IP
list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=ip_address.ipaddress.id
)
self.assertNotEqual(
len(list_pub_ip_addr_resp),
@ -221,9 +226,10 @@ class TestPublicIP(cloudstackTestCase):
ip_address.delete(self.apiclient)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = ip_address.ipaddress.id
list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=ip_address.ipaddress.id
)
self.assertEqual(
list_pub_ip_addr_resp,
@ -242,7 +248,7 @@ class TestPortForwarding(cloudstackTestCase):
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
@ -296,11 +302,12 @@ class TestPortForwarding(cloudstackTestCase):
#1. listPortForwarding rules API should return the added PF rule
#2. attempt to do an ssh into the user VM through the sourceNAT
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
src_nat_ip_addr = src_nat_ip_addrs[0]
#Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
@ -309,10 +316,10 @@ class TestPortForwarding(cloudstackTestCase):
src_nat_ip_addr.id
)
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
cmd.id = nat_rule.id
list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
self.assertNotEqual(
len(list_nat_rule_response),
0,
@ -334,9 +341,10 @@ class TestPortForwarding(cloudstackTestCase):
nat_rule.delete(self.apiclient)
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
cmd.id = nat_rule.id
list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
self.assertEqual(
list_nat_rule_response,
@ -379,10 +387,10 @@ class TestPortForwarding(cloudstackTestCase):
#1. listPortForwardingRules should not return the deleted rule anymore
#2. attempt to do ssh should now fail
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
cmd.id = nat_rule.id
list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
self.assertNotEqual(
len(list_nat_rule_response),
0,
@ -399,17 +407,17 @@ class TestPortForwarding(cloudstackTestCase):
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.virtual_machine.ipaddress.ipaddress, e)
(self.virtual_machine.ipaddress, e)
)
nat_rule.delete(self.apiclient)
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
cmd.id = nat_rule.id
list_nat_rule_response = self.apiclient.listPortForwardingRules(cmd)
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
self.assertEqual(
len(list_nat_rule_response),
list_nat_rule_response,
None,
"Check Port Forwarding Rule is deleted"
)
@ -432,7 +440,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
@ -502,10 +510,12 @@ class TestLoadBalancingRule(cloudstackTestCase):
#3. verify using the hostname of the VM
# that round robin is indeed happening as expected
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
src_nat_ip_addr = src_nat_ip_addrs[0]
#Create Load Balancer rule and assign VMs to rule
lb_rule = LoadBalancerRule.create(
@ -517,10 +527,11 @@ class TestLoadBalancingRule(cloudstackTestCase):
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
cmd.id = lb_rule.id
lb_rules = self.apiclient.listLoadBalancerRules(cmd)
lb_rules = list_lb_rules(
self.apiclient,
id=lb_rule.id
)
#verify listLoadBalancerRules lists the added load balancing rule
self.assertNotEqual(
len(lb_rules),
@ -535,10 +546,10 @@ class TestLoadBalancingRule(cloudstackTestCase):
# listLoadBalancerRuleInstances should list all
# instances associated with that LB rule
cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd()
cmd.id = lb_rule.id
lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd)
lb_instance_rules = list_lb_instances(
self.apiclient,
id=lb_rule.id
)
self.assertNotEqual(
len(lb_instance_rules),
0,
@ -559,7 +570,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
ssh_1 = remoteSSHClient.remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.services['natrule']["publicport"],
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
@ -570,7 +581,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
time.sleep(20)
ssh_2 = remoteSSHClient.remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.services['natrule']["publicport"],
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
@ -620,9 +631,11 @@ class TestLoadBalancingRule(cloudstackTestCase):
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
cmd.id = lb_rule.id
lb_rules = self.apiclient.listLoadBalancerRules(cmd)
lb_rules = list_lb_rules(
self.apiclient,
id=lb_rule.id
)
#verify listLoadBalancerRules lists the added load balancing rule
self.assertNotEqual(
@ -637,9 +650,10 @@ class TestLoadBalancingRule(cloudstackTestCase):
)
# listLoadBalancerRuleInstances should list
# all instances associated with that LB rule
cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd()
cmd.id = lb_rule.id
lb_instance_rules = self.apiclient.listLoadBalancerRuleInstances(cmd)
lb_instance_rules = list_lb_instances(
self.apiclient,
id=lb_rule.id
)
self.assertNotEqual(
len(lb_instance_rules),
@ -661,7 +675,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
ssh_1 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress.ipaddress,
self.services['natrule']["publicport"],
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
@ -672,7 +686,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
time.sleep(20)
ssh_2 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress.ipaddress,
self.services['natrule']["publicport"],
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
@ -712,7 +726,7 @@ class TestRebootRouter(cloudstackTestCase):
self.services = Services().services
# Get Zone, Domain and templates
self.zone = get_zone(self.apiclient)
self.zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
self.zone.id,
@ -738,10 +752,12 @@ class TestRebootRouter(cloudstackTestCase):
serviceofferingid=self.service_offering.id
)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
src_nat_ip_addr = src_nat_ip_addrs[0]
self.public_ip = PublicIPAddress.create(
self.apiclient,
@ -782,10 +798,12 @@ class TestRebootRouter(cloudstackTestCase):
# still works through the sourceNAT Ip
#Retrieve router for the user account
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
routers = self.apiclient.listRouters(cmd)
routers = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = routers[0]
cmd = rebootRouter.rebootRouterCmd()
@ -820,7 +838,7 @@ class TestAssignRemoveLB(cloudstackTestCase):
self.apiclient = self.testClient.getApiClient()
self.services = Services().services
# Get Zone, Domain and templates
self.zone = get_zone(self.apiclient)
self.zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
self.zone.id,
@ -883,10 +901,12 @@ class TestAssignRemoveLB(cloudstackTestCase):
#3. verify ssh attempts should pass as long as there
# is at least one instance associated with the rule
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
self.non_src_nat_ip = self.apiclient.listPublicIpAddresses(cmd)[0]
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.non_src_nat_ip = src_nat_ip_addrs[0]
lb_rule = LoadBalancerRule.create(
self.apiclient,
@ -899,20 +919,20 @@ class TestAssignRemoveLB(cloudstackTestCase):
#Create SSH client for each VM
ssh_1 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["natrule"]["publicport"],
self.services["lbrule"]["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_2 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["natrule"]["publicport"],
self.services["lbrule"]["publicport"],
self.vm_2.username,
self.vm_2.password
)
ssh_3 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["natrule"]["publicport"],
self.services["lbrule"]["publicport"],
self.vm_3.username,
self.vm_3.password
)
@ -974,7 +994,7 @@ class TestReleaseIP(cloudstackTestCase):
self.services = Services().services
# Get Zone, Domain and templates
self.zone = get_zone(self.apiclient)
self.zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
self.zone.id,
@ -1009,10 +1029,12 @@ class TestReleaseIP(cloudstackTestCase):
self.account.account.domainid
)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
self.ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
ip_addrs = list_publicIP(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.ip_addr = ip_addrs[0]
self.nat_rule = NATRule.create(
self.apiclient,
@ -1041,9 +1063,10 @@ class TestReleaseIP(cloudstackTestCase):
self.ip_address.delete(self.apiclient)
# ListPublicIpAddresses should not list deleted Public IP address
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.id = self.ip_addr.id
list_pub_ip_addr_resp = self.apiclient.listPublicIpAddresses(cmd)
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=self.ip_addr.id
)
self.assertEqual(
list_pub_ip_addr_resp,
@ -1053,24 +1076,24 @@ class TestReleaseIP(cloudstackTestCase):
# ListPortForwardingRules should not list
# associated rules with Public IP address
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
cmd.id = self.nat_rule.id
list_nat_rules = self.apiclient.listPortForwardingRules(cmd)
list_nat_rule = list_nat_rules(
self.apiclient,
id=self.nat_rule.id
)
self.assertEqual(
list_nat_rules,
list_nat_rule,
None,
"Check if PF rules are no longer available for IP address"
)
# listLoadBalancerRules should not list
# associated rules with Public IP address
cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
cmd.id = self.lb_rule.id
list_lb_rules = self.apiclient.listLoadBalancerRules(cmd)
list_lb_rule = list_lb_rules(
self.apiclient,
id=self.lb_rule.id
)
self.assertEqual(
list_lb_rules,
list_lb_rule,
None,
"Check if LB rules for IP Address are no longer available"
)
@ -1094,7 +1117,7 @@ class TestDeleteAccount(cloudstackTestCase):
self.services = Services().services
# Get Zone, Domain and templates
self.zone = get_zone(self.apiclient)
self.zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
self.zone.id,
@ -1120,10 +1143,12 @@ class TestDeleteAccount(cloudstackTestCase):
serviceofferingid=self.service_offering.id
)
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
src_nat_ip_addr = self.apiclient.listPublicIpAddresses(cmd)[0]
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
src_nat_ip_addr = src_nat_ip_addrs[0]
self.lb_rule = LoadBalancerRule.create(
self.apiclient,
@ -1157,28 +1182,30 @@ class TestDeleteAccount(cloudstackTestCase):
# ListLoadBalancerRules should not list
# associated rules with deleted account
cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
# Unable to find account testuser1 in domain 1 : Exception
with self.assertRaises(Exception):
self.apiclient.listLoadBalancerRules(cmd)
list_lb_rules(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
# ListPortForwardingRules should not
# list associated rules with deleted account
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
with self.assertRaises(Exception):
self.apiclient.listPortForwardingRules(cmd)
list_nat_rules(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
#Retrieve router for the user account
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
with self.assertRaises(Exception):
routers = self.apiclient.listRouters(cmd)
list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
return
def tearDown(self):

View File

@ -7,8 +7,9 @@
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
@ -24,19 +25,16 @@ class Services:
"url": "nfs://192.168.100.131/Primary",
# Format: File_System_Type/Location/Path
"name": "Primary XEN",
"clusterid": 1, # XEN Cluster
"hypervisor": 'XEN',
},
1: {
"url": "nfs://192.168.100.131/Primary",
"name": "Primary KVM",
"clusterid": 40, # KVM Cluster
"hypervisor": 'KVM',
},
2: {
"url": "nfs://192.168.100.131/Primary",
"name": "Primary VMWare",
"clusterid": 33, # VMWare Cluster
"hypervisor": 'VMWare',
},
},
@ -45,16 +43,12 @@ class Services:
"url": "iscsi://192.168.100.21/iqn.2012-01.localdomain.clo-cstack-cos6:iser/1",
# Format : iscsi://IP Address/IQN number/LUN#
"name": "Primary iSCSI",
"clusterid": 1, # XEN Cluster
"hypervisor": 'XEN',
},
1: {
"url": "iscsi://192.168.100.21/export",
"name": "Primary KVM",
"clusterid": 1, # KVM Cluster
"hypervisor": 'KVM',
},
},
"zoneid": 2,
# Optional, if specified the mentioned zone will be
# used for tests
}
class TestPrimaryStorageServices(cloudstackTestCase):
@ -65,7 +59,7 @@ class TestPrimaryStorageServices(cloudstackTestCase):
self.services = Services().services
self.cleanup = []
# Get Zone and pod
self.zone = get_zone(self.apiclient)
self.zone = get_zone(self.apiclient, self.services)
self.pod = get_pod(self.apiclient, self.zone.id)
self.services["nfs"][0]["zoneid"] = self.zone.id
@ -77,11 +71,7 @@ class TestPrimaryStorageServices(cloudstackTestCase):
self.services["nfs"][2]["podid"] = self.pod.id
self.services["iscsi"][0]["zoneid"] = self.zone.id
self.services["iscsi"][1]["zoneid"] = self.zone.id
self.services["iscsi"][0]["podid"] = self.pod.id
self.services["iscsi"][1]["podid"] = self.pod.id
return
def tearDown(self):
@ -106,18 +96,28 @@ class TestPrimaryStorageServices(cloudstackTestCase):
#Create NFS storage pools with on XEN/KVM/VMWare clusters
for k, v in self.services["nfs"].items():
clusters = list_clusters(
self.apiclient,
zoneid=self.zone.id,
hypervisortype=v["hypervisor"]
)
cluster = clusters[0]
#Host should be present before adding primary storage
cmd = listHosts.listHostsCmd()
cmd.clusterid = v["clusterid"]
list_hosts_response = self.apiclient.listHosts(cmd)
list_hosts_response = list_hosts(
self.apiclient,
clusterid=cluster.id
)
self.assertNotEqual(
len(list_hosts_response),
0,
"Check list Hosts for hypervisor: " + v["hypervisor"]
)
len(list_hosts_response),
0,
"Check list Hosts for hypervisor: " + v["hypervisor"]
)
storage = StoragePool.create(self.apiclient, v)
storage = StoragePool.create(self.apiclient,
v,
clusterid=cluster.id
)
self.cleanup.append(storage)
self.assertEqual(
@ -132,11 +132,11 @@ class TestPrimaryStorageServices(cloudstackTestCase):
"Check storage pool type for hypervisor : " + v["hypervisor"]
)
#Verify List Storage pool Response has newly added storage pool
cmd = listStoragePools.listStoragePoolsCmd()
cmd.id = storage.id
storage_pools_response = self.apiclient.listStoragePools(cmd)
#Verify List Storage pool Response has newly added storage pool
storage_pools_response = list_storage_pools(
self.apiclient,
id=storage.id,
)
self.assertNotEqual(
len(storage_pools_response),
0,
@ -160,7 +160,17 @@ class TestPrimaryStorageServices(cloudstackTestCase):
# Create iSCSI storage pools with on XEN/KVM clusters
for k, v in self.services["iscsi"].items():
storage = StoragePool.create(self.apiclient, v)
clusters = list_clusters(
self.apiclient,
zoneid=self.zone.id,
hypervisortype=v["hypervisor"]
)
cluster = clusters[0]
storage = StoragePool.create(self.apiclient,
v,
clusterid=cluster.id
)
self.cleanup.append(storage)
self.assertEqual(
@ -170,9 +180,10 @@ class TestPrimaryStorageServices(cloudstackTestCase):
)
#Verify List Storage pool Response has newly added storage pool
cmd = listStoragePools.listStoragePoolsCmd()
cmd.id = storage.id
storage_pools_response = self.apiclient.listStoragePools(cmd)
storage_pools_response = list_storage_pools(
self.apiclient,
id=storage.id,
)
self.assertNotEqual(
len(storage_pools_response),

View File

@ -8,8 +8,9 @@
from cloudstackTestCase import *
from cloudstackAPI import *
import remoteSSHClient
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
@ -22,8 +23,8 @@ class Services:
def __init__(self):
self.services = {
"service_offering": {
"name": "Tiny Service Offering",
"displaytext": "Tiny service offering",
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
@ -32,22 +33,26 @@ class Services:
{
"displayname": "Test VM",
"username": "root",
"password": "password",
"password": "fr3sca",
"ssh_port": 22,
"hypervisor": 'VMWare',
"hypervisor": 'XenServer',
"domainid": 1,
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "testuser",
"password": "fr3sca",
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "testuser",
"password": "fr3sca",
},
"ostypeid":12,
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced', #Networking mode: Basic, Advanced
}
@ -59,7 +64,7 @@ class TestRouterServices(cloudstackTestCase):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
@ -70,8 +75,7 @@ class TestRouterServices(cloudstackTestCase):
#Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
cls.services["account"]
)
cls.service_offering = ServiceOffering.create(
cls.api_client,
@ -115,8 +119,11 @@ class TestRouterServices(cloudstackTestCase):
# 2. router will have dns same as that seen in listZones
# 3. router will have a guestIP and a linkLocalIp"
cmd = listRouters.listRoutersCmd()
list_router_response = self.apiclient.listRouters(cmd)
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertNotEqual(
len(list_router_response),
@ -130,9 +137,11 @@ class TestRouterServices(cloudstackTestCase):
"Check list router response for router state"
)
cmd = listZones.listZonesCmd()
cmd.zoneid = router.zoneid
zone = self.apiclient.listZones(cmd)[0]
zones = list_zones(
self.apiclient,
id=router.zoneid
)
zone = zones[0]
self.assertEqual(
router.dns1,
@ -166,10 +175,11 @@ class TestRouterServices(cloudstackTestCase):
# 2. router will have dns and gateway as in listZones, listVlanIpRanges
# 3. router will have guest,public and linklocal IPs
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
list_router_response = self.apiclient.listRouters(cmd)
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertNotEqual(
len(list_router_response),
@ -183,9 +193,11 @@ class TestRouterServices(cloudstackTestCase):
"Check list router response for router state"
)
cmd = listZones.listZonesCmd()
cmd.zoneid = router.zoneid
zone = self.apiclient.listZones(cmd)[0]
zones = list_zones(
self.apiclient,
id=router.zoneid
)
zone = zones[0]
self.assertEqual(
router.dns1,
@ -210,13 +222,14 @@ class TestRouterServices(cloudstackTestCase):
)
#Fetch corresponding ip ranges information from listVlanIpRanges
cmd = listVlanIpRanges.listVlanIpRangesCmd()
cmd.id = router.zoneid
ipranges_response = self.apiclient.listVlanIpRanges(cmd)[0]
ipranges_response = list_vlan_ipranges(
self.apiclient,
zoneid=router.zoneid
)
iprange = ipranges_response[0]
self.assertEqual(
router.gateway,
ipranges_response.gateway,
iprange.gateway,
"Check gateway with that of corresponding IP range"
)
return
@ -228,10 +241,12 @@ class TestRouterServices(cloudstackTestCase):
# Validate the following
# 1. listRouter should report the router for the account as stopped
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
#Stop the router
cmd = stopRouter.stopRouterCmd()
@ -239,13 +254,14 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient.stopRouter(cmd)
#List routers to check state of router
cmd = listRouters.listRoutersCmd()
cmd.id = router.id
router_response = self.apiclient.listRouters(cmd)[0]
router_response = list_routers(
self.apiclient,
id=router.id
)
#List router should have router in stopped state
self.assertEqual(
router_response.state,
router_response[0].state,
'Stopped',
"Check list router response for router state"
)
@ -258,10 +274,12 @@ class TestRouterServices(cloudstackTestCase):
# Validate the following
# 1. listRouter should report the router for the account as stopped
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
#Start the router
cmd = startRouter.startRouterCmd()
@ -269,13 +287,14 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient.startRouter(cmd)
#List routers to check state of router
cmd = listRouters.listRoutersCmd()
cmd.id = router.id
router_response = self.apiclient.listRouters(cmd)[0]
router_response = list_routers(
self.apiclient,
id=router.id
)
#List router should have router in running state
self.assertEqual(
router_response.state,
router_response[0].state,
'Running',
"Check list router response for router state"
)
@ -288,10 +307,12 @@ class TestRouterServices(cloudstackTestCase):
# Validate the following
# 1. listRouter should report the router for the account as stopped
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
public_ip = router.publicip
@ -301,19 +322,20 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient.rebootRouter(cmd)
#List routers to check state of router
cmd = listRouters.listRoutersCmd()
cmd.id = router.id
router_response = self.apiclient.listRouters(cmd)[0]
router_response = list_routers(
self.apiclient,
id=router.id
)
#List router should have router in running state and same public IP
self.assertEqual(
router_response.state,
router_response[0].state,
'Running',
"Check list router response for router state"
)
self.assertEqual(
router_response.publicip,
router_response[0].publicip,
public_ip,
"Check list router response for router public IP"
)
@ -329,10 +351,11 @@ class TestRouterServices(cloudstackTestCase):
# 3. After network.gc.interval, router should be stopped
# 4. ListRouters should return the router in Stopped state
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
list_vms = self.apiclient.listVirtualMachines(cmd)
list_vms = list_virtual_machines(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.assertNotEqual(
len(list_vms),
@ -346,18 +369,23 @@ class TestRouterServices(cloudstackTestCase):
cmd.id = vm.id
self.apiclient.stopVirtualMachine(cmd)
# Wait for network.gc.interval
cmd = listConfigurations.listConfigurationsCmd()
cmd.name = 'network.gc.interval'
response = self.apiclient.listConfigurations(cmd)[0]
config = list_configurations(
self.apiclient,
name='network.gc.interval'
)
time.sleep(int(response.value) * 2)
response = config[0]
# Wait for network.gc.interval * 2 time
time.sleep(int(response.value) * 3)
#Check status of network router
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
self.assertEqual(
router.state,
@ -375,41 +403,31 @@ class TestRouterServices(cloudstackTestCase):
# by checking status of dnsmasq process
# Find router associated with user account
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
router = self.apiclient.listRouters(cmd)[0]
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
cmd = listHosts.listHostsCmd()
cmd.zoneid = router.zoneid
cmd.type = 'Routing'
cmd.state = 'Up'
host = self.apiclient.listHosts(cmd)[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
host = hosts[0]
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s "\
% router.linklocalip
# Double hop into router
timeout = 5
# Ensure the SSH login is successful
c = ssh_command + "service dnsmasq status"
while True:
res = ssh.execute(c)[0]
if res != "Host key verification failed.":
break
elif timeout == 0:
break
time.sleep(5)
timeout = timeout - 1
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service dnsmasq status"
)
self.assertEqual(
res.count("is running"),
result.count("running"),
1,
"Check dnsmasq service is running or not"
)
@ -424,58 +442,45 @@ class TestRouterServices(cloudstackTestCase):
# 3. dnsmasq, haproxy processes should be running
# Find router associated with user account
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
router = self.apiclient.listRouters(cmd)[0]
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
cmd = listHosts.listHostsCmd()
cmd.zoneid = router.zoneid
cmd.type = 'Routing'
cmd.state = 'Up'
host = self.apiclient.listHosts(cmd)[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
host = hosts[0]
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password
self.vm_1.password,
router.linklocalip,
"service dnsmasq status"
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s "\
% router.linklocalip
# Double hop into router
timeout = 5
# Ensure the SSH login is successful
c = ssh_command + "service dnsmasq status"
while True:
res = ssh.execute(c)[0]
if res != "Host key verification failed.":
break
elif timeout == 0:
break
time.sleep(5)
timeout = timeout - 1
self.assertEqual(
res.count("running"),
result.count("running"),
1,
"Check dnsmasq service is running or not"
)
timeout = 5
# Ensure the SSH login is successful
c = ssh_command + "service haproxy status"
while True:
res = ssh.execute(c)[0]
if res != "Host key verification failed.":
break
elif timeout == 0:
break
time.sleep(5)
timeout = timeout - 1
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service haproxy status"
)
self.assertEqual(
res.count("running"),
result.count("running"),
1,
"Check haproxy service is running or not"
)
@ -491,18 +496,22 @@ class TestRouterServices(cloudstackTestCase):
# all it's services should resume
# Find router associated with user account
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
router = self.apiclient.listRouters(cmd)[0]
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
#Store old values before restart
old_linklocalip = router.linklocalip
cmd = listNetworks.listNetworksCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
network = self.apiclient.listNetworks(cmd)[0]
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
network = networks[0]
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
@ -510,10 +519,12 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient.restartNetwork(cmd)
# Get router details after restart
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
router = self.apiclient.listRouters(cmd)[0]
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
self.assertNotEqual(
router.linklocalip,
@ -531,10 +542,12 @@ class TestRouterServices(cloudstackTestCase):
# all services inside the router are restarted
# 2. check 'uptime' to see if the actual restart happened
cmd = listNetworks.listNetworksCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
network = self.apiclient.listNetworks(cmd)[0]
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
network = networks[0]
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
@ -542,38 +555,29 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient.restartNetwork(cmd)
# Get router details after restart
cmd = listRouters.listRoutersCmd()
cmd.account = self.account.account.name
cmd.domainid = self.services["virtual_machine"]["domainid"]
router = self.apiclient.listRouters(cmd)[0]
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
cmd = listHosts.listHostsCmd()
cmd.zoneid = router.zoneid
cmd.type = 'Routing'
cmd.state = 'Up'
host = self.apiclient.listHosts(cmd)[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
host = hosts[0]
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
res = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password
self.vm_1.password,
router.linklocalip,
"uptime"
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s uptime"\
% router.linklocalip
# Double hop into router to check router uptime
timeout = 5
while True:
res = ssh.execute(ssh_command)[0]
if res != "Host key verification failed.":
break
elif timeout == 0:
break
time.sleep(5)
timeout = timeout - 1
# res = 12:37:14 up 1 min, 0 users, load average: 0.61, 0.22, 0.08
# Split result to check the uptime
result = res.split()

View File

@ -7,8 +7,9 @@
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
@ -39,6 +40,9 @@ class Services:
},
"sleep": 180,
"timeout": 5,
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
}
class TestSecStorageServices(cloudstackTestCase):
@ -49,7 +53,7 @@ class TestSecStorageServices(cloudstackTestCase):
self.cleanup = []
self.services = Services().services
# Get Zone and pod
self.zone = get_zone(self.apiclient)
self.zone = get_zone(self.apiclient, self.services)
self.pod = get_pod(self.apiclient, self.zone.id)
self.services["storage"]["zoneid"] = self.zone.id
@ -91,10 +95,11 @@ class TestSecStorageServices(cloudstackTestCase):
"Check zoneid where sec storage is added"
)
cmd = listHosts.listHostsCmd()
cmd.type = 'SecondaryStorage'
cmd.id = sec_storage.id
list_hosts_response = self.apiclient.listHosts(cmd)
list_hosts_response = list_hosts(
self.apiclient,
type='SecondaryStorage',
id=sec_storage.id
)
self.assertNotEqual(
len(list_hosts_response),
@ -125,11 +130,12 @@ class TestSecStorageServices(cloudstackTestCase):
# in UP state
# 3. verify that secondary storage was added successfully
cmd = listHosts.listHostsCmd()
cmd.type = 'Routing'
cmd.zoneid = self.zone.id
cmd.podid = self.pod.id
list_hosts_response = self.apiclient.listHosts(cmd)
list_hosts_response = list_hosts(
self.apiclient,
type='Routing',
zoneid=self.zone.id,
podid=self.pod.id
)
# ListHosts has all 'routing' hosts in UP state
self.assertNotEqual(
len(list_hosts_response),
@ -144,12 +150,11 @@ class TestSecStorageServices(cloudstackTestCase):
)
# ListStoragePools shows all primary storage pools in UP state
cmd = listStoragePools.listStoragePoolsCmd()
cmd.name = 'Primary'
cmd.zoneid = self.zone.id
cmd.podid = self.pod.id
list_storage_response = self.apiclient.listStoragePools(cmd)
list_storage_response = list_storage_pools(
self.apiclient,
zoneid=self.zone.id,
podid=self.pod.id
)
self.assertNotEqual(
len(list_storage_response),
0,
@ -164,13 +169,14 @@ class TestSecStorageServices(cloudstackTestCase):
)
# Secondary storage is added successfully
cmd = listHosts.listHostsCmd()
cmd.type = 'SecondaryStorage'
cmd.zoneid = self.zone.id
timeout = self.services["timeout"]
while True:
list_hosts_response = self.apiclient.listHosts(cmd)
list_hosts_response = list_hosts(
self.apiclient,
type='SecondaryStorage',
zoneid=self.zone.id,
)
if not list_hosts_response:
# Sleep to ensure Secondary storage is Up
time.sleep(int(self.services["sleep"]))
@ -192,15 +198,15 @@ class TestSecStorageServices(cloudstackTestCase):
"Check state of secondary storage"
)
cmd = listSystemVms.listSystemVmsCmd()
cmd.systemvmtype = 'secondarystoragevm'
cmd.zoneid = self.zone.id
cmd.podid = self.pod.id
timeout = self.services["timeout"]
while True:
list_ssvm_response = self.apiclient.listSystemVms(cmd)
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
zoneid=self.zone.id,
podid=self.pod.id
)
if not list_ssvm_response:
# Sleep to ensure SSVMs are Up and Running
time.sleep(int(self.services["sleep"]))
@ -234,31 +240,34 @@ class TestSecStorageServices(cloudstackTestCase):
for k, v in self.services["hypervisors"].items():
cmd = listTemplates.listTemplatesCmd()
cmd.hypervisor = v["hypervisor"]
cmd.zoneid = self.zone.id
cmd.templatefilter = v["templatefilter"]
list_templates = self.apiclient.listTemplates(cmd)
list_template_response = list_templates(
self.apiclient,
hypervisor=v["hypervisor"],
zoneid=self.zone.id,
templatefilter=v["templatefilter"]
)
# Ensure all BUILTIN templates are downloaded
templateid = None
for template in list_templates:
for template in list_template_response:
if template.templatetype == "BUILTIN":
templateid = template.id
cmd = listTemplates.listTemplatesCmd()
cmd.id = templateid
cmd.templatefilter = v["templatefilter"]
cmd.zoneid = self.zone.id
# Wait to start a downloadin of template
time.sleep(self.services["sleep"])
while True and (templateid != None):
template = self.apiclient.listTemplates(cmd)[0]
template_response = list_templates(
self.apiclient,
id=templateid,
zoneid=self.zone.id,
templatefilter=v["templatefilter"]
)
template = template_response[0]
# If template is ready,
# template.status = Download Complete
# Downloading - x% Downloaded
# Error - Any other string
if template.status == 'Download Complete' :
break
elif 'Downloaded' not in template.status.split():
@ -268,7 +277,13 @@ class TestSecStorageServices(cloudstackTestCase):
#Ensuring the template is in ready state
time.sleep(30)
template = self.apiclient.listTemplates(cmd)[0]
template_response = list_templates(
self.apiclient,
id=templateid,
zoneid=self.zone.id,
templatefilter=v["templatefilter"]
)
template = template_response[0]
self.assertEqual(
template.isready,

View File

@ -8,8 +8,9 @@
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
class Services:
@ -60,10 +61,10 @@ class TestCreateServiceOffering(cloudstackTestCase):
)
self.cleanup.append(service_offering)
cmd = listServiceOfferings.listServiceOfferingsCmd()
cmd.id = service_offering.id
list_service_response = self.apiclient.listServiceOfferings(cmd)
list_service_response = list_service_offering(
self.apiclient,
id=service_offering.id
)
self.assertNotEqual(
len(list_service_response),
0,
@ -163,9 +164,10 @@ class TestServiceOfferings(cloudstackTestCase):
cmd.name = random_name
self.apiclient.updateServiceOffering(cmd)
cmd = listServiceOfferings.listServiceOfferingsCmd()
cmd.id = self.service_offering_1.id
list_service_response = self.apiclient.listServiceOfferings(cmd)
list_service_response = list_service_offering(
self.apiclient,
id=self.service_offering_1.id
)
self.assertNotEqual(
len(list_service_response),
@ -193,14 +195,12 @@ class TestServiceOfferings(cloudstackTestCase):
# 1. deleteServiceOffering should return
# a valid information for newly created offering
cmd = deleteServiceOffering.deleteServiceOfferingCmd()
#Add the required parameters required to call for API
cmd.id = self.service_offering_2.id
self.apiclient.deleteServiceOffering(cmd)
self.service_offering_2.delete(self.apiclient)
cmd = listServiceOfferings.listServiceOfferingsCmd()
cmd.id = self.service_offering_2.id
list_service_response = self.apiclient.listServiceOfferings(cmd)
list_service_response = list_service_offering(
self.apiclient,
id=self.service_offering_2.id
)
self.assertEqual(
list_service_response,

File diff suppressed because it is too large Load Diff

View File

@ -8,8 +8,9 @@
from cloudstackTestCase import *
from cloudstackAPI import *
import remoteSSHClient
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
import telnetlib
#Import System modules
@ -21,11 +22,7 @@ class Services:
def __init__(self):
self.services = {
"ssvm": {
"id": 1, # SSVM ID in particular zone
},
"cpvm": {
"id": 2, # CPVM ID in particular zone
"mgmtserverIP": '192.168.100.154' # For Telnet
},
"host": {
@ -33,6 +30,10 @@ class Services:
"password": 'fr3sca',
"publicport": 22,
},
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"sleep": 120,
}
class TestSSVMs(cloudstackTestCase):
@ -42,9 +43,7 @@ class TestSSVMs(cloudstackTestCase):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
self.services = Services().services
self.zone = get_zone(self.apiclient)
self.services["ssvm"]["zoneid"] = self.zone.id
self.services["cpvm"]["zoneid"] = self.zone.id
self.zone = get_zone(self.apiclient, self.services)
return
def tearDown(self):
@ -70,9 +69,10 @@ class TestSSVMs(cloudstackTestCase):
# the same as the gateway returned by listVlanIpRanges
# 5. DNS entries must match those given for the zone
cmd = listSystemVms.listSystemVmsCmd()
cmd.systemvmtype = 'secondarystoragevm'
list_ssvm_response = self.apiclient.listSystemVms(cmd)
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm'
)
#Verify SSVM response
self.assertNotEqual(
@ -81,8 +81,7 @@ class TestSSVMs(cloudstackTestCase):
"Check list System VMs response"
)
cmd = listZones.listZonesCmd()
list_zones_response = self.apiclient.listZones(cmd)
list_zones_response = list_zones(self.apiclient)
self.assertEqual(
len(list_ssvm_response),
@ -118,30 +117,33 @@ class TestSSVMs(cloudstackTestCase):
)
#Fetch corresponding ip ranges information from listVlanIpRanges
cmd = listVlanIpRanges.listVlanIpRangesCmd()
cmd.id = ssvm.zoneid
ipranges_response = self.apiclient.listVlanIpRanges(cmd)[0]
ipranges_response = list_vlan_ipranges(
self.apiclient,
zoneid=ssvm.zoneid
)
iprange = ipranges_response[0]
self.assertEqual(
ssvm.gateway,
ipranges_response.gateway,
iprange.gateway,
"Check gateway with that of corresponding ip range"
)
#Fetch corresponding zone information from listZones
cmd = listZones.listZonesCmd()
cmd.id = ssvm.zoneid
zone_response = self.apiclient.listZones(cmd)[0]
zone_response = list_zones(
self.apiclient,
id=ssvm.zoneid
)
self.assertEqual(
ssvm.dns1,
zone_response.dns1,
zone_response[0].dns1,
"Check DNS1 with that of corresponding zone"
)
self.assertEqual(
ssvm.dns2,
zone_response.dns2,
zone_response[0].dns2,
"Check DNS2 with that of corresponding zone"
)
return
@ -160,9 +162,10 @@ class TestSSVMs(cloudstackTestCase):
# as the gateway returned by listZones
# 5. DNS entries must match those given for the zone
cmd = listSystemVms.listSystemVmsCmd()
cmd.systemvmtype = 'consoleproxy'
list_cpvm_response = self.apiclient.listSystemVms(cmd)
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy'
)
#Verify CPVM response
self.assertNotEqual(
@ -170,9 +173,7 @@ class TestSSVMs(cloudstackTestCase):
0,
"Check list System VMs response"
)
cmd = listZones.listZonesCmd()
list_zones_response = self.apiclient.listZones(cmd)
list_zones_response = list_zones(self.apiclient)
self.assertEqual(
len(list_cpvm_response),
@ -206,30 +207,33 @@ class TestSSVMs(cloudstackTestCase):
"Check whether CPVM has public IP field"
)
#Fetch corresponding ip ranges information from listVlanIpRanges
cmd = listVlanIpRanges.listVlanIpRangesCmd()
cmd.id = cpvm.zoneid
ipranges_response = self.apiclient.listVlanIpRanges(cmd)[0]
ipranges_response = list_vlan_ipranges(
self.apiclient,
zoneid=cpvm.zoneid
)
iprange = ipranges_response[0]
self.assertEqual(
cpvm.gateway,
ipranges_response.gateway,
iprange.gateway,
"Check gateway with that of corresponding ip range"
)
#Fetch corresponding zone information from listZones
cmd = listZones.listZonesCmd()
cmd.id = cpvm.zoneid
zone_response = self.apiclient.listZones(cmd)[0]
zone_response = list_zones(
self.apiclient,
id=cpvm.zoneid
)
self.assertEqual(
cpvm.dns1,
zone_response.dns1,
zone_response[0].dns1,
"Check DNS1 with that of corresponding zone"
)
self.assertEqual(
cpvm.dns2,
zone_response.dns2,
zone_response[0].dns2,
"Check DNS2 with that of corresponding zone"
)
return
@ -246,43 +250,33 @@ class TestSSVMs(cloudstackTestCase):
# 4. If no process is running/multiple process are running
# then the test is a failure
cmd = listHosts.listHostsCmd()
cmd.zoneid = self.services["ssvm"]["zoneid"]
cmd.type = 'Routing'
cmd.state = 'Up'
host = self.apiclient.listHosts(cmd)[0]
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
host = hosts[0]
cmd = listSystemVms.listSystemVmsCmd()
cmd.systemvmtype = 'secondarystoragevm'
cmd.hostid = host.id
ssvm = self.apiclient.listSystemVms(cmd)[0]
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
hostid=host.id
)
ssvm = list_ssvm_response[0]
self.debug("Cheking cloud process status")
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"]
)
timeout = 5
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " \
% ssvm.linklocalip
c = ssh_command + "/usr/local/cloud/systemvm/ssvm-check.sh |grep -e ERROR -e WARNING -e FAIL"
# Ensure the SSH login is successful
while True:
res = ssh.execute(c)[0]
#Output:Tests Complete. Look for ERROR or WARNING above.
if res != "Host key verification failed.":
break
elif timeout == 0:
break
time.sleep(5)
timeout = timeout - 1
result = get_process_status(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"],
ssvm.linklocalip,
"/usr/local/cloud/systemvm/ssvm-check.sh |grep -e ERROR -e WARNING -e FAIL"
)
res = str(result)
self.assertEqual(
res.count("ERROR"),
1,
@ -295,18 +289,20 @@ class TestSSVMs(cloudstackTestCase):
"Check for warnings in tests"
)
self.assertEqual(
res.count("FAIL"),
1,
"Check for failed tests"
)
#Check status of cloud service
c = ssh_command + "service cloud status"
res = ssh.execute(c)[0]
result = get_process_status(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"],
ssvm.linklocalip,
"service cloud status"
)
res = str(result)
# cloud.com service (type=secstorage) is running: process id: 2346
self.assertEqual(
res.count("is running"),
res.count("running"),
1,
"Check cloud service is running or not"
)
@ -322,52 +318,46 @@ class TestSSVMs(cloudstackTestCase):
# 3. Service cloud status should report cloud agent status to be
# running
cmd = listHosts.listHostsCmd()
cmd.zoneid = self.services["cpvm"]["zoneid"]
cmd.type = 'Routing'
cmd.state = 'Up'
host = self.apiclient.listHosts(cmd)[0]
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
host = hosts[0]
cmd = listSystemVms.listSystemVmsCmd()
cmd.systemvmtype = 'consoleproxy'
cmd.hostid = host.id
cpvm = self.apiclient.listSystemVms(cmd)[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
hostid=host.id
)
with assertNotRaises(Exception):
cpvm = list_cpvm_response[0]
try:
telnet = telnetlib.Telnet(
self.services["cpvm"]["mgmtserverIP"] ,
self.services["cpvm"]["mgmtserverIP"],
'8250'
)
except Exception as e:
self.fail(
"Telnet Access failed for %s: %s" % \
(self.services["cpvm"]["mgmtserverIP"], e)
)
self.debug("Cheking cloud process status")
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"]
)
timeout = 5
# Check status of cloud service
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s " \
% cpvm.linklocalip
c = ssh_command + "service cloud status"
while True:
res = ssh.execute(c)[0]
# Response: cloud.com service (type=secstorage) is
# running: process id: 2346
#Check if double hop is successful
if res != "Host key verification failed.":
break
elif timeout == 0:
break
time.sleep(5)
timeout = timeout - 1
self.debug("Checking cloud process status")
result = get_process_status(
host.ipaddress,
self.services['host']["publicport"],
self.services['host']["username"],
self.services['host']["password"],
cpvm.linklocalip,
"service cloud status"
)
res = str(result)
self.assertEqual(
res.count("is running"),
res.count("running"),
1,
"Check cloud service is running or not"
)
@ -384,16 +374,32 @@ class TestSSVMs(cloudstackTestCase):
# test cases still passing
# 3. If either of the two above steps fail the test is a failure
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
host = hosts[0]
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
hostid=host.id
)
ssvm = list_ssvm_response[0]
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = self.services["ssvm"]["id"]
cmd.systemvmtype = 'secondarystoragevm'
cmd.id = ssvm.id
self.apiclient.stopSystemVm(cmd)
#Sleep to ensure that SSVM is properly restarted
time.sleep(90)
cmd = listSystemVms.listSystemVmsCmd()
cmd.id = self.services["ssvm"]["id"]
list_ssvm_response = self.apiclient.listSystemVms(cmd)
time.sleep(self.services["sleep"])
list_ssvm_response = list_ssvms(
self.apiclient,
id=ssvm.id
)
ssvm_response = list_ssvm_response[0]
self.assertEqual(
@ -417,27 +423,34 @@ class TestSSVMs(cloudstackTestCase):
# two test cases still passing
# 3. If either of the two above steps fail the test is a failure
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
host = hosts[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
hostid=host.id
)
cpvm = list_cpvm_response[0]
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = self.services["cpvm"]["id"]
cmd.systemvmtype = 'consoleproxy'
cmd.id = cpvm.id
self.apiclient.stopSystemVm(cmd)
cmd = listSystemVms.listSystemVmsCmd()
cmd.id = self.services["cpvm"]["id"]
time.sleep(self.services["sleep"])
timeout = 10
while True :
list_cpvm_response = self.apiclient.listSystemVms(cmd)
if not list_cpvm_response:
break;
else:
if timeout == 0:
break
#Sleep to ensure that SSVM is properly restarted
time.sleep(10)
timeout = timeout - 1
list_cpvm_response = list_ssvms(
self.apiclient,
id=cpvm.id
)
cpvm_response = list_cpvm_response[0]
self.assertEqual(
cpvm_response.state,
'Running',
@ -457,24 +470,37 @@ class TestSSVMs(cloudstackTestCase):
# before and after reboot
# 3. The cloud process should still be running within the SSVM
cmd = listSystemVms.listSystemVmsCmd()
cmd.id = self.services["ssvm"]["id"]
cmd.systemvmtype = 'secondarystoragevm'
ssvm_response = self.apiclient.listSystemVms(cmd)[0]
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
host = hosts[0]
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
hostid=host.id
)
ssvm_response = list_ssvm_response[0]
#Store the public & private IP values before reboot
old_public_ip = ssvm_response.publicip
old_private_ip = ssvm_response.privateip
cmd = rebootSystemVm.rebootSystemVmCmd()
cmd.id = self.services["ssvm"]["id"]
cmd.id = ssvm_response.id
self.apiclient.rebootSystemVm(cmd)
#Sleep to ensure that SSVM is properly stopped/started
time.sleep(60)
cmd = listSystemVms.listSystemVmsCmd()
cmd.id = self.services["ssvm"]["id"]
ssvm_response = self.apiclient.listSystemVms(cmd)[0]
time.sleep(self.services["sleep"])
list_ssvm_response = list_ssvms(
self.apiclient,
id=ssvm_response.id
)
ssvm_response = list_ssvm_response[0]
self.assertEqual(
'Running',
@ -506,25 +532,38 @@ class TestSSVMs(cloudstackTestCase):
# the same before and after reboot
# 3. the cloud process should still be running within the CPVM
cmd = listSystemVms.listSystemVmsCmd()
cmd.id = self.services["cpvm"]["id"]
cmd.systemvmtype = 'consoleproxy'
cpvm_response = self.apiclient.listSystemVms(cmd)[0]
hosts = list_hosts(
self.apiclient,
zoneid=self.zone.id,
type='Routing',
state='Up'
)
host = hosts[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
hostid=host.id
)
cpvm_response = list_cpvm_response[0]
#Store the public & private IP values before reboot
old_public_ip = cpvm_response.publicip
old_private_ip = cpvm_response.privateip
cmd = rebootSystemVm.rebootSystemVmCmd()
cmd.id = self.services["cpvm"]["id"]
cmd.id = cpvm_response.id
self.apiclient.rebootSystemVm(cmd)
#Sleep to ensure that SSVM is properly stopped/started
time.sleep(60)
time.sleep(self.services["sleep"])
cmd = listSystemVms.listSystemVmsCmd()
cmd.id = self.services["cpvm"]["id"]
cpvm_response = self.apiclient.listSystemVms(cmd)[0]
list_cpvm_response = list_ssvms(
self.apiclient,
id=cpvm_response.id
)
cpvm_response = list_cpvm_response[0]
self.assertEqual(
'Running',
@ -558,10 +597,12 @@ class TestSSVMs(cloudstackTestCase):
# 3. new SSVM will have a public/private and link-local-ip
# 4. cloud process within SSVM must be up and running
cmd = listSystemVms.listSystemVmsCmd()
cmd.zoneid = self.services["ssvm"]["zoneid"]
cmd.systemvmtype = 'secondarystoragevm'
ssvm_response = self.apiclient.listSystemVms(cmd)[0]
list_ssvm_response = list_ssvms(
self.apiclient,
zoneid=self.zone.id,
systemvmtype='secondarystoragevm'
)
ssvm_response = list_ssvm_response[0]
old_name = ssvm_response.name
@ -570,12 +611,14 @@ class TestSSVMs(cloudstackTestCase):
self.apiclient.destroySystemVm(cmd)
#Sleep to ensure that new SSVM is created
time.sleep(60)
time.sleep(self.services["sleep"])
cmd = listSystemVms.listSystemVmsCmd()
cmd.zoneid = self.services["ssvm"]["zoneid"]
cmd.systemvmtype = 'secondarystoragevm'
ssvm_response = self.apiclient.listSystemVms(cmd)[0]
list_ssvm_response = list_ssvms(
self.apiclient,
zoneid=self.zone.id,
systemvmtype='secondarystoragevm'
)
ssvm_response = list_ssvm_response[0]
# Verify Name, Public IP, Private IP and Link local IP
# for newly created SSVM
@ -616,10 +659,12 @@ class TestSSVMs(cloudstackTestCase):
# 3. new CPVM will have a public/private and link-local-ip
# 4. cloud process within CPVM must be up and running
cmd = listSystemVms.listSystemVmsCmd()
cmd.zoneid = self.services["cpvm"]["zoneid"]
cmd.systemvmtype = 'consoleproxy'
cpvm_response = self.apiclient.listSystemVms(cmd)[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
zoneid=self.zone.id
)
cpvm_response = list_cpvm_response[0]
old_name = cpvm_response.name
@ -628,12 +673,14 @@ class TestSSVMs(cloudstackTestCase):
self.apiclient.destroySystemVm(cmd)
#Sleep to ensure that new CPVM is created
time.sleep(60)
time.sleep(self.services["sleep"])
cmd = listSystemVms.listSystemVmsCmd()
cmd.zoneid = self.services["cpvm"]["zoneid"]
cmd.systemvmtype = 'consoleproxy'
cpvm_response = self.apiclient.listSystemVms(cmd)[0]
list_cpvm_response = list_ssvms(
self.apiclient,
systemvmtype='consoleproxy',
zoneid=self.zone.id
)
cpvm_response = list_cpvm_response[0]
# Verify Name, Public IP, Private IP and Link local IP
# for newly created CPVM

View File

@ -7,8 +7,9 @@
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
import urllib
from random import random
#Import System modules
@ -34,8 +35,8 @@ class Services:
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 200, # in MHz
"memory": 256, # In MBs
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"disk_offering": {
"displaytext": "Small",
@ -49,7 +50,9 @@ class Services:
"protocol": 'TCP',
"ssh_port": 22,
"username": "root",
"password": "password"
"password": "password",
"privateport": 22,
"publicport": 22,
},
"volume": {
"diskname": "Test Volume",
@ -69,13 +72,17 @@ class Services:
"mode": "HTTP_DOWNLOAD",
},
"templatefilter": 'self',
"destzoneid": 2, # For Copy template (Destination zone)
"destzoneid": 5, # For Copy template (Destination zone)
"isfeatured": True,
"ispublic": True,
"isextractable": False,
"bootable": True,
"passwordenabled": True,
"ostypeid": 12,
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced', # Networking mode: Advanced, basic
}
@ -104,7 +111,7 @@ class TestCreateTemplate(cloudstackTestCase):
cls.api_client = fetch_api_client()
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
@ -121,8 +128,7 @@ class TestCreateTemplate(cloudstackTestCase):
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
cls.services["account"]
)
cls.services["account"] = cls.account.account.name
@ -136,20 +142,21 @@ class TestCreateTemplate(cloudstackTestCase):
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
#Stop virtual machine
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = cls.virtual_machine.id
cls.api_client.stopVirtualMachine(cmd)
cls.virtual_machine.stop(cls.api_client)
#Wait before server has be successfully stopped
time.sleep(30)
cmd = listVolumes.listVolumesCmd()
cmd.virtualmachineid = cls.virtual_machine.id
cmd.type = 'ROOT'
list_volume = cls.api_client.listVolumes(cmd)
list_volume = list_volumes(
cls.api_client,
virtualmachineid=cls.virtual_machine.id,
type='ROOT'
)
cls.volume = list_volume[0]
cls._cleanup = [
cls.virtual_machine,
@ -184,23 +191,25 @@ class TestCreateTemplate(cloudstackTestCase):
#Create template from Virtual machine and Volume ID
template = Template.create(
self.apiclient,
self.volume,
self.services["template_1"]
self.services["template_1"],
self.volume.id
)
self.cleanup.append(template)
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = self.services["templatefilter"]
cmd.id = template.id
list_template_response = self.apiclient.listTemplates(cmd)
list_template_response = list_templates(
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=template.id
)
#Verify template response to check whether template added successfully
template_response = list_template_response[0]
self.assertNotEqual(
len(list_template_response),
0,
"Check template avaliable in List Templates"
)
template_response = list_template_response[0]
self.assertEqual(
template_response.displaytext,
@ -230,7 +239,7 @@ class TestTemplates(cloudstackTestCase):
cls.api_client = fetch_api_client()
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
@ -248,8 +257,7 @@ class TestTemplates(cloudstackTestCase):
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
cls.services["account"]
)
cls.user = Account.create(
@ -269,31 +277,32 @@ class TestTemplates(cloudstackTestCase):
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
#Stop virtual machine
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = cls.virtual_machine.id
cls.api_client.stopVirtualMachine(cmd)
cls.virtual_machine.stop(cls.api_client)
#Wait before server has be successfully stopped
time.sleep(30)
cmd = listVolumes.listVolumesCmd()
cmd.virtualmachineid = cls.virtual_machine.id
cmd.type = 'ROOT'
list_volume = cls.api_client.listVolumes(cmd)
list_volume = list_volumes(
cls.api_client,
virtualmachineid=cls.virtual_machine.id,
type='ROOT'
)
cls.volume = list_volume[0]
#Create templates for Edit, Delete & update permissions testcases
cls.template_1 = Template.create(
cls.api_client,
cls.volume,
cls.services["template_1"]
cls.services["template_1"],
cls.volume.id
)
cls.template_2 = Template.create(
cls.api_client,
cls.volume,
cls.services["template_2"]
cls.services["template_2"],
cls.volume.id
)
cls._cleanup = [
cls.template_2,
@ -358,18 +367,18 @@ class TestTemplates(cloudstackTestCase):
self.apiclient.updateTemplate(cmd)
# Verify template response for updated attributes
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = self.services["templatefilter"]
cmd.id = self.template_1.id
list_template_response = self.apiclient.listTemplates(cmd)
template_response = list_template_response[0]
list_template_response = list_templates(
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=self.template_1.id
)
self.assertNotEqual(
len(list_template_response),
0,
"Check template available in List Templates"
)
template_response = list_template_response[0]
self.assertEqual(
template_response.displaytext,
@ -403,11 +412,12 @@ class TestTemplates(cloudstackTestCase):
self.template_1.delete(self.apiclient)
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = self.services["templatefilter"]
cmd.id = self.template_1.id
list_template_response = self.apiclient.listTemplates(cmd)
list_template_response = list_templates(
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=self.template_1.id
)
# Verify template is deleted properly using ListTemplates
self.assertEqual(
list_template_response,
@ -476,12 +486,13 @@ class TestTemplates(cloudstackTestCase):
self.apiclient.updateTemplatePermissions(cmd)
cmd = listTemplates.listTemplatesCmd()
cmd.id = self.template_2.id
cmd.account = self.account.account.name
cmd.domainid = self.account.account.domainid
cmd.templatefilter = 'featured'
list_template_response = self.apiclient.listTemplates(cmd)
list_template_response = list_templates(
self.apiclient,
templatefilter='featured',
id=self.template_2.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
# Verify template response for updated permissions for normal user
template_response = list_template_response[0]
@ -518,17 +529,19 @@ class TestTemplates(cloudstackTestCase):
self.apiclient.copyTemplate(cmd)
# Verify template is copied to another zone using ListTemplates
cmd = listTemplates.listTemplatesCmd()
cmd.id = self.template_2.id
cmd.templatefilter = self.services["templatefilter"]
list_template_response = self.apiclient.listTemplates(cmd)
template_response = list_template_response[0]
list_template_response = list_templates(
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=self.template_2.id
)
self.assertNotEqual(
len(list_template_response),
0,
"Check template extracted in List Templates"
)
template_response = list_template_response[0]
self.assertEqual(
template_response.id,
self.template_2.id,
@ -547,12 +560,12 @@ class TestTemplates(cloudstackTestCase):
# Validate the following
# 1. ListTemplates should show only 'public' templates for normal user
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = 'featured'
cmd.account = self.user.account.name
cmd.domainid = self.user.account.domainid
list_template_response = self.apiclient.listTemplates(cmd)
list_template_response = list_templates(
self.apiclient,
templatefilter='featured',
account=self.user.account.name,
domainid=self.user.account.domainid
)
self.assertNotEqual(
len(list_template_response),
@ -574,12 +587,12 @@ class TestTemplates(cloudstackTestCase):
# Validate the following
# 1. ListTemplates should not show 'SYSTEM' templates for normal user
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = 'featured'
cmd.account = self.user.account.name
cmd.domainid = self.user.account.domainid
list_template_response = self.apiclient.listTemplates(cmd)
list_template_response = list_templates(
self.apiclient,
templatefilter='featured',
account=self.user.account.name,
domainid=self.user.account.domainid
)
self.assertNotEqual(
len(list_template_response),

View File

@ -9,8 +9,9 @@
from cloudstackTestCase import *
from cloudstackAPI import *
import remoteSSHClient
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
#Import System modules
import time
@ -78,7 +79,7 @@ class Services:
"displaytext": "Small Instance",
"cpunumber": 1,
"cpuspeed": 500,
"memory": 512
"memory": 256
},
"medium":
{
@ -102,10 +103,14 @@ class Services:
},
"diskdevice": '/dev/xvdd',
"mount_dir": "/mnt/tmp",
"hostid": 1,
"hostid": 5,
#Migrate VM to hostid
"ostypeid": 12
"ostypeid": 12,
# CentOS 5.3 (64-bit)
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"mode":'advanced',
}
class TestDeployVM(cloudstackTestCase):
@ -116,12 +121,8 @@ class TestDeployVM(cloudstackTestCase):
self.dbclient = self.testClient.getDbConnection()
self.services = Services().services
# Get Zone, Domain and templates
zone = get_zone(self.apiclient)
zone = get_zone(self.apiclient, self.services)
disk_offering = DiskOffering.create(
self.apiclient,
self.services["disk_offering"]
)
template = get_template(
self.apiclient,
zone.id,
@ -129,7 +130,6 @@ class TestDeployVM(cloudstackTestCase):
)
# Set Zones and disk offerings
self.services["small"]["zoneid"] = zone.id
self.services["small"]["diskoffering"] = disk_offering.id
self.services["small"]["template"] = template.id
self.services["medium"]["zoneid"] = zone.id
@ -139,8 +139,7 @@ class TestDeployVM(cloudstackTestCase):
# Create Account, VMs, NAT Rules etc
self.account = Account.create(
self.apiclient,
self.services["account"],
admin=True
self.services["account"]
)
self.service_offering = ServiceOffering.create(
@ -150,7 +149,6 @@ class TestDeployVM(cloudstackTestCase):
# Cleanup
self.cleanup = [
self.service_offering,
disk_offering,
self.account
]
@ -166,14 +164,15 @@ class TestDeployVM(cloudstackTestCase):
self.virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["small"],
self.account.account.name,
accountid=self.account.account.name,
serviceofferingid=self.service_offering.id
)
self.cleanup.append(self.virtual_machine)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.virtual_machine.id
)
self.debug(
"Verify listVirtualMachines response for virtual machine: %s" \
% self.virtual_machine.id
@ -215,11 +214,7 @@ class TestVMLifeCycle(cloudstackTestCase):
cls.services = Services().services
# Get Zone, Domain and templates
zone = get_zone(cls.api_client)
disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
)
zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
zone.id,
@ -227,7 +222,6 @@ class TestVMLifeCycle(cloudstackTestCase):
)
# Set Zones and disk offerings
cls.services["small"]["zoneid"] = zone.id
cls.services["small"]["diskoffering"] = disk_offering.id
cls.services["small"]["template"] = template.id
cls.services["medium"]["zoneid"] = zone.id
@ -237,8 +231,7 @@ class TestVMLifeCycle(cloudstackTestCase):
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
cls.services["account"]
)
cls.small_offering = ServiceOffering.create(
@ -255,38 +248,24 @@ class TestVMLifeCycle(cloudstackTestCase):
cls.api_client,
cls.services["small"],
accountid=cls.account.account.name,
serviceofferingid=cls.small_offering.id
serviceofferingid=cls.small_offering.id,
mode=cls.services["mode"]
)
cls.medium_virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["medium"],
accountid=cls.account.account.name,
serviceofferingid=cls.medium_offering.id
serviceofferingid=cls.medium_offering.id,
mode=cls.services["mode"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["small"],
accountid=cls.account.account.name,
serviceofferingid=cls.small_offering.id
serviceofferingid=cls.small_offering.id,
mode=cls.services["mode"]
)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
cls.virtual_machine.account,
cls.virtual_machine.zoneid,
cls.virtual_machine.domainid,
cls.services["small"]
)
cls.nat_rule = NATRule.create(
cls.api_client,
cls.virtual_machine,
cls.services["small"],
ipaddressid=cls.public_ip.ipaddress.id
)
cls._cleanup = [
cls.nat_rule,
cls.public_ip,
cls.medium_virtual_machine,
cls.virtual_machine,
cls.small_offering,
cls.medium_offering,
cls.account
@ -317,13 +296,13 @@ class TestVMLifeCycle(cloudstackTestCase):
# 2. listVM command should return
# this VM.State of this VM should be ""Stopped"".
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.api_client.stopVirtualMachine(cmd)
self.small_virtual_machine.stop(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.api_client.listVirtualMachines(cmd)
self.assertNotEqual(
len(list_vm_response),
0,
@ -345,14 +324,12 @@ class TestVMLifeCycle(cloudstackTestCase):
# 1. listVM command should return this VM.State
# of this VM should be Running".
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.small_virtual_machine.start(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertNotEqual(
len(list_vm_response),
0,
@ -375,7 +352,7 @@ class TestVMLifeCycle(cloudstackTestCase):
)
# SSH to check whether VM is Up and Running
try:
self.small_virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
self.small_virtual_machine.get_ssh_client()
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" \
@ -392,13 +369,12 @@ class TestVMLifeCycle(cloudstackTestCase):
# 2. listVM command should return the deployed VM.
# State of this VM should be "Running"
cmd = rebootVirtualMachine.rebootVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.rebootVirtualMachine(cmd)
self.small_virtual_machine.reboot(self.apiclient)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertNotEqual(
len(list_vm_response),
@ -422,28 +398,23 @@ class TestVMLifeCycle(cloudstackTestCase):
# 2. Using listVM command verify that this Vm
# has Medium service offering Id.
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.stopVirtualMachine(cmd)
self.small_virtual_machine.stop(self.apiclient)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
cmd.serviceofferingid = self.medium_offering.id
self.apiclient.changeServiceForVirtualMachine(cmd)
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.small_virtual_machine.start(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
# Sleep to ensure that VM is started properly
time.sleep(60)
try:
ssh = self.small_virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
)
ssh = self.small_virtual_machine.get_ssh_client()
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
@ -487,9 +458,7 @@ class TestVMLifeCycle(cloudstackTestCase):
# 2. Using listVM command verify that this Vm
# has Small service offering Id.
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.medium_virtual_machine.id
self.apiclient.stopVirtualMachine(cmd)
self.medium_virtual_machine.stop(self.apiclient)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.medium_virtual_machine.id
@ -497,18 +466,14 @@ class TestVMLifeCycle(cloudstackTestCase):
self.apiclient.changeServiceForVirtualMachine(cmd)
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.medium_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.medium_virtual_machine.start(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.medium_virtual_machine.id
)
try:
ssh = self.medium_virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
)
ssh = self.medium_virtual_machine.get_ssh_client()
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
@ -541,6 +506,9 @@ class TestVMLifeCycle(cloudstackTestCase):
self.small_offering.memory,
"Check Memory(kb) for small offering"
)
# Cleanup - Not required for further tests
self.cleanup.append(self.medium_virtual_machine)
return
def test_06_destroy_vm(self):
@ -552,14 +520,12 @@ class TestVMLifeCycle(cloudstackTestCase):
# 2. listVM command should return this VM.State
# of this VM should be "Destroyed".
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.destroyVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
self.small_virtual_machine.delete(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertNotEqual(
len(list_vm_response),
0,
@ -586,14 +552,11 @@ class TestVMLifeCycle(cloudstackTestCase):
cmd = recoverVirtualMachine.recoverVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.recoverVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertNotEqual(
len(list_vm_response),
@ -618,19 +581,17 @@ class TestVMLifeCycle(cloudstackTestCase):
# should be "Running" and the host should be the host
# to which the VM was migrated to
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
self.apiclient.startVirtualMachine(cmd)
self.small_virtual_machine.start(self.apiclient)
cmd = migrateVirtualMachine.migrateVirtualMachineCmd()
cmd.hostid = self.services["hostid"]
cmd.virtualmachineid = self.small_virtual_machine.id
self.apiclient.migrateVirtualMachine(cmd)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertNotEqual(
list_vm_response,
None,
@ -662,15 +623,19 @@ class TestVMLifeCycle(cloudstackTestCase):
cmd.id = self.small_virtual_machine.id
self.apiclient.destroyVirtualMachine(cmd)
cmd = listConfigurations.listConfigurationsCmd()
cmd.name = 'expunge.delay'
response = self.apiclient.listConfigurations(cmd)[0]
config = list_configurations(
self.apiclient,
name='expunge.delay'
)
response = config[0]
# Wait for some time more than expunge.delay
time.sleep(int(response.value) * 2)
cmd = listVirtualMachines.listVirtualMachinesCmd()
cmd.id = self.small_virtual_machine.id
list_vm_response = self.apiclient.listVirtualMachines(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
self.assertEqual(
list_vm_response,
@ -700,9 +665,7 @@ class TestVMLifeCycle(cloudstackTestCase):
cmd.virtualmachineid = self.virtual_machine.id
self.apiclient.attachIso(cmd)
ssh_client = self.virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
)
ssh_client = self.virtual_machine.get_ssh_client()
cmds = [
"mkdir -p %s" % self.services["mount_dir"],

View File

@ -7,8 +7,9 @@
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
from utils import *
from base import *
from testcase.libs.utils import *
from testcase.libs.base import *
from testcase.libs.common import *
import remoteSSHClient
#Import System modules
import os
@ -36,8 +37,8 @@ class Services:
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 200, # in MHz
"memory": 256, # In MBs
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"disk_offering": {
"displaytext": "Small",
@ -50,19 +51,23 @@ class Services:
"domainid": 1,
},
},
"customdisksize": 1, # GBs
"username": "root", # Creds for SSH to VM
"password": "password",
"ssh_port": 22,
"diskname": "TestDiskServ",
"hypervisor": 'XenServer',
"domainid": 1,
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
"diskdevice": "/dev/sda",
"ostypeid": 12,
}
"customdisksize": 1, # GBs
"username": "root", # Creds for SSH to VM
"password": "password",
"ssh_port": 22,
"diskname": "TestDiskServ",
"hypervisor": 'XenServer',
"domainid": 1,
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
"diskdevice": "/dev/xvda",
"ostypeid": 12,
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced',
}
class TestCreateVolume(cloudstackTestCase):
@ -73,7 +78,7 @@ class TestCreateVolume(cloudstackTestCase):
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
@ -95,8 +100,7 @@ class TestCreateVolume(cloudstackTestCase):
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
cls.services["account"]
)
cls.services["account"] = cls.account.account.name
@ -108,27 +112,11 @@ class TestCreateVolume(cloudstackTestCase):
cls.api_client,
cls.services,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
cls.virtual_machine.account,
cls.virtual_machine.zoneid,
cls.virtual_machine.domainid,
cls.services
)
cls.nat_rule = NATRule.create(
cls.api_client,
cls.virtual_machine,
cls.services,
ipaddressid=cls.public_ip.ipaddress.id
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls._cleanup = [
cls.nat_rule,
cls.virtual_machine,
cls.service_offering,
cls.public_ip,
cls.disk_offering,
cls.custom_disk_offering,
cls.account
@ -162,30 +150,30 @@ class TestCreateVolume(cloudstackTestCase):
#Attach a volume with different disk offerings
#and check the memory allocated to each of them
for volume in self.volumes:
cmd = listVolumes.listVolumesCmd()
cmd.id = volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
list_volume_response = list_volumes(
self.apiClient,
id=volume.id
)
self.assertNotEqual(
list_volume_response,
None,
"Check if volume exists in ListVolumes"
)
attached_volume = self.virtual_machine.attach_volume(
self.apiClient,
volume
)
ssh = self.virtual_machine.get_ssh_client(self.nat_rule.ipaddress)
ssh = self.virtual_machine.get_ssh_client()
ssh.execute("reboot")
#Sleep to ensure the machine is rebooted properly
time.sleep(120)
ssh = self.virtual_machine.get_ssh_client(
self.nat_rule.ipaddress,
reconnect=True
)
c = "fdisk -l|grep %s|head -1" % self.services["diskdevice"]
c = "fdisk -l|grep %s1|head -1" % self.services["diskdevice"]
res = ssh.execute(c)
# Disk /dev/sda doesn't contain a valid partition table
# Disk /dev/sda: 21.5 GB, 21474836480 bytes
@ -224,7 +212,7 @@ class TestVolumes(cloudstackTestCase):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.services)
cls.disk_offering = DiskOffering.create(
cls.api_client,
cls.services["disk_offering"]
@ -241,8 +229,7 @@ class TestVolumes(cloudstackTestCase):
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True
cls.services["account"]
)
cls.services["account"] = cls.account.account.name
@ -254,30 +241,15 @@ class TestVolumes(cloudstackTestCase):
cls.api_client,
cls.services,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls.public_ip = PublicIPAddress.create(
cls.api_client,
cls.virtual_machine.account,
cls.virtual_machine.zoneid,
cls.virtual_machine.domainid,
cls.services
)
cls.nat_rule = NATRule.create(
cls.api_client,
cls.virtual_machine,
cls.services,
ipaddressid=cls.public_ip.ipaddress.id
)
cls.volume = Volume.create(
cls.api_client,
cls.services
)
cls._cleanup = [
cls.nat_rule,
cls.virtual_machine,
cls.public_ip,
cls.service_offering,
cls.disk_offering,
cls.account
@ -306,10 +278,11 @@ class TestVolumes(cloudstackTestCase):
#Sleep to ensure the current state will reflected in other calls
time.sleep(60)
cmd = listVolumes.listVolumesCmd()
cmd.id = self.volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
list_volume_response = list_volumes(
self.apiClient,
id=self.volume.id
)
self.assertNotEqual(
list_volume_response,
None,
@ -323,10 +296,7 @@ class TestVolumes(cloudstackTestCase):
)
#Format the attached volume to a known fs
format_volume_to_ext3(
self.virtual_machine.get_ssh_client(
self.nat_rule.ipaddress
))
format_volume_to_ext3(self.virtual_machine.get_ssh_client())
def test_03_download_attached_volume(self):
"""Download a Volume attached to a VM
@ -372,10 +342,10 @@ class TestVolumes(cloudstackTestCase):
self.virtual_machine.detach_volume(self.apiClient, self.volume)
#Sleep to ensure the current state will reflected in other calls
time.sleep(60)
cmd = listVolumes.listVolumesCmd()
cmd.id = self.volume.id
list_volume_response = self.apiClient.listVolumes(cmd)
list_volume_response = list_volumes(
self.apiClient,
id=self.volume.id
)
self.assertNotEqual(
list_volume_response,
None,
@ -433,11 +403,11 @@ class TestVolumes(cloudstackTestCase):
self.apiClient.deleteVolume(cmd)
time.sleep(60)
cmd = listVolumes.listVolumesCmd()
cmd.id = self.volume.id
cmd.type = 'DATADISK'
list_volume_response = self.apiClient.listVolumes(cmd)
list_volume_response = list_volumes(
self.apiClient,
id=self.volume.id,
type='DATADISK'
)
self.assertEqual(
list_volume_response,
None,

View File

@ -0,0 +1,31 @@
P1 Cases
--------------------------------------
These test cases are the core functionality tests that ensure the application is stable and can be tested thoroughly.
These P1 cases definitions are located at : https://docs.google.com/a/clogeny.com/spreadsheet/ccc?key=0Aq5M2ldK6eyedDJBa0EzM0RPNmdVNVZOWnFnOVJJcHc&hl=en_US
Guidelines
----------
P1 test cases are being developed using Python's unittests2. Following are certain guidelines being followed
1. Tests exercised for the same resource should ideally be present under a single suite or file.
2. Time-consuming operations that create new cloud resources like server creation, volume creation etc
should not necessarily be exercised per unit test. The resources can be shared by creating them at
the class-level using setUpClass and shared across all instances during a single run.
3. Certain tests pertaining to NAT, Firewall and Load Balancing warrant fresh resources per test. Hence a call should be
taken by the stakeholders regarding sharing resources.
4. Ensure that the tearDown/tearDownClass functions clean up all the resources created during the test run.
For more information about unittests: http://docs.python.org/library/unittest.html
P1 Tests
----------
The following files contain these P1 cases:
1. test_snapshots.py - Snapshots related tests
2. test_routers.py - Router related tests
3. test_usage.py - Usage realted tests

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,799 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
""" Base class for all Cloudstack resources
-Virtual machine, Volume, Snapshot etc
"""
from utils import is_server_ssh_ready, random_gen
from cloudstackAPI import *
#Import System modules
import time
import hashlib
class Account:
""" Account Life Cycle """
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, admin=False):
cmd = createAccount.createAccountCmd()
"""Creates an account"""
#0 - User, 1 - Root Admin
cmd.accounttype = int(admin)
cmd.email = services["email"]
cmd.firstname = services["firstname"]
cmd.lastname = services["lastname"]
# Password Encoding
mdf = hashlib.md5()
mdf.update(services["password"])
cmd.password = mdf.hexdigest()
cmd.username = "-".join([services["username"], random_gen()])
account = apiclient.createAccount(cmd)
return Account(account.__dict__)
def delete(self, apiclient):
"""Delete an account"""
cmd = deleteAccount.deleteAccountCmd()
cmd.id = self.account.id
apiclient.deleteAccount(cmd)
class VirtualMachine:
"""Manage virtual machine lifecycle"""
def __init__(self, items, services):
self.__dict__.update(items)
self.username = services["username"]
self.password = services["password"]
self.ssh_port = services["ssh_port"]
self.ssh_client = None
#extract out the ipaddress
self.ipaddress = self.nic[0].ipaddress
@classmethod
def create(cls, apiclient, services, templateid=None, accountid=None,
networkids=None, serviceofferingid=None,
mode='basic'):
"""Create the instance"""
cmd = deployVirtualMachine.deployVirtualMachineCmd()
if serviceofferingid:
cmd.serviceofferingid = serviceofferingid
elif "serviceoffering" in services:
cmd.serviceofferingid = services["serviceoffering"]
cmd.zoneid = services["zoneid"]
cmd.hypervisor = services["hypervisor"]
if accountid:
cmd.account = accountid
elif "account" in services:
cmd.account = services["account"]
cmd.domainid = services["domainid"]
# List Networks for that user
command = listNetworks.listNetworksCmd()
command.zoneid = services["zoneid"]
command.account = accountid or services["account"]
command.domainid = services["domainid"]
network = apiclient.listNetworks(command)
if networkids:
cmd.networkids = networkids
elif "networkids" in services:
cmd.networkids = services["networkids"]
elif network: #If user already has source NAT created, then use that
if hasattr(network[0], "account"):
cmd.networkids = network[0].id
if templateid:
cmd.templateid = templateid
elif "template" in services:
cmd.templateid = services["template"]
if "diskoffering" in services:
cmd.diskofferingid = services["diskoffering"]
virtual_machine = apiclient.deployVirtualMachine(cmd)
if mode.lower() == 'advanced':
public_ip = PublicIPAddress.create(
apiclient,
virtual_machine.account,
virtual_machine.zoneid,
virtual_machine.domainid,
services
)
nat_rule = NATRule.create(
apiclient,
virtual_machine,
services,
ipaddressid=public_ip.ipaddress.id
)
virtual_machine.ssh_ip = nat_rule.ipaddress
virtual_machine.public_ip = nat_rule.ipaddress
else:
virtual_machine.ssh_ip = virtual_machine.nic[0].ipaddress
virtual_machine.public_ip = virtual_machine.nic[0].ipaddress
return VirtualMachine(virtual_machine.__dict__, services)
def start(self, apiclient):
"""Start the instance"""
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = self.id
apiclient.startVirtualMachine(cmd)
def stop(self, apiclient):
"""Stop the instance"""
cmd = stopVirtualMachine.stopVirtualMachineCmd()
cmd.id = self.id
apiclient.stopVirtualMachine(cmd)
def reboot(self, apiclient):
"""Reboot the instance"""
cmd = rebootVirtualMachine.rebootVirtualMachineCmd()
cmd.id = self.id
apiclient.rebootVirtualMachine(cmd)
def get_ssh_client(self, ipaddress=None, reconnect=False):
"""Get SSH object of VM"""
if ipaddress != None:
self.ssh_ip = ipaddress
if reconnect:
self.ssh_client = is_server_ssh_ready(
self.ssh_ip,
self.ssh_port,
self.username,
self.password
)
self.ssh_client = self.ssh_client or is_server_ssh_ready(
self.ssh_ip,
self.ssh_port,
self.username,
self.password
)
return self.ssh_client
def delete(self, apiclient):
"""Destroy an Instance"""
cmd = destroyVirtualMachine.destroyVirtualMachineCmd()
cmd.id = self.id
apiclient.destroyVirtualMachine(cmd)
def attach_volume(self, apiclient, volume):
"""Attach volume to instance"""
cmd = attachVolume.attachVolumeCmd()
cmd.id = volume.id
cmd.virtualmachineid = self.id
return apiclient.attachVolume(cmd)
def detach_volume(self, apiclient, volume):
"""Detach volume to instance"""
cmd = detachVolume.detachVolumeCmd()
cmd.id = volume.id
return apiclient.detachVolume(cmd)
class Volume:
"""Manage Volume Lifecycle
"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, zoneid=None, account=None,
diskofferingid=None):
"""Create Volume"""
cmd = createVolume.createVolumeCmd()
cmd.name = services["diskname"]
cmd.domainid = services["domainid"]
if diskofferingid:
cmd.diskofferingid = diskofferingid
elif "diskofferingid" in services:
cmd.diskofferingid = services["diskofferingid"]
if zoneid:
cmd.zoneid = zoneid
elif "zoneid" in services:
cmd.zoneid = services["zoneid"]
if account:
cmd.account = account
elif "account" in services:
cmd.account = services["account"]
return Volume(apiclient.createVolume(cmd).__dict__)
@classmethod
def create_custom_disk(cls, apiclient, services):
"""Create Volume from Custom disk offering"""
cmd = createVolume.createVolumeCmd()
cmd.name = services["diskname"]
cmd.diskofferingid = services["customdiskofferingid"]
cmd.size = services["customdisksize"]
cmd.zoneid = services["zoneid"]
cmd.account = services["account"]
cmd.domainid = services["domainid"]
return Volume(apiclient.createVolume(cmd).__dict__)
@classmethod
def create_from_snapshot(cls, apiclient, snapshot_id, services):
"""Create Volume from snapshot"""
cmd = createVolume.createVolumeCmd()
cmd.name = "-".join([services["diskname"], random_gen()])
cmd.snapshotid = snapshot_id
cmd.zoneid = services["zoneid"]
cmd.size = services["size"]
cmd.account = services["account"]
cmd.domainid = services["domainid"]
return Volume(apiclient.createVolume(cmd).__dict__)
def delete(self, apiclient):
"""Delete Volume"""
cmd = deleteVolume.deleteVolumeCmd()
cmd.id = self.id
apiclient.deleteVolume(cmd)
class Snapshot:
"""Manage Snapshot Lifecycle
"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, volume_id):
"""Create Snapshot"""
cmd = createSnapshot.createSnapshotCmd()
cmd.volumeid = volume_id
return Snapshot(apiclient.createSnapshot(cmd).__dict__)
def delete(self, apiclient):
"""Delete Snapshot"""
cmd = deleteSnapshot.deleteSnapshotCmd()
cmd.id = self.id
apiclient.deleteSnapshot(cmd)
class Template:
"""Manage template life cycle"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, volumeid=None,
account=None, domainid=None):
"""Create template from Volume or URL"""
#Create template from Virtual machine and Volume ID
cmd = createTemplate.createTemplateCmd()
cmd.displaytext = services["displaytext"]
cmd.name = "-".join([services["name"], random_gen()])
cmd.ostypeid = services["ostypeid"]
if "isfeatured" in services:
cmd.isfeatured = services["isfeatured"]
else:
cmd.isfeatured = False
if "ispublic" in services:
cmd.ispublic = services["ispublic"]
else:
cmd.ispublic = False
if "isextractable" in services:
cmd.isextractable = services["isextractable"]
else:
cmd.isextractable = False
if volumeid:
cmd.volumeid = volumeid
elif "url" in services:
cmd.url = services["url"]
if account:
cmd.account = account
if domainid:
cmd.domainid = domainid
return Template(apiclient.createTemplate(cmd).__dict__)
@classmethod
def create_from_snapshot(cls, apiclient, snapshot, services):
"""Create Template from snapshot"""
#Create template from Virtual machine and Snapshot ID
cmd = createTemplate.createTemplateCmd()
cmd.displaytext = services["displaytext"]
cmd.name = "-".join([services["name"], random_gen()])
cmd.ostypeid = services["ostypeid"]
cmd.snapshotid = snapshot.id
return Template(apiclient.createTemplate(cmd).__dict__)
def delete(self, apiclient):
"""Delete Template"""
cmd = deleteTemplate.deleteTemplateCmd()
cmd.id = self.id
apiclient.deleteTemplate(cmd)
def download(self, apiclient):
"""Download Template"""
while True:
template_response = list_templates(
apiclient,
id=self.id,
zoneid=self.zoneid,
templatefilter='self'
)
template = template_response[0]
# If template is ready,
# template.status = Download Complete
# Downloading - x% Downloaded
# Error - Any other string
if template.status == 'Download Complete' :
break
elif 'Downloaded' not in template.status.split():
raise Exception
elif 'Downloaded' in template.status.split():
time.sleep(120)
class Iso:
"""Manage ISO life cycle"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, account=None, domainid=None):
"""Create an ISO"""
#Create ISO from URL
cmd = registerIso.registerIsoCmd()
cmd.displaytext = services["displaytext"]
cmd.name = services["name"]
cmd.ostypeid = services["ostypeid"]
cmd.url = services["url"]
cmd.zoneid = services["zoneid"]
if "isextractable" in services:
cmd.isextractable = services["isextractable"]
if "isfeatured" in services:
cmd.isfeatured = services["isfeatured"]
if "ispublic" in services:
cmd.ispublic = services["ispublic"]
if account:
cmd.account = account
if domainid:
cmd.domainid = domainid
return Iso(apiclient.registerIso(cmd)[0].__dict__)
def delete(self, apiclient):
"""Delete an ISO"""
cmd = deleteIso.deleteIsoCmd()
cmd.id = self.id
apiclient.deleteIso(cmd)
return
def download(self, apiclient):
"""Download an ISO"""
#Ensuring ISO is successfully downloaded
while True:
time.sleep(120)
cmd = listIsos.listIsosCmd()
cmd.id = self.id
response = apiclient.listIsos(cmd)[0]
# Check whether download is in progress (for Ex: 10% Downloaded)
# or ISO is 'Successfully Installed'
if response.status == 'Successfully Installed':
return
elif 'Downloaded' not in response.status.split():
raise Exception
return
class PublicIPAddress:
"""Manage Public IP Addresses"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, accountid, zoneid=None, domainid=None,
services=None, networkid=None):
"""Associate Public IP address"""
cmd = associateIpAddress.associateIpAddressCmd()
cmd.account = accountid
cmd.zoneid = zoneid or services["zoneid"]
cmd.domainid = domainid or services["domainid"]
if networkid:
cmd.networkid = networkid
return PublicIPAddress(apiclient.associateIpAddress(cmd).__dict__)
def delete(self, apiclient):
"""Dissociate Public IP address"""
cmd = disassociateIpAddress.disassociateIpAddressCmd()
cmd.id = self.ipaddress.id
apiclient.disassociateIpAddress(cmd)
return
class NATRule:
"""Manage NAT rule"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, virtual_machine, services, ipaddressid=None):
"""Create Port forwarding rule"""
cmd = createPortForwardingRule.createPortForwardingRuleCmd()
if ipaddressid:
cmd.ipaddressid = ipaddressid
elif "ipaddressid" in services:
cmd.ipaddressid = services["ipaddressid"]
cmd.privateport = services["privateport"]
cmd.publicport = services["publicport"]
cmd.protocol = services["protocol"]
cmd.virtualmachineid = virtual_machine.id
return NATRule(apiclient.createPortForwardingRule(cmd).__dict__)
def delete(self, apiclient):
"""Delete port forwarding"""
cmd = deletePortForwardingRule.deletePortForwardingRuleCmd()
cmd.id = self.id
apiclient.deletePortForwardingRule(cmd)
return
class FireWallRule:
"""Manage Firewall rule"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, ipaddressid, protocol, cidrlist=None,
startport=None, endport=None):
"""Create Firewall Rule"""
cmd = createFirewallRule.createFirewallRuleCmd()
cmd.ipaddressid = ipaddressid
cmd.protocol = protocol
if cidrlist:
cmd.cidrlist = cidrlist
if startport:
cmd.startport = startport
if endport:
cmd.endport = endport
return NATRule(apiclient.createFirewallRule(cmd).__dict__)
def delete(self, apiclient):
"""Delete Firewall rule"""
cmd = deleteFirewallRule.deleteFirewallRuleCmd()
cmd.id = self.id
apiclient.deleteFirewallRule(cmd)
return
class ServiceOffering:
"""Manage service offerings cycle"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services):
"""Create Service offering"""
cmd = createServiceOffering.createServiceOfferingCmd()
cmd.cpunumber = services["cpunumber"]
cmd.cpuspeed = services["cpuspeed"]
cmd.displaytext = services["displaytext"]
cmd.memory = services["memory"]
cmd.name = services["name"]
return ServiceOffering(apiclient.createServiceOffering(cmd).__dict__)
def delete(self, apiclient):
"""Delete Service offering"""
cmd = deleteServiceOffering.deleteServiceOfferingCmd()
cmd.id = self.id
apiclient.deleteServiceOffering(cmd)
return
class DiskOffering:
"""Manage disk offerings cycle"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, custom=False):
"""Create Disk offering"""
cmd = createDiskOffering.createDiskOfferingCmd()
cmd.displaytext = services["displaytext"]
cmd.name = services["name"]
if custom:
cmd.customized = True
else:
cmd.disksize = services["disksize"]
return DiskOffering(apiclient.createDiskOffering(cmd).__dict__)
def delete(self, apiclient):
"""Delete Disk offering"""
cmd = deleteDiskOffering.deleteDiskOfferingCmd()
cmd.id = self.id
apiclient.deleteDiskOffering(cmd)
return
class SnapshotPolicy:
"""Manage snapshot policies"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, volumeid, services):
"""Create Snapshot policy"""
cmd = createSnapshotPolicy.createSnapshotPolicyCmd()
cmd.intervaltype = services["intervaltype"]
cmd.maxsnaps = services["maxsnaps"]
cmd.schedule = services["schedule"]
cmd.timezone = services["timezone"]
cmd.volumeid = volumeid
return SnapshotPolicy(apiclient.createSnapshotPolicy(cmd).__dict__)
def delete(self, apiclient):
"""Delete Snapshot policy"""
cmd = deleteSnapshotPolicies.deleteSnapshotPoliciesCmd()
cmd.id = self.id
apiclient.deleteSnapshotPolicies(cmd)
return
class LoadBalancerRule:
"""Manage Load Balancer rule"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, ipaddressid, accountid=None):
"""Create Load balancing Rule"""
cmd = createLoadBalancerRule.createLoadBalancerRuleCmd()
cmd.publicipid = ipaddressid or services["ipaddressid"]
cmd.account = accountid or services["account"]
cmd.name = services["name"]
cmd.algorithm = services["alg"]
cmd.privateport = services["privateport"]
cmd.publicport = services["publicport"]
return LoadBalancerRule(apiclient.createLoadBalancerRule(cmd).__dict__)
def delete(self, apiclient):
"""Delete load balancing rule"""
cmd = deleteLoadBalancerRule.deleteLoadBalancerRuleCmd()
cmd.id = self.id
apiclient.deleteLoadBalancerRule(cmd)
return
def assign(self, apiclient, vms):
"""Assign virtual machines to load balancing rule"""
cmd = assignToLoadBalancerRule.assignToLoadBalancerRuleCmd()
cmd.id = self.id
cmd.virtualmachineids = [str(vm.id) for vm in vms]
apiclient.assignToLoadBalancerRule(cmd)
return
def remove(self, apiclient, vms):
"""Remove virtual machines from load balancing rule"""
cmd = removeFromLoadBalancerRule.removeFromLoadBalancerRuleCmd()
cmd.virtualmachineids = [vm.id for vm in vms]
self.apiclient.removeFromLoadBalancerRule(cmd)
return
class Cluster:
"""Manage Cluster life cycle"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services):
"""Create Cluster"""
cmd = addCluster.addClusterCmd()
cmd.clustertype = services["clustertype"]
cmd.hypervisor = services["hypervisor"]
cmd.zoneid = services["zoneid"]
cmd.podid = services["podid"]
if "username" in services:
cmd.username = services["username"]
if "password" in services:
cmd.password = services["password"]
if "url" in services:
cmd.url = services["url"]
if "clustername" in services:
cmd.clustername = services["clustername"]
return Cluster(apiclient.addCluster(cmd)[0].__dict__)
def delete(self, apiclient):
"""Delete Cluster"""
cmd = deleteCluster.deleteClusterCmd()
cmd.id = self.id
apiclient.deleteCluster(cmd)
return
class Host:
"""Manage Host life cycle"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, cluster, services):
"""Create Host"""
cmd = addHost.addHostCmd()
cmd.hypervisor = services["hypervisor"]
cmd.url = services["url"]
cmd.zoneid = services["zoneid"]
cmd.clusterid = cluster.id
cmd.podid = services["podid"]
if "clustertype" in services:
cmd.clustertype = services["clustertype"]
if "username" in services:
cmd.username = services["username"]
if "password" in services:
cmd.password = services["password"]
return Host(apiclient.addHost(cmd)[0].__dict__)
def delete(self, apiclient):
"""Delete Host"""
# Host must be in maintenance mode before deletion
cmd = prepareHostForMaintenance.prepareHostForMaintenanceCmd()
cmd.id = self.id
apiclient.prepareHostForMaintenance(cmd)
time.sleep(60)
cmd = deleteHost.deleteHostCmd()
cmd.id = self.id
apiclient.deleteHost(cmd)
return
class StoragePool:
"""Manage Storage pools"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, clusterid=None):
"""Create Storage pool"""
cmd = createStoragePool.createStoragePoolCmd()
cmd.name = services["name"]
cmd.podid = services["podid"]
cmd.url = services["url"]
if clusterid:
cmd.clusterid = clusterid
elif "clusterid" in services:
cmd.clusterid = services["clusterid"]
cmd.zoneid = services["zoneid"]
return StoragePool(apiclient.createStoragePool(cmd).__dict__)
def delete(self, apiclient):
"""Delete Storage pool"""
# Storage pool must be in maintenance mode before deletion
cmd = enableStorageMaintenance.enableStorageMaintenanceCmd()
cmd.id = self.id
apiclient.enableStorageMaintenance(cmd)
time.sleep(60)
cmd = deleteStoragePool.deleteStoragePoolCmd()
cmd.id = self.id
apiclient.deleteStoragePool(cmd)
return
class Network:
"""Manage Network pools"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, accountid=None, domainid=None):
"""Create Network for account"""
cmd = createNetwork.createNetworkCmd()
cmd.name = services["name"]
cmd.displaytext = services["displaytext"]
cmd.networkofferingid = services["networkoffering"]
cmd.zoneid = services["zoneid"]
if accountid:
cmd.account = accountid
if domainid:
cmd.domainid = domainid
return Network(apiclient.createNetwork(cmd).__dict__)
def delete(self, apiclient):
"""Delete Account"""
cmd = deleteNetwork.deleteNetworkCmd()
cmd.id = self.id
apiclient.deleteNetwork(cmd)
class Vpn:
"""Manage Network pools"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, publicipid, account=None, domainid=None):
"""Create VPN for Public IP"""
cmd = createRemoteAccessVpn.createRemoteAccessVpnCmd()
cmd.publicipid = publicipid
if account:
cmd.account = account
if domainid:
cmd.domainid = domainid
return Vpn(apiclient.createRemoteAccessVpn(cmd).__dict__)
def delete(self, apiclient):
"""Delete Account"""
cmd = deleteRemoteAccessVpn.deleteRemoteAccessVpnCmd()
cmd.publicipid = self.publicipid
apiclient.deleteRemoteAccessVpn(cmd)
class VpnUser:
"""Manage Network pools"""
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, username, password, account=None, domainid=None):
"""Create VPN for Public IP"""
cmd = addVpnUser.addVpnUserCmd()
cmd.username = username
cmd.password = password
if account:
cmd.account = account
if domainid:
cmd.domainid = domainid
return VpnUser(apiclient.addVpnUser(cmd).__dict__)
def delete(self, apiclient):
"""Delete Account"""
cmd = removeVpnUser.removeVpnUserCmd()
cmd.username = self.username
apiclient.removeVpnUser(cmd)

View File

@ -0,0 +1,231 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
"""Common functions
"""
#Import Local Modules
from cloudstackTestCase import *
from cloudstackAPI import *
import remoteSSHClient
from utils import *
from base import *
#Import System modules
import time
def get_zone(apiclient, services=None):
"Returns a default zone"
cmd = listZones.listZonesCmd()
if services:
if "zoneid" in services:
cmd.id = services["zoneid"]
return apiclient.listZones(cmd)[0]
def get_pod(apiclient, zoneid, services=None):
"Returns a default pod for specified zone"
cmd = listPods.listPodsCmd()
cmd.zoneid = zoneid
if services:
if "podid" in services:
cmd.id = services["podid"]
return apiclient.listPods(cmd)[0]
def get_template(apiclient, zoneid, ostypeid=12, services=None):
"Returns a template"
cmd = listTemplates.listTemplatesCmd()
cmd.templatefilter = 'featured'
cmd.zoneid = zoneid
if services:
if "template" in services:
cmd.id = services["template"]
list_templates = apiclient.listTemplates(cmd)
for template in list_templates:
if template.ostypeid == ostypeid:
return template
def list_routers(apiclient, **kwargs):
"""List all Routers matching criteria"""
cmd = listRouters.listRoutersCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listRouters(cmd))
def list_zones(apiclient, **kwargs):
"""List all Zones matching criteria"""
cmd = listZones.listZonesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listZones(cmd))
def list_networks(apiclient, **kwargs):
"""List all Networks matching criteria"""
cmd = listNetworks.listNetworksCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listNetworks(cmd))
def list_clusters(apiclient, **kwargs):
"""List all Clusters matching criteria"""
cmd = listClusters.listClustersCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listClusters(cmd))
def list_ssvms(apiclient, **kwargs):
"""List all SSVMs matching criteria"""
cmd = listSystemVms.listSystemVmsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listSystemVms(cmd))
def list_storage_pools(apiclient, **kwargs):
"""List all storage pools matching criteria"""
cmd = listStoragePools.listStoragePoolsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listStoragePools(cmd))
def list_virtual_machines(apiclient, **kwargs):
"""List all VMs matching criteria"""
cmd = listVirtualMachines.listVirtualMachinesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listVirtualMachines(cmd))
def list_hosts(apiclient, **kwargs):
"""List all Hosts matching criteria"""
cmd = listHosts.listHostsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listHosts(cmd))
def list_configurations(apiclient, **kwargs):
"""List configuration with specified name"""
cmd = listConfigurations.listConfigurationsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listConfigurations(cmd))
def list_publicIP(apiclient, **kwargs):
"""List all Public IPs matching criteria"""
cmd = listPublicIpAddresses.listPublicIpAddressesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listPublicIpAddresses(cmd))
def list_nat_rules(apiclient, **kwargs):
"""List all NAT rules matching criteria"""
cmd = listPortForwardingRules.listPortForwardingRulesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listPortForwardingRules(cmd))
def list_lb_rules(apiclient, **kwargs):
"""List all Load balancing rules matching criteria"""
cmd = listLoadBalancerRules.listLoadBalancerRulesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listLoadBalancerRules(cmd))
def list_lb_instances(apiclient, **kwargs):
"""List all Load balancing instances matching criteria"""
cmd = listLoadBalancerRuleInstances.listLoadBalancerRuleInstancesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listLoadBalancerRuleInstances(cmd))
def list_firewall_rules(apiclient, **kwargs):
"""List all Firewall Rules matching criteria"""
cmd = listFirewallRules.listFirewallRulesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listFirewallRules(cmd))
def list_volumes(apiclient, **kwargs):
"""List all volumes matching criteria"""
cmd = listVolumes.listVolumesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listVolumes(cmd))
def list_isos(apiclient, **kwargs):
"""Lists all available ISO files."""
cmd = listIsos.listIsosCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listIsos(cmd))
def list_snapshots(apiclient, **kwargs):
"""List all snapshots matching criteria"""
cmd = listSnapshots.listSnapshotsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listSnapshots(cmd))
def list_templates(apiclient, **kwargs):
"""List all templates matching criteria"""
cmd = listTemplates.listTemplatesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listTemplates(cmd))
def list_accounts(apiclient, **kwargs):
"""Lists accounts and provides detailed account information for
listed accounts"""
cmd = listAccounts.listAccountsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listAccounts(cmd))
def list_snapshot_policy(apiclient, **kwargs):
"""Lists snapshot policies."""
cmd = listSnapshotPolicies.listSnapshotPoliciesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listSnapshotPolicies(cmd))
def list_events(apiclient, **kwargs):
"""Lists events"""
cmd = listEvents.listEventsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listEvents(cmd))
def list_disk_offering(apiclient, **kwargs):
"""Lists all available disk offerings."""
cmd = listDiskOfferings.listDiskOfferingsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listDiskOfferings(cmd))
def list_service_offering(apiclient, **kwargs):
"""Lists all available service offerings."""
cmd = listServiceOfferings.listServiceOfferingsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listServiceOfferings(cmd))
def list_vlan_ipranges(apiclient, **kwargs):
"""Lists all VLAN IP ranges."""
cmd = listVlanIpRanges.listVlanIpRangesCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listVlanIpRanges(cmd))
def list_usage_records(apiclient, **kwargs):
"""Lists usage records for accounts"""
cmd = listUsageRecords.listUsageRecordsCmd()
[setattr(cmd, k, v) for k, v in kwargs.items()]
return(apiclient.listUsageRecords(cmd))

View File

@ -0,0 +1,103 @@
# -*- encoding: utf-8 -*-
#
# Copyright (c) 2012 Citrix. All rights reserved.
#
"""Utilities functions
"""
import time
import remoteSSHClient
from cloudstackAPI import *
import cloudstackConnection
#from cloudstackConnection import cloudConnection
import configGenerator
import logging
import string
import random
def enum(**enums):
"""Generates Enum"""
return type('Enum', (), enums)
def random_gen(size=6, chars=string.ascii_uppercase + string.digits):
"""Generate Random Strings of variable length"""
return ''.join(random.choice(chars) for x in range(size))
def cleanup_resources(api_client, resources):
"""Delete resources"""
for obj in resources:
obj.delete(api_client)
def is_server_ssh_ready(ipaddress, port, username, password, retries=50):
"""Return ssh handle else wait till sshd is running"""
loop_cnt = retries
while True:
try:
ssh = remoteSSHClient.remoteSSHClient(
ipaddress,
port,
username,
password
)
except Exception as e:
if loop_cnt == 0:
raise e
loop_cnt = loop_cnt - 1
time.sleep(60)
else:
return ssh
def format_volume_to_ext3(ssh_client, device="/dev/sda"):
"""Format attached storage to ext3 fs"""
cmds = [
"echo -e 'n\np\n1\n\n\nw' | fdisk %s" % device,
"mkfs.ext3 %s1" % device,
]
for c in cmds:
ssh_client.execute(c)
def fetch_api_client(config_file='datacenterCfg'):
"""Fetch the Cloudstack API Client"""
config = configGenerator.get_setup_config(config_file)
mgt = config.mgtSvr[0]
testClientLogger = logging.getLogger("testClient")
asyncTimeout = 3600
return cloudstackAPIClient.CloudStackAPIClient(
cloudstackConnection.cloudConnection(
mgt.mgtSvrIp,
mgt.port,
mgt.apiKey,
mgt.securityKey,
asyncTimeout,
testClientLogger
)
)
def get_process_status(hostip, port, username, password, linklocalip, process):
"""Double hop and returns a process status"""
#SSH to the machine
ssh = remoteSSHClient.remoteSSHClient(
hostip,
port,
username,
password
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s %s" \
% (linklocalip, process)
# Double hop into router
timeout = 5
# Ensure the SSH login is successful
while True:
res = ssh.execute(ssh_command)[0]
if res != "Host key verification failed.":
break
elif timeout == 0:
break
time.sleep(5)
timeout = timeout - 1
return res