mirror of https://github.com/apache/cloudstack.git
CLOUDSTACK-8634: Made changes to test_security_group.py test suite to support EIP
Signed-off-by: Pierre-Luc Dion <pdion891@apache.org>
This commit is contained in:
parent
fd17e47e15
commit
7f7026ace5
|
|
@ -27,7 +27,8 @@ from marvin.lib.base import (Account,
|
|||
VirtualMachine,
|
||||
SecurityGroup,
|
||||
Router,
|
||||
Host)
|
||||
Host,
|
||||
Network)
|
||||
from marvin.lib.common import (get_domain,
|
||||
get_zone,
|
||||
get_template,
|
||||
|
|
@ -528,6 +529,12 @@ class TestRevokeIngressRule(cloudstackTestCase):
|
|||
|
||||
return
|
||||
|
||||
def revokeSGRule(self, sgid):
|
||||
cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
|
||||
cmd.id = sgid
|
||||
self.apiclient.revokeSecurityGroupIngress(cmd)
|
||||
return
|
||||
|
||||
@attr(tags=["sg", "eip", "advancedsg"])
|
||||
def test_01_revokeIngressRule(self):
|
||||
"""Test revoke ingress rule
|
||||
|
|
@ -605,11 +612,21 @@ class TestRevokeIngressRule(cloudstackTestCase):
|
|||
|
||||
self.debug("Revoking ingress rule for sec group ID: %s for ssh access"
|
||||
% security_group.id)
|
||||
# Revoke Security group to SSH to VM
|
||||
security_group.revoke(
|
||||
security_groups = SecurityGroup.list(
|
||||
self.apiclient,
|
||||
id=ssh_rule["ruleid"]
|
||||
account=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
listall=True
|
||||
)
|
||||
self.assertEqual(
|
||||
validateList(security_groups)[0],
|
||||
PASS,
|
||||
"Security groups list validation failed"
|
||||
)
|
||||
for sg in security_groups:
|
||||
if not sg.ingressrule:
|
||||
continue
|
||||
self.revokeSGRule(sg.ingressrule[0].ruleid)
|
||||
|
||||
# SSH Attempt to VM should fail
|
||||
with self.assertRaises(Exception):
|
||||
|
|
@ -617,7 +634,8 @@ class TestRevokeIngressRule(cloudstackTestCase):
|
|||
SshClient(self.virtual_machine.ssh_ip,
|
||||
self.virtual_machine.ssh_port,
|
||||
self.virtual_machine.username,
|
||||
self.virtual_machine.password
|
||||
self.virtual_machine.password,
|
||||
retries=5
|
||||
)
|
||||
return
|
||||
|
||||
|
|
@ -1490,32 +1508,45 @@ class TestIngressRule(cloudstackTestCase):
|
|||
security_group.id,
|
||||
self.account.name
|
||||
))
|
||||
result = security_group.revoke(
|
||||
#skip revoke and ping tests in case of EIP/ELB zone
|
||||
vm_res = VirtualMachine.list(
|
||||
self.apiclient,
|
||||
id=icmp_rule["ruleid"]
|
||||
id=self.virtual_machine.id
|
||||
)
|
||||
self.debug("Revoke ingress rule result: %s" % result)
|
||||
time.sleep(self.testdata["sleep"])
|
||||
# User should not be able to ping VM
|
||||
try:
|
||||
self.debug("Trying to ping VM %s" % self.virtual_machine.ssh_ip)
|
||||
if platform_type == 'windows':
|
||||
result = subprocess.call(
|
||||
['ping', '-n', '1', self.virtual_machine.ssh_ip])
|
||||
else:
|
||||
result = subprocess.call(
|
||||
['ping', '-c 1', self.virtual_machine.ssh_ip])
|
||||
|
||||
self.debug("Ping result: %s" % result)
|
||||
# if ping successful, then result should be 0
|
||||
self.assertNotEqual(
|
||||
result,
|
||||
0,
|
||||
"Check if ping is successful or not"
|
||||
self.assertEqual(validateList(vm_res)[0], PASS, "invalid vm response")
|
||||
vm_nw = Network.list(
|
||||
self.apiclient,
|
||||
id=vm_res[0].nic[0].networkid
|
||||
)
|
||||
self.assertEqual(validateList(vm_nw)[0], PASS, "invalid network response")
|
||||
vm_nw_off = vm_nw[0].networkofferingname
|
||||
if vm_nw_off != "DefaultSharedNetscalerEIPandELBNetworkOffering":
|
||||
result = security_group.revoke(
|
||||
self.apiclient,
|
||||
id=icmp_rule["ruleid"]
|
||||
)
|
||||
except Exception as e:
|
||||
self.fail("Ping failed for ingress rule ID: %s, %s"
|
||||
% (icmp_rule["ruleid"], e))
|
||||
self.debug("Revoke ingress rule result: %s" % result)
|
||||
time.sleep(self.testdata["sleep"])
|
||||
# User should not be able to ping VM
|
||||
try:
|
||||
self.debug("Trying to ping VM %s" % self.virtual_machine.ssh_ip)
|
||||
if platform_type == 'windows':
|
||||
result = subprocess.call(
|
||||
['ping', '-n', '1', self.virtual_machine.ssh_ip])
|
||||
else:
|
||||
result = subprocess.call(
|
||||
['ping', '-c 1', self.virtual_machine.ssh_ip])
|
||||
|
||||
self.debug("Ping result: %s" % result)
|
||||
# if ping successful, then result should be 0
|
||||
self.assertNotEqual(
|
||||
result,
|
||||
0,
|
||||
"Check if ping is successful or not"
|
||||
)
|
||||
except Exception as e:
|
||||
self.fail("Ping failed for ingress rule ID: %s, %s"
|
||||
% (icmp_rule["ruleid"], e))
|
||||
return
|
||||
|
||||
@attr(tags=["sg", "eip", "advancedsg"])
|
||||
|
|
@ -1754,6 +1785,12 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
|
|||
self.debug(response)
|
||||
return
|
||||
|
||||
def revokeSGRule(self, sgid):
|
||||
cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
|
||||
cmd.id = sgid
|
||||
self.apiclient.revokeSecurityGroupIngress(cmd)
|
||||
return
|
||||
|
||||
@attr(tags=["sg", "eip", "advancedsg"])
|
||||
def test_ingress_rules_specific_IP_set(self):
|
||||
"""Test ingress rules for specific IP set
|
||||
|
|
@ -1803,18 +1840,20 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
|
|||
accountid=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
serviceofferingid=self.service_offering.id,
|
||||
securitygroupids=[defaultSecurityGroup.id]
|
||||
securitygroupids=[defaultSecurityGroup.id],
|
||||
mode=self.testdata['mode']
|
||||
)
|
||||
|
||||
self.cleanup.append(virtual_machine_1)
|
||||
virtual_machine_2 = VirtualMachine.create(
|
||||
self.apiclient,
|
||||
self.testdata["virtual_machine"],
|
||||
accountid=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
serviceofferingid=self.service_offering.id,
|
||||
securitygroupids=[defaultSecurityGroup.id]
|
||||
securitygroupids=[defaultSecurityGroup.id],
|
||||
mode=self.testdata['mode']
|
||||
)
|
||||
|
||||
self.cleanup.append(virtual_machine_2)
|
||||
try:
|
||||
SshClient(
|
||||
virtual_machine_1.ssh_ip,
|
||||
|
|
@ -1823,9 +1862,10 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
|
|||
virtual_machine_1.password
|
||||
)
|
||||
except Exception as e:
|
||||
self.revokeSGRule(ingress_rule.ingressrule[0].ruleid)
|
||||
self.fail("SSH Access failed for %s: %s" %
|
||||
(virtual_machine_1.ipaddress, e)
|
||||
)
|
||||
)
|
||||
|
||||
try:
|
||||
SshClient(
|
||||
|
|
@ -1835,42 +1875,66 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
|
|||
virtual_machine_2.password
|
||||
)
|
||||
except Exception as e:
|
||||
self.revokeSGRule(ingress_rule.ingressrule[0].ruleid)
|
||||
self.fail("SSH Access failed for %s: %s" %
|
||||
(virtual_machine_2.ipaddress, e)
|
||||
)
|
||||
)
|
||||
try:
|
||||
sshClient = SshClient(
|
||||
self.mgtSvrDetails["mgtSvrIp"],
|
||||
22,
|
||||
self.mgtSvrDetails["user"],
|
||||
self.mgtSvrDetails["passwd"]
|
||||
)
|
||||
except Exception as e:
|
||||
self.revokeSGRule(ingress_rule.ingressrule[0].ruleid)
|
||||
self.fail("SSH Access failed for %s: %s" %
|
||||
(self.mgtSvrDetails["mgtSvrIp"], e)
|
||||
)
|
||||
response = sshClient.execute("ssh %s@%s -v" %
|
||||
(virtual_machine_1.username,
|
||||
virtual_machine_1.ssh_ip))
|
||||
self.debug("Response is :%s" % response)
|
||||
|
||||
sshClient = SshClient(
|
||||
self.mgtSvrDetails["mgtSvrIp"],
|
||||
22,
|
||||
self.mgtSvrDetails["user"],
|
||||
self.mgtSvrDetails["passwd"]
|
||||
self.assertTrue("connection established" in str(response).lower(),
|
||||
"SSH to VM at %s failed from external machine ip %s other than test machine" %
|
||||
(virtual_machine_1.ssh_ip,
|
||||
self.mgtSvrDetails["mgtSvrIp"]))
|
||||
|
||||
response = sshClient.execute("ssh %s@%s -v" %
|
||||
(virtual_machine_2.username,
|
||||
virtual_machine_2.ssh_ip))
|
||||
self.debug("Response is :%s" % response)
|
||||
|
||||
self.assertTrue("connection established" in str(response).lower(),
|
||||
"SSH to VM at %s failed from external machine ip %s other than test machine" %
|
||||
(virtual_machine_2.ssh_ip,
|
||||
self.mgtSvrDetails["mgtSvrIp"]))
|
||||
virtual_machine_3 = VirtualMachine.create(
|
||||
self.apiclient,
|
||||
self.testdata["virtual_machine"],
|
||||
accountid=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
serviceofferingid=self.service_offering.id,
|
||||
securitygroupids=[defaultSecurityGroup.id],
|
||||
mode=self.testdata['mode']
|
||||
)
|
||||
|
||||
response = sshClient.execute("ssh %s@%s -v" %
|
||||
(virtual_machine_1.username,
|
||||
virtual_machine_1.ssh_ip))
|
||||
self.debug("Response is :%s" % response)
|
||||
|
||||
self.assertTrue("connection established" in str(response).lower(),
|
||||
"SSH to VM at %s failed from external machine ip %s other than test machine" %
|
||||
(virtual_machine_1.ssh_ip,
|
||||
self.mgtSvrDetails["mgtSvrIp"]))
|
||||
|
||||
response = sshClient.execute("ssh %s@%s -v" %
|
||||
(virtual_machine_2.username,
|
||||
virtual_machine_2.ssh_ip))
|
||||
self.debug("Response is :%s" % response)
|
||||
|
||||
self.assertTrue("connection established" in str(response).lower(),
|
||||
"SSH to VM at %s failed from external machine ip %s other than test machine" %
|
||||
(virtual_machine_2.ssh_ip,
|
||||
self.mgtSvrDetails["mgtSvrIp"]))
|
||||
|
||||
|
||||
cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
|
||||
cmd.id = ingress_rule.ingressrule[0].ruleid
|
||||
self.apiclient.revokeSecurityGroupIngress(cmd)
|
||||
|
||||
self.cleanup.append(virtual_machine_3)
|
||||
security_groups = SecurityGroup.list(
|
||||
self.apiclient,
|
||||
account=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
listall=True
|
||||
)
|
||||
self.assertEqual(
|
||||
validateList(security_groups)[0],
|
||||
PASS,
|
||||
"Security groups list validation failed"
|
||||
)
|
||||
for sg in security_groups:
|
||||
if not sg.ingressrule:
|
||||
continue
|
||||
self.revokeSGRule(sg.ingressrule[0].ruleid)
|
||||
localMachineIpAddress = self.getLocalMachineIpAddress()
|
||||
cidr = localMachineIpAddress + "/32"
|
||||
|
||||
|
|
@ -1882,16 +1946,6 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
|
|||
cmd.endport = 22
|
||||
cmd.cidrlist = cidr
|
||||
ingress_rule = self.apiclient.authorizeSecurityGroupIngress(cmd)
|
||||
|
||||
virtual_machine_3 = VirtualMachine.create(
|
||||
self.apiclient,
|
||||
self.testdata["virtual_machine"],
|
||||
accountid=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
serviceofferingid=self.service_offering.id,
|
||||
securitygroupids=[defaultSecurityGroup.id]
|
||||
)
|
||||
|
||||
if self.testdata["configurableData"]["setHostConfigurationForIngressRule"]:
|
||||
self.setHostConfiguration()
|
||||
time.sleep(180)
|
||||
|
|
@ -1905,28 +1959,28 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
|
|||
virtual_machine_3.ssh_port,
|
||||
virtual_machine_3.username,
|
||||
virtual_machine_3.password
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
self.fail("SSH Access failed for %s: %s" %
|
||||
(virtual_machine_3.ssh_ip, e)
|
||||
)
|
||||
)
|
||||
|
||||
sshClient = SshClient(
|
||||
self.mgtSvrDetails["mgtSvrIp"],
|
||||
22,
|
||||
self.mgtSvrDetails["user"],
|
||||
self.mgtSvrDetails["passwd"]
|
||||
self.mgtSvrDetails["mgtSvrIp"],
|
||||
22,
|
||||
self.mgtSvrDetails["user"],
|
||||
self.mgtSvrDetails["passwd"]
|
||||
)
|
||||
|
||||
response = sshClient.execute("ssh %s@%s -v" %
|
||||
(virtual_machine_3.username,
|
||||
virtual_machine_3.ssh_ip))
|
||||
(virtual_machine_3.username,
|
||||
virtual_machine_3.ssh_ip))
|
||||
self.debug("Response is :%s" % response)
|
||||
|
||||
self.assertFalse("connection established" in str(response).lower(),
|
||||
"SSH to VM at %s succeeded from external machine ip %s other than test machine" %
|
||||
(virtual_machine_3.ssh_ip,
|
||||
self.mgtSvrDetails["mgtSvrIp"]))
|
||||
"SSH to VM at %s succeeded from external machine ip %s other than test machine" %
|
||||
(virtual_machine_3.ssh_ip,
|
||||
self.mgtSvrDetails["mgtSvrIp"]))
|
||||
return
|
||||
|
||||
@attr(tags=["sg", "eip", "advancedsg"])
|
||||
|
|
@ -1978,18 +2032,20 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
|
|||
accountid=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
serviceofferingid=self.service_offering.id,
|
||||
securitygroupids=[defaultSecurityGroup.id]
|
||||
securitygroupids=[defaultSecurityGroup.id],
|
||||
mode=self.testdata['mode']
|
||||
)
|
||||
|
||||
self.cleanup.append(virtual_machine_1)
|
||||
virtual_machine_2 = VirtualMachine.create(
|
||||
self.apiclient,
|
||||
self.testdata["virtual_machine"],
|
||||
accountid=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
serviceofferingid=self.service_offering.id,
|
||||
securitygroupids=[defaultSecurityGroup.id]
|
||||
securitygroupids=[defaultSecurityGroup.id],
|
||||
mode=self.testdata['mode']
|
||||
)
|
||||
|
||||
self.cleanup.append(virtual_machine_2)
|
||||
try:
|
||||
SshClient(
|
||||
virtual_machine_1.ssh_ip,
|
||||
|
|
@ -2066,9 +2122,10 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
|
|||
accountid=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
serviceofferingid=self.service_offering.id,
|
||||
securitygroupids=[security_group.id]
|
||||
securitygroupids=[security_group.id],
|
||||
mode=self.testdata['mode']
|
||||
)
|
||||
|
||||
self.cleanup.append(virtual_machine_3)
|
||||
if self.testdata["configurableData"]["setHostConfigurationForIngressRule"]:
|
||||
self.setHostConfiguration()
|
||||
time.sleep(180)
|
||||
|
|
@ -2087,7 +2144,21 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
|
|||
self.fail("SSH Access failed for %s: %s" %
|
||||
(virtual_machine_3.ssh_ip, e)
|
||||
)
|
||||
|
||||
security_groups = SecurityGroup.list(
|
||||
self.apiclient,
|
||||
account=self.account.name,
|
||||
domainid=self.account.domainid,
|
||||
listall=True
|
||||
)
|
||||
self.assertEqual(
|
||||
validateList(security_groups)[0],
|
||||
PASS,
|
||||
"Security groups list validation failed"
|
||||
)
|
||||
for sg in security_groups:
|
||||
if sg.id == security_group.id or not sg.ingressrule:
|
||||
continue
|
||||
self.revokeSGRule(sg.ingressrule[0].ruleid)
|
||||
sshClient = SshClient(
|
||||
self.mgtSvrDetails["mgtSvrIp"],
|
||||
22,
|
||||
|
|
|
|||
Loading…
Reference in New Issue