Minor fixes to test cases

This commit is contained in:
Chirag Jog 2012-02-10 09:24:21 -08:00
parent 11d37d1a23
commit f99a65e59b
10 changed files with 696 additions and 499 deletions

View File

@ -61,11 +61,13 @@ class Services:
"bootable": True, # For edit template
"passwordenabled": True,
"ostypeid": 12,
# CentOS 5.3 (64 bit)
"domainid": 1,
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced'
# Networking mode: Basic or Advanced
}
@ -150,7 +152,8 @@ class TestISO(cloudstackTestCase):
cls.services["iso_1"]["zoneid"] = cls.zone.id
cls.services["iso_2"]["zoneid"] = cls.zone.id
cls.services["sourcezoneid"] = cls.zone.id
#Create an account, network, VM and IP addresses
#Create an account, ISOs etc.
cls.account = Account.create(
cls.api_client,
cls.services["account"],
@ -168,7 +171,7 @@ class TestISO(cloudstackTestCase):
try:
cls.api_client = fetch_api_client()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls.cleanup)
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
@ -258,7 +261,7 @@ class TestISO(cloudstackTestCase):
# 1. UI should not show the deleted ISP
# 2. database (vm_template table) should not contain deleted ISO
self.iso_1.delete(cls.api_client)
self.iso_1.delete(self.apiclient)
#ListIsos to verify deleted ISO is properly deleted
list_iso_response = list_isos(
@ -375,7 +378,8 @@ class TestISO(cloudstackTestCase):
#Verify ISO is copied to another zone using ListIsos
list_iso_response = list_isos(
self.apiclient,
id=self.iso_2.id
id=self.iso_2.id,
zoneid=self.services["destzoneid"]
)
iso_response = list_iso_response[0]
@ -395,4 +399,10 @@ class TestISO(cloudstackTestCase):
self.services["destzoneid"],
"Check zone ID of the copied ISO"
)
# Cleanup- Delete the copied ISO
cmd = deleteIso.deleteIsoCmd()
cmd.id = iso_response.id
cmd.zoneid = self.services["destzoneid"]
self.apiclient.deleteIso(cmd)
return

View File

@ -27,7 +27,10 @@ class Services:
"zoneid": 1,
# Optional, if specified the mentioned zone will be
# used for tests
"mode": 'advanced', # Networking mode: Basic or advanced
"mode": 'advanced',
# Networking mode: Basic or advanced
"lb_switch_wait": 10,
# Time interval after which LB switches the requests
"network": {
"name": "Test Network",
"displaytext": "Test Network",
@ -37,9 +40,9 @@ class Services:
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100,
"cpuspeed": 200,
# in MHz
"memory": 64,
"memory": 256,
# In MBs
},
"account": {
@ -72,8 +75,8 @@ class Services:
"name": "SSH",
"alg": "roundrobin",
# Algorithm used for load balancing
"privateport": 80,
"publicport": 80,
"privateport": 22,
"publicport": 2222,
}
}
@ -480,9 +483,6 @@ class TestLoadBalancingRule(cloudstackTestCase):
cls.services["server"]
)
cls._cleanup = [
cls.vm_1,
cls.vm_2,
cls.non_src_nat_ip,
cls.account,
cls.service_offering
]
@ -578,7 +578,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
# If Round Robin Algorithm is chosen,
# each ssh command should alternate between VMs
hostnames = [ssh_1.execute("hostname")[0]]
time.sleep(20)
time.sleep(self.services["lb_switch_wait"])
ssh_2 = remoteSSHClient.remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.services['lbrule']["publicport"],
@ -600,6 +600,13 @@ class TestLoadBalancingRule(cloudstackTestCase):
#SSH should pass till there is a last VM associated with LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
ssh_1 = remoteSSHClient.remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
hostnames.append(ssh_1.execute("hostname")[0])
self.assertIn(
self.vm_1.name,
@ -609,6 +616,12 @@ class TestLoadBalancingRule(cloudstackTestCase):
lb_rule.remove(self.apiclient, [self.vm_1])
with self.assertRaises(Exception):
ssh_1 = remoteSSHClient.remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_1.execute("hostname")[0]
return
@ -683,7 +696,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
# If Round Robin Algorithm is chosen,
# each ssh command should alternate between VMs
hostnames = [ssh_1.execute("hostname")[0]]
time.sleep(20)
time.sleep(self.services["lb_switch_wait"])
ssh_2 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress.ipaddress,
self.services['lbrule']["publicport"],
@ -705,6 +718,13 @@ class TestLoadBalancingRule(cloudstackTestCase):
#SSH should pass till there is a last VM associated with LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
ssh_1 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
hostnames.append(ssh_1.execute("hostname")[0])
self.assertIn(
self.vm_1.name,
@ -714,6 +734,12 @@ class TestLoadBalancingRule(cloudstackTestCase):
lb_rule.remove(self.apiclient, [self.vm_1])
with self.assertRaises(Exception):
ssh_1 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_1.execute("hostname")[0]
return
@ -882,9 +908,6 @@ class TestAssignRemoveLB(cloudstackTestCase):
)
self.cleanup = [
self.vm_1,
self.vm_2,
self.vm_3,
self.account,
self.service_offering
]
@ -914,7 +937,6 @@ class TestAssignRemoveLB(cloudstackTestCase):
self.non_src_nat_ip.id,
self.account.account.name
)
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
#Create SSH client for each VM
ssh_1 = remoteSSHClient.remoteSSHClient(
@ -930,16 +952,11 @@ class TestAssignRemoveLB(cloudstackTestCase):
self.vm_2.username,
self.vm_2.password
)
ssh_3 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["lbrule"]["publicport"],
self.vm_3.username,
self.vm_3.password
)
# If Round Robin Algorithm is chosen,
# each ssh command should alternate between VMs
res_1 = ssh_1.execute("hostname")[0]
time.sleep(20)
time.sleep(self.services["lb_switch_wait"])
res_2 = ssh_2.execute("hostname")[0]
self.assertIn(
@ -955,7 +972,13 @@ class TestAssignRemoveLB(cloudstackTestCase):
#Removing VM and assigning another VM to LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
# Again make a SSH connection, as previous is not used after LB remove
ssh_1 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["lbrule"]["publicport"],
self.vm_1.username,
self.vm_1.password
)
res_1 = ssh_1.execute("hostname")[0]
self.assertIn(
@ -966,8 +989,20 @@ class TestAssignRemoveLB(cloudstackTestCase):
lb_rule.assign(self.apiclient, [self.vm_3])
ssh_1 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["lbrule"]["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_3 = remoteSSHClient.remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["lbrule"]["publicport"],
self.vm_3.username,
self.vm_3.password
)
res_1 = ssh_1.execute("hostname")[0]
time.sleep(20)
time.sleep(self.services["lb_switch_wait"])
res_3 = ssh_3.execute("hostname")[0]
self.assertIn(
@ -982,7 +1017,7 @@ class TestAssignRemoveLB(cloudstackTestCase):
)
return
def teardown(self):
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
@ -1054,7 +1089,7 @@ class TestReleaseIP(cloudstackTestCase):
]
return
def teardown(self):
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
def test_releaseIP(self):
@ -1177,8 +1212,12 @@ class TestDeleteAccount(cloudstackTestCase):
# 3. The domR should have been expunged for this account
self.account.delete(self.apiclient)
interval = list_configurations(
self.apiclient,
name='account.cleanup.interval'
)
# Sleep to ensure that all resources are deleted
time.sleep(120)
time.sleep(int(interval[0].value))
# ListLoadBalancerRules should not list
# associated rules with deleted account

View File

@ -110,7 +110,240 @@ class TestRouterServices(cloudstackTestCase):
self.apiclient = self.testClient.getApiClient()
return
def test_01_router_basic(self):
def test_01_router_internal_basic(self):
"""Test router internal basic zone
"""
# Validate the following
# 1. Router only does dhcp
# 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP
# by checking status of dnsmasq process
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
host = hosts[0]
# Sleep to ensure that router is in ready state before double hop
time.sleep(200)
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service dnsmasq status"
)
res = str(result)
self.assertEqual(
res.count("running"),
1,
"Check dnsmasq service is running or not"
)
return
def test_02_router_internal_adv(self):
"""Test router internal advanced zone
"""
# Validate the following
# 1. Router does dhcp, dns, gateway, LB, PF, FW
# 2. verify that dhcp, dns ports are open on UDP
# 3. dnsmasq, haproxy processes should be running
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
host = hosts[0]
# Sleep to ensure that router is in ready state before double hop
time.sleep(200)
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service dnsmasq status"
)
res = str(result)
self.assertEqual(
res.count("running"),
1,
"Check dnsmasq service is running or not"
)
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service haproxy status"
)
res = str(result)
self.assertEqual(
res.count("running"),
1,
"Check haproxy service is running or not"
)
return
def test_03_restart_network_cleanup(self):
"""Test restart network
"""
# Validate the following
# 1. When cleanup = true, router is destroyed and a new one created
# 2. New router will have new publicIp and linkLocalIp and
# all it's services should resume
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
#Store old values before restart
old_linklocalip = router.linklocalip
timeout = 10
# Network should be in Implemented or Setup stage before restart
while True:
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
network = networks[0]
if network.state in ["Implemented", "Setup"]:
break
elif timeout == 0:
break
else:
time.sleep(60)
timeout = timeout - 1
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
cmd.cleanup = True
self.apiclient.restartNetwork(cmd)
# Get router details after restart
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
self.assertNotEqual(
router.linklocalip,
old_linklocalip,
"Check link-local IP after restart"
)
return
def test_04_restart_network_wo_cleanup(self):
"""Test restart network without cleanup
"""
# Validate the following
# 1. When cleanup = false, router is restarted and
# all services inside the router are restarted
# 2. check 'uptime' to see if the actual restart happened
timeout = 10
# Network should be in Implemented or Setup stage before restart
while True:
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
network = networks[0]
if network.state in ["Implemented", "Setup"]:
break
elif timeout == 0:
break
else:
time.sleep(60)
timeout = timeout - 1
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
cmd.cleanup = False
self.apiclient.restartNetwork(cmd)
# Get router details after restart
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
host = hosts[0]
res = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"uptime"
)
# res = 12:37:14 up 1 min, 0 users, load average: 0.61, 0.22, 0.08
# Split result to check the uptime
result = res[0].split()
self.assertEqual(
str(result[1]),
'up',
"Check router is running or not"
)
if str(result[3]) == "min,":
self.assertEqual(
(int(result[2]) < 3),
True,
"Check uptime is less than 3 mins or not"
)
else:
self.assertEqual(
str(result[3]),
'sec,',
"Check uptime is in seconds"
)
return
def test_05_router_basic(self):
"""Test router basic setup
"""
@ -166,7 +399,7 @@ class TestRouterServices(cloudstackTestCase):
)
return
def test_02_router_advanced(self):
def test_06_router_advanced(self):
"""Test router advanced setup
"""
@ -234,7 +467,7 @@ class TestRouterServices(cloudstackTestCase):
)
return
def test_03_stop_router(self):
def test_07_stop_router(self):
"""Test stop router
"""
@ -267,7 +500,7 @@ class TestRouterServices(cloudstackTestCase):
)
return
def test_04_start_router(self):
def test_08_start_router(self):
"""Test start router
"""
@ -300,7 +533,7 @@ class TestRouterServices(cloudstackTestCase):
)
return
def test_05_reboot_router(self):
def test_09_reboot_router(self):
"""Test reboot router
"""
@ -341,7 +574,7 @@ class TestRouterServices(cloudstackTestCase):
)
return
def test_06_network_gc(self):
def test_10_network_gc(self):
"""Test network GC
"""
@ -376,7 +609,7 @@ class TestRouterServices(cloudstackTestCase):
response = config[0]
# Wait for network.gc.interval * 2 time
# Wait for network.gc.interval * 3 time
time.sleep(int(response.value) * 3)
#Check status of network router
@ -391,211 +624,6 @@ class TestRouterServices(cloudstackTestCase):
router.state,
'Stopped',
"Check state of the router after stopping all VMs associated"
)
)
return
def test_07_router_internal_basic(self):
"""Test router internal basic zone
"""
# Validate the following
# 1. Router only does dhcp
# 2. Verify that ports 67 (DHCP) and 53 (DNS) are open on UDP
# by checking status of dnsmasq process
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
host = hosts[0]
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service dnsmasq status"
)
self.assertEqual(
result.count("running"),
1,
"Check dnsmasq service is running or not"
)
return
def test_08_router_internal_adv(self):
"""Test router internal advanced zone
"""
# Validate the following
# 1. Router does dhcp, dns, gateway, LB, PF, FW
# 2. verify that dhcp, dns ports are open on UDP
# 3. dnsmasq, haproxy processes should be running
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
host = hosts[0]
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service dnsmasq status"
)
self.assertEqual(
result.count("running"),
1,
"Check dnsmasq service is running or not"
)
result = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"service haproxy status"
)
self.assertEqual(
result.count("running"),
1,
"Check haproxy service is running or not"
)
return
def test_09_restart_network_cleanup(self):
"""Test restart network
"""
# Validate the following
# 1. When cleanup = true, router is destroyed and a new one created
# 2. New router will have new publicIp and linkLocalIp and
# all it's services should resume
# Find router associated with user account
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
#Store old values before restart
old_linklocalip = router.linklocalip
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
network = networks[0]
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
cmd.cleanup = True
self.apiclient.restartNetwork(cmd)
# Get router details after restart
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
self.assertNotEqual(
router.linklocalip,
old_linklocalip,
"Check linklocal IP after restart"
)
return
def test_10_restart_network_wo_cleanup(self):
"""Test restart network without cleanup
"""
# Validate the following
# 1. When cleanup = false, router is restarted and
# all services inside the router are restarted
# 2. check 'uptime' to see if the actual restart happened
networks = list_networks(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
network = networks[0]
cmd = restartNetwork.restartNetworkCmd()
cmd.id = network.id
cmd.cleanup = False
self.apiclient.restartNetwork(cmd)
# Get router details after restart
list_router_response = list_routers(
self.apiclient,
account=self.account.account.name,
domainid=self.account.account.domainid
)
router = list_router_response[0]
hosts = list_hosts(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up'
)
host = hosts[0]
res = get_process_status(
host.ipaddress,
self.services['virtual_machine']["publicport"],
self.vm_1.username,
self.vm_1.password,
router.linklocalip,
"uptime"
)
# res = 12:37:14 up 1 min, 0 users, load average: 0.61, 0.22, 0.08
# Split result to check the uptime
result = res.split()
self.assertEqual(
str(result[1]),
'up',
"Check router is running or not"
)
if str(result[3]) == "min,":
self.assertEqual(
(int(result[2]) < 3),
True,
"Check uptime is less than 3 mins or not"
)
else:
self.assertEqual(
str(result[3]),
'sec,',
"Check uptime is in seconds"
)
return

View File

@ -31,7 +31,7 @@ class Services:
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 200, # in MHz
"cpuspeed": 100, # in MHz
"memory": 64, # In MBs
},
"disk_offering": {
@ -41,7 +41,7 @@ class Services:
},
"server_with_disk":
{
"displayname": "testserver",
"displayname": "Test VM -With Disk",
"username": "root",
"password": "password",
"ssh_port": 22,
@ -54,7 +54,7 @@ class Services:
"server_without_disk":
{
"displayname": "testserver",
"displayname": "Test VM-No Disk",
"username": "root",
"password": "password",
"ssh_port": 22,
@ -85,18 +85,23 @@ class Services:
},
"ostypeid": 12,
# Cent OS 5.3 (64 bit)
"diskdevice": "/dev/xvda",
"diskdevice": "/dev/xvdb", # Data Disk
"rootdisk": "/dev/xvda", # Root Disk
"diskname": "Test Disk",
"size": 1, # GBs
"domainid": 1,
"mount_dir": "/mnt/tmp",
"sub_dir": "test",
"sub_lvl_dir1": "test1",
"sub_lvl_dir2": "test2",
"random_data": "random.data",
"exportpath": 'SecondaryStorage',
"sec_storage": '192.168.100.131',
# IP address of Sec storage where snapshots are stored
# IP of Sec storage where snapshots are stored
"exportpath": 'SecondaryStorage',
#Export path of secondary storage
"username": "root",
"password": "password",
"ssh_port": 22,
@ -104,9 +109,179 @@ class Services:
# Optional, if specified the mentioned zone will be
# used for tests
"sleep":60,
"mode": 'advanced', # Networking mode, Advanced, Basic
"mode": 'advanced',
# Networking mode, Advanced, Basic
}
class TestSnapshotRootDisk(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = fetch_api_client()
cls.services = Services().services
# Get Zone, Domain and templates
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostypeid"]
)
cls.services["server_without_disk"]["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls.services["zoneid"] = cls.zone.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"]
)
cls.services["account"] = cls.account.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = cls.virtual_machine_with_disk = \
VirtualMachine.create(
cls.api_client,
cls.services["server_without_disk"],
templateid=template.id,
accountid=cls.account.account.name,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls._cleanup = [
cls.service_offering,
cls.account,
]
return
@classmethod
def tearDownClass(cls):
try:
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
#Clean up, terminate the created instance, volumes and snapshots
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_snapshot_root_disk(self):
"""Test Snapshot Root Disk
"""
# Validate the following
# 1. listSnapshots should list the snapshot that was created.
# 2. verify that secondary storage NFS share contains
# the reqd volume under
# /secondary/snapshots//$account_id/$volumeid/$snapshot_uuid
# 3. verify backup_snap_id was non null in the `snapshots` table
volumes = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine_with_disk.id,
type='ROOT'
)
snapshot = Snapshot.create(self.apiclient, volumes[0].id)
self.cleanup.append(snapshot)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list item call"
)
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.debug(
"select backup_snap_id, account_id, volume_id from snapshots where id = %s;" \
% snapshot.id
)
qresultset = self.dbclient.execute(
"select backup_snap_id, account_id, volume_id from snapshots where id = %s;" \
% snapshot.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
snapshot_uuid = qresult[0] # backup_snap_id = snapshot UUID
account_id = qresult[1]
volume_id = qresult[2]
self.assertNotEqual(
str(snapshot_uuid),
'NULL',
"Check if backup_snap_id is not null"
)
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
# Login to VM to check snapshot present on sec disk
ssh_client = self.virtual_machine_with_disk.get_ssh_client()
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
self.services["sec_storage"],
self.services["exportpath"],
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
self.services["mount_dir"],
account_id,
volume_id
),
]
for c in cmds:
result = ssh_client.execute(c)
res = str(result)
# Check snapshot UUID in secondary storage and database
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
return
class TestSnapshots(cloudstackTestCase):
@classmethod
@ -193,105 +368,6 @@ class TestSnapshots(cloudstackTestCase):
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def test_01_snapshot_root_disk(self):
"""Test Snapshot Root Disk
"""
# Validate the following
# 1. listSnapshots should list the snapshot that was created.
# 2. verify that secondary storage NFS share contains
# the reqd volume under
# /secondary/snapshots/$volumeid/$snapshot_uuid
# 3. verify backup_snap_id was non null in the `snapshots` table
volumes = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine_with_disk.id,
type='ROOT'
)
snapshot = Snapshot.create(self.apiclient, volumes[0].id)
self.cleanup.append(snapshot)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list item call"
)
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.debug(
"select backup_snap_id, account_id, volume_id from snapshots where id = %s;" \
% snapshot.id
)
qresultset = self.dbclient.execute(
"select backup_snap_id, account_id, volume_id from snapshots where id = %s;" \
% snapshot.id
)
self.assertNotEqual(
len(qresultset),
0,
"Check DB Query result set"
)
qresult = qresultset[0]
snapshot_uuid = qresult[0] # backup_snap_id = snapshot UUID
account_id = qresult[1]
volume_id = qresult[2]
self.assertNotEqual(
str(snapshot_uuid),
'NULL',
"Check if backup_snap_id is not null"
)
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
# Login to VM to check snapshot present on sec disk
ssh_client = self.virtual_machine_without_disk.get_ssh_client()
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
"mount %s:/%s %s" % (
self.services["sec_storage"],
self.services["exportpath"],
self.services["mount_dir"]
),
"ls %s/snapshots/%s/%s" % (
self.services["mount_dir"],
account_id,
volume_id
),
]
for c in cmds:
result = ssh_client.execute(c)
res = str(result)
# Check snapshot UUID in secondary storage and database
self.assertEqual(
res.count(snapshot_uuid),
1,
"Check snapshot UUID in secondary storage and database"
)
# Unmount the Sec Storage
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
return
def test_02_snapshot_data_disk(self):
"""Test Snapshot Data Disk
"""
@ -332,7 +408,6 @@ class TestSnapshots(cloudstackTestCase):
"Check DB Query result set"
)
qresult = qresultset[0]
snapshot_uuid = qresult[0] # backup_snap_id = snapshot UUID
account_id = qresult[1]
@ -394,29 +469,32 @@ class TestSnapshots(cloudstackTestCase):
ssh_client,
self.services["diskdevice"]
)
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s1 %s" % (
self.services["diskdevice"],
self.services["mount_dir"]
),
"pushd %s" % self.services["mount_dir"],
"mkdir -p %s/{%s,%s} " % (
"mkdir -p %s/%s/{%s,%s} " % (
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["sub_lvl_dir2"]
),
"echo %s > %s/%s/%s" % (
"echo %s > %s/%s/%s/%s" % (
random_data_0,
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["random_data"]
),
"echo %s > %s/%s/%s" % (
"echo %s > %s/%s/%s/%s" % (
random_data_1,
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir2"],
self.services["random_data"]
)
),
]
for c in cmds:
ssh_client.execute(c)
@ -426,17 +504,22 @@ class TestSnapshots(cloudstackTestCase):
"umount %s" % (self.services["mount_dir"]),
]
for c in cmds:
result = ssh_client.execute(c)
ssh_client.execute(c)
list_volume_response = list_volumes(
self.apiclient,
hostid=self.virtual_machine.id,
type='DATADISK'
)
self.apiclient,
virtualmachineid=self.virtual_machine.id,
type='DATADISK'
)
volume = list_volume_response[0]
volume_response = list_volume_response[0]
#Create snapshot from attached volume
snapshot = Snapshot.create(self.apiclient, volume.id)
snapshot = Snapshot.create(
self.apiclient,
volume_response.id,
account=self.account.account.name,
domainid=self.account.account.domainid
)
self.cleanup.append(snapshot)
#Create volume from snapshot
volume = Volume.create_from_snapshot(
@ -444,7 +527,6 @@ class TestSnapshots(cloudstackTestCase):
snapshot.id,
self.services
)
self.cleanup.append(volume)
volumes = list_volumes(
self.apiclient,
@ -468,7 +550,7 @@ class TestSnapshots(cloudstackTestCase):
cmd = attachVolume.attachVolumeCmd()
cmd.id = volume.id
cmd.virtualmachineid = new_virtual_machine.id
volume = self.apiclient.attachVolume(cmd)
self.apiclient.attachVolume(cmd)
#Login to VM to verify test directories and files
ssh = new_virtual_machine.get_ssh_client()
@ -477,18 +559,20 @@ class TestSnapshots(cloudstackTestCase):
"mount %s1 %s" % (
self.services["diskdevice"],
self.services["mount_dir"]
)
),
]
for c in cmds:
ssh.execute(c)
returned_data_0 = ssh.execute("cat %s/%s/%s" % (
returned_data_0 = ssh.execute("cat %s/%s/%s/%s" % (
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["random_data"]
))
returned_data_1 = ssh.execute("cat %s/%s/%s" % (
returned_data_1 = ssh.execute("cat %s/%s/%s/%s" % (
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir2"],
self.services["random_data"]
@ -510,19 +594,19 @@ class TestSnapshots(cloudstackTestCase):
]
for c in cmds:
result = ssh_client.execute(c)
#detach volume for cleanup
cmd = detachVolume.detachVolumeCmd()
cmd.id = volume.id
self.apiclient.detachVolume(cmd)
return
def test_04_delete_snapshot(self):
"""Test Delete Snapshot
"""
#1. Snapshot the Volume
#2. Delete the snapshot
#3. Verify snapshot is removed by calling List Snapshots API
volumes = list_volumes(
self.apiclient,
hostid=self.virtual_machine.id,
virtualmachineid=self.virtual_machine.id,
type='DATADISK'
)
@ -648,8 +732,8 @@ class TestSnapshots(cloudstackTestCase):
"Check interval type in list resources call"
)
# Sleep for (maxsnaps+1) hours to
# verify only maxsnaps snapshots are retained
# Sleep for (maxsnaps) hours to verify only maxsnaps snapshots are
# retained
time.sleep(
(self.services["recurring_snapshot"]["maxsnaps"]) * 3600
)
@ -688,23 +772,25 @@ class TestSnapshots(cloudstackTestCase):
cmds = [ "mkdir -p %s" % self.services["mount_dir"],
"mount %s1 %s" % (
self.services["diskdevice"],
self.services["rootdisk"],
self.services["mount_dir"]
),
"pushd %s" % self.services["mount_dir"],
"mkdir -p %s/{%s,%s} " % (
"mkdir -p %s/%s/{%s,%s} " % (
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["sub_lvl_dir2"]
),
"echo %s > %s/%s/%s" % (
"echo %s > %s/%s/%s/%s" % (
random_data_0,
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["random_data"]
),
"echo %s > %s/%s/%s" % (
"echo %s > %s/%s/%s/%s" % (
random_data_1,
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir2"],
self.services["random_data"]
@ -714,7 +800,7 @@ class TestSnapshots(cloudstackTestCase):
for c in cmds:
ssh_client.execute(c)
# Unmount the Sec Storage
# Unmount the Volume
cmds = [
"umount %s" % (self.services["mount_dir"]),
]
@ -738,7 +824,7 @@ class TestSnapshots(cloudstackTestCase):
snapshot,
self.services["templates"]
)
# Verify created template
templates = list_templates(
self.apiclient,
templatefilter=\
@ -773,7 +859,7 @@ class TestSnapshots(cloudstackTestCase):
cmds = [
"mkdir -p %s" % self.services["mount_dir"],
"mount %s1 %s" % (
self.services["diskdevice"],
self.services["rootdisk"],
self.services["mount_dir"]
)
]
@ -781,12 +867,14 @@ class TestSnapshots(cloudstackTestCase):
for c in cmds:
ssh.execute(c)
returned_data_0 = ssh.execute("cat %s/%s/%s" % (
returned_data_0 = ssh.execute("cat %s/%s/%s/%s" % (
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir1"],
self.services["random_data"]
))
returned_data_1 = ssh.execute("cat %s/%s/%s" % (
returned_data_1 = ssh.execute("cat %s/%s/%s/%s" % (
self.services["mount_dir"],
self.services["sub_dir"],
self.services["sub_lvl_dir2"],
self.services["random_data"]
@ -802,7 +890,7 @@ class TestSnapshots(cloudstackTestCase):
returned_data_1[0],
"Verify newly attached volume contents with existing one"
)
# Unmount the Sec Storage
# Unmount the volume
cmds = [
"umount %s" % (self.services["mount_dir"]),
]

View File

@ -82,7 +82,7 @@ class TestSSVMs(cloudstackTestCase):
)
list_zones_response = list_zones(self.apiclient)
# Number of Sec storage VMs = No of Zones
self.assertEqual(
len(list_ssvm_response),
len(list_zones_response),
@ -174,7 +174,7 @@ class TestSSVMs(cloudstackTestCase):
"Check list System VMs response"
)
list_zones_response = list_zones(self.apiclient)
# Number of Console Proxy VMs = No of Zones
self.assertEqual(
len(list_cpvm_response),
len(list_zones_response),
@ -290,7 +290,6 @@ class TestSSVMs(cloudstackTestCase):
)
#Check status of cloud service
result = get_process_status(
host.ipaddress,
self.services['host']["publicport"],
@ -302,7 +301,7 @@ class TestSSVMs(cloudstackTestCase):
res = str(result)
# cloud.com service (type=secstorage) is running: process id: 2346
self.assertEqual(
res.count("running"),
res.count("is running"),
1,
"Check cloud service is running or not"
)
@ -357,7 +356,7 @@ class TestSSVMs(cloudstackTestCase):
)
res = str(result)
self.assertEqual(
res.count("running"),
res.count("is running"),
1,
"Check cloud service is running or not"
)

View File

@ -159,9 +159,8 @@ class TestCreateTemplate(cloudstackTestCase):
cls.volume = list_volume[0]
cls._cleanup = [
cls.virtual_machine,
cls.service_offering,
cls.account,
cls.service_offering,
cls.disk_offering,
]
return
@ -306,7 +305,6 @@ class TestTemplates(cloudstackTestCase):
)
cls._cleanup = [
cls.template_2,
cls.virtual_machine,
cls.service_offering,
cls.disk_offering,
cls.account,
@ -533,7 +531,8 @@ class TestTemplates(cloudstackTestCase):
self.apiclient,
templatefilter=\
self.services["templatefilter"],
id=self.template_2.id
id=self.template_2.id,
zoneid=self.services["destzoneid"]
)
self.assertNotEqual(
len(list_template_response),
@ -552,6 +551,12 @@ class TestTemplates(cloudstackTestCase):
self.services["destzoneid"],
"Check zone ID of the copied template"
)
# Cleanup- Delete the copied template
cmd = deleteTemplate.deleteTemplateCmd()
cmd.id = template_response.id
cmd.zoneid = self.services["destzoneid"]
self.apiclient.deleteTemplate(cmd)
return
def test_07_list_public_templates(self):

View File

@ -102,6 +102,7 @@ class Services:
"mode": 'HTTP_DOWNLOAD', # Downloading existing ISO
},
"diskdevice": '/dev/xvdd',
# Disk device where ISO is attached to instance
"mount_dir": "/mnt/tmp",
"hostid": 5,
#Migrate VM to hostid
@ -111,6 +112,7 @@ class Services:
# Optional, if specified the mentioned zone will be
# used for tests
"mode":'advanced',
# Networking mode: Basic or Advanced
}
class TestDeployVM(cloudstackTestCase):
@ -314,7 +316,6 @@ class TestVMLifeCycle(cloudstackTestCase):
"Stopped",
"Check virtual machine is in stopped state"
)
return
def test_02_start_vm(self):
@ -345,19 +346,6 @@ class TestVMLifeCycle(cloudstackTestCase):
"Running",
"Check virtual machine is in running state"
)
self.debug(
"Verify SSH Access for virtual machine: %s" \
% self.small_virtual_machine.id
)
# SSH to check whether VM is Up and Running
try:
self.small_virtual_machine.get_ssh_client()
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" \
% (self.small_virtual_machine.ipaddress, e)
)
return
def test_03_reboot_vm(self):
@ -389,66 +377,7 @@ class TestVMLifeCycle(cloudstackTestCase):
)
return
def test_04_change_offering_medium(self):
"""Change Offering to a medium capacity
"""
# Validate the following
# 1. Log in to the Vm .We should see that the CPU and memory Info of
# this Vm matches the one specified for "Medium" service offering.
# 2. Using listVM command verify that this Vm
# has Medium service offering Id.
self.small_virtual_machine.stop(self.apiclient)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
cmd.serviceofferingid = self.medium_offering.id
self.apiclient.changeServiceForVirtualMachine(cmd)
self.small_virtual_machine.start(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
# Sleep to ensure that VM is started properly
time.sleep(60)
try:
ssh = self.small_virtual_machine.get_ssh_client()
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.small_virtual_machine.ipaddress, e)
)
cpuinfo = ssh.execute("cat /proc/cpuinfo")
cpu_cnt = len([i for i in cpuinfo if "processor" in i])
#'cpu MHz\t\t: 2660.499'
cpu_speed = [i for i in cpuinfo if "cpu MHz" in i][0].split()[3]
meminfo = ssh.execute("cat /proc/meminfo")
#MemTotal: 1017464 kB
total_mem = [i for i in meminfo if "MemTotal" in i][0].split()[1]
self.assertEqual(
cpu_cnt,
self.medium_offering.cpunumber,
"Check CPU Count for medium offering"
)
self.assertEqual(
list_vm_response[0].cpuspeed,
self.medium_offering.cpuspeed,
"Check CPU Speed for medium offering"
)
self.assertEqual(
total_mem,
self.medium_offering.memory,
"Check Memory(kb) for medium offering"
)
def test_05_change_offering_small(self):
def test_04_change_offering_small(self):
"""Change Offering to a small capacity
"""
@ -510,6 +439,67 @@ class TestVMLifeCycle(cloudstackTestCase):
self.cleanup.append(self.medium_virtual_machine)
return
def test_05_change_offering_medium(self):
"""Change Offering to a medium capacity
"""
# Validate the following
# 1. Log in to the Vm .We should see that the CPU and memory Info of
# this Vm matches the one specified for "Medium" service offering.
# 2. Using listVM command verify that this Vm
# has Medium service offering Id.
# Sleep to ensure that VM is in proper state
time.sleep(120)
self.small_virtual_machine.stop(self.apiclient)
cmd = changeServiceForVirtualMachine.changeServiceForVirtualMachineCmd()
cmd.id = self.small_virtual_machine.id
cmd.serviceofferingid = self.medium_offering.id
self.apiclient.changeServiceForVirtualMachine(cmd)
self.small_virtual_machine.start(self.apiclient)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
)
# Sleep to ensure that VM is started properly
time.sleep(120)
try:
ssh_client = self.small_virtual_machine.get_ssh_client()
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.small_virtual_machine.ipaddress, e)
)
cpuinfo = ssh_client.execute("cat /proc/cpuinfo")
cpu_cnt = len([i for i in cpuinfo if "processor" in i])
#'cpu MHz\t\t: 2660.499'
cpu_speed = [i for i in cpuinfo if "cpu MHz" in i][0].split()[3]
meminfo = ssh_client.execute("cat /proc/meminfo")
#MemTotal: 1017464 kB
total_mem = [i for i in meminfo if "MemTotal" in i][0].split()[1]
self.assertEqual(
cpu_cnt,
self.medium_offering.cpunumber,
"Check CPU Count for medium offering"
)
self.assertEqual(
list_vm_response[0].cpuspeed,
self.medium_offering.cpuspeed,
"Check CPU Speed for medium offering"
)
self.assertEqual(
total_mem,
self.medium_offering.memory,
"Check Memory(kb) for medium offering"
)
return
def test_06_destroy_vm(self):
"""Test destroy Virtual Machine
@ -581,16 +571,14 @@ class TestVMLifeCycle(cloudstackTestCase):
# should be "Running" and the host should be the host
# to which the VM was migrated to
self.small_virtual_machine.start(self.apiclient)
cmd = migrateVirtualMachine.migrateVirtualMachineCmd()
cmd.hostid = self.services["hostid"]
cmd.virtualmachineid = self.small_virtual_machine.id
cmd.virtualmachineid = self.medium_virtual_machine.id
self.apiclient.migrateVirtualMachine(cmd)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.small_virtual_machine.id
id=self.medium_virtual_machine.id
)
self.assertNotEqual(
list_vm_response,
@ -602,7 +590,7 @@ class TestVMLifeCycle(cloudstackTestCase):
self.assertEqual(
vm_response.id,
self.small_virtual_machine.id,
self.medium_virtual_machine.id,
"Check virtual machine ID of migrated VM"
)
@ -682,14 +670,21 @@ class TestVMLifeCycle(cloudstackTestCase):
#Disk /dev/xvdd: 4393 MB, 4393723904 bytes
# Res may contain more than one strings depending on environment
# Split res with space as delimiter to form new list (result)
# Split strings to form new list which is used for assertion on ISO size
result = []
for i in res:
for k in i.split():
result.append(k)
# Get ISO size
iso_response = list_isos(
self.apiclient,
id=iso.id
)
iso_size = iso_response[0].size
self.assertEqual(
str(iso.size) in result,
str(iso_size) in result,
True,
"Check size of the attached ISO"
)

View File

@ -61,7 +61,7 @@ class Services:
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
"diskdevice": "/dev/xvda",
"diskdevice": "/dev/xvdb",
"ostypeid": 12,
"zoneid": 1,
# Optional, if specified the mentioned zone will be
@ -131,6 +131,11 @@ class TestCreateVolume(cloudstackTestCase):
def test_01_create_volume(self):
"""Test Volume creation for all Disk Offerings (incl. custom)
"""
# Validate the following
# 1. Create volumes from the different sizes
# 2. Verify the size of volume with acrual size allocated
self.volumes = []
for k, v in self.services["volume_offerings"].items():
volume = Volume.create(
@ -173,18 +178,12 @@ class TestCreateVolume(cloudstackTestCase):
ssh = self.virtual_machine.get_ssh_client(
reconnect=True
)
c = "fdisk -l|grep %s1|head -1" % self.services["diskdevice"]
c = "fdisk -l"
res = ssh.execute(c)
# Disk /dev/sda doesn't contain a valid partition table
# Disk /dev/sda: 21.5 GB, 21474836480 bytes
# Res may return more than one lines
# Split res with space as delimiter to form new list (result)
result = []
for i in res:
for k in i.split():
result.append(k)
result = str(res)
self.assertEqual(
str(list_volume_response[0].size) in result,
True,
@ -193,7 +192,7 @@ class TestCreateVolume(cloudstackTestCase):
self.virtual_machine.detach_volume(self.apiClient, volume)
def tearDown(self):
#Clean up, terminate the created templates
#Clean up, terminate the created volumes
cleanup_resources(self.apiClient, self.cleanup)
return
@ -226,7 +225,7 @@ class TestVolumes(cloudstackTestCase):
cls.services["template"] = template.id
cls.services["diskofferingid"] = cls.disk_offering.id
# Create VMs, NAT Rules etc
# Create VMs, VMs etc
cls.account = Account.create(
cls.api_client,
cls.services["account"]
@ -313,7 +312,7 @@ class TestVolumes(cloudstackTestCase):
# A proper exception should be raised;
# downloading attach VM is not allowed
with self.assertRaises(Exception):
self.apiClient.deleteVolume(cmd)
self.apiClient.extractVolume(cmd)
def test_04_delete_attached_volume(self):
"""Delete a Volume attached to a VM

View File

@ -44,6 +44,36 @@ class Account:
cmd.id = self.account.id
apiclient.deleteAccount(cmd)
class User:
""" User Life Cycle """
def __init__(self, items):
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, services, account, domainid):
cmd = createUser.createUserCmd()
"""Creates an user"""
cmd.account = account
cmd.domainid = domainid
cmd.email = services["email"]
cmd.firstname = services["firstname"]
cmd.lastname = services["lastname"]
# Password Encoding
mdf = hashlib.md5()
mdf.update(services["password"])
cmd.password = mdf.hexdigest()
cmd.username = "-".join([services["username"], random_gen()])
user = apiclient.createUser(cmd)
return User(user.__dict__)
def delete(self, apiclient):
"""Delete an account"""
cmd = deleteUser.deleteUserCmd()
cmd.id = self.id
apiclient.deleteUser(cmd)
class VirtualMachine:
"""Manage virtual machine lifecycle"""
@ -145,6 +175,9 @@ class VirtualMachine:
def get_ssh_client(self, ipaddress=None, reconnect=False):
"""Get SSH object of VM"""
# If NAT Rules are not created while VM deployment in Advanced mode
# then, IP address must be passed
if ipaddress != None:
self.ssh_ip = ipaddress
if reconnect:
@ -251,10 +284,14 @@ class Snapshot:
self.__dict__.update(items)
@classmethod
def create(cls, apiclient, volume_id):
def create(cls, apiclient, volume_id, account=None, domainid=None):
"""Create Snapshot"""
cmd = createSnapshot.createSnapshotCmd()
cmd.volumeid = volume_id
if account:
cmd.account = account
if domainid:
cmd.domainid = domainid
return Snapshot(apiclient.createSnapshot(cmd).__dict__)
def delete(self, apiclient):
@ -603,8 +640,9 @@ class LoadBalancerRule:
def remove(self, apiclient, vms):
"""Remove virtual machines from load balancing rule"""
cmd = removeFromLoadBalancerRule.removeFromLoadBalancerRuleCmd()
cmd.virtualmachineids = [vm.id for vm in vms]
self.apiclient.removeFromLoadBalancerRule(cmd)
cmd.id = self.id
cmd.virtualmachineids = [str(vm.id) for vm in vms]
apiclient.removeFromLoadBalancerRule(cmd)
return

View File

@ -16,10 +16,6 @@ import logging
import string
import random
def enum(**enums):
"""Generates Enum"""
return type('Enum', (), enums)
def random_gen(size=6, chars=string.ascii_uppercase + string.digits):
"""Generate Random Strings of variable length"""
return ''.join(random.choice(chars) for x in range(size))
@ -85,19 +81,19 @@ def get_process_status(hostip, port, username, password, linklocalip, process):
username,
password
)
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -p 3922 %s %s" \
ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -ostricthostkeychecking=no "
ssh_command = ssh_command + "-oUserKnownHostsFile=/dev/null -p 3922 %s %s" \
% (linklocalip, process)
# Double hop into router
timeout = 5
# Ensure the SSH login is successful
while True:
res = ssh.execute(ssh_command)[0]
if res != "Host key verification failed.":
res = ssh.execute(ssh_command)
if res[0] != "Host key verification failed.":
break
elif timeout == 0:
break
time.sleep(5)
timeout = timeout - 1
return res