CLOUDSTACK-8299: Adding additional test case to test ingress rules with two security groups together

This closes #102

Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
This commit is contained in:
Gaurav Aradhye 2015-03-09 14:21:44 +05:30 committed by Rohit Yadav
parent 8ed833a13b
commit 1fd401ff43
1 changed files with 178 additions and 1 deletions

View File

@ -1760,7 +1760,7 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
# 5. Revoke the ingress rule and add ingress rule for specific IP
set (including test machine)
# 6. Add new Vm to default sec group
# 7. Verify that SSH works to VM from tst machine
# 7. Verify that SSH works to VM from test machine
# 8. Verify that SSH does not work to VM from different machine which
is outside specified IP set
"""
@ -1920,3 +1920,180 @@ class TestIngressRuleSpecificIpSet(cloudstackTestCase):
(virtual_machine_3.ssh_ip,
self.mgtSvrDetails["mgtSvrIp"]))
return
@attr(tags=["sg", "eip", "advancedsg"])
def test_ingress_rules_specific_IP_set_non_def_sec_group(self):
"""Test ingress rules for specific IP set and non default security group
# Validate the following:
# 1. Create an account and add ingress rule
(CIDR 0.0.0.0/0) in default security group
# 2. Deploy 2 VMs in the default sec group
# 3. Check if SSH works for the VMs from test machine, should work
# 4. Check if SSH works for the VM from different machine (
for instance, management server), should work
# 5. Add new security group to the account and add ingress rule for
specific IP set (including test machine)
# 6. Add new Vm to new sec group
# 7. Verify that SSH works to VM from test machine
# 8. Verify that SSH does not work to VM from different machine which
is outside specified IP set
"""
# Default Security group should not have any ingress rule
security_groups = SecurityGroup.list(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid,
listall=True
)
self.assertEqual(
validateList(security_groups)[0],
PASS,
"Security groups list validation failed"
)
defaultSecurityGroup = security_groups[0]
# Authorize Security group to SSH to VM
cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd()
cmd.securitygroupid = defaultSecurityGroup.id
cmd.protocol = 'TCP'
cmd.startport = 22
cmd.endport = 22
cmd.cidrlist = '0.0.0.0/0'
self.apiclient.authorizeSecurityGroupIngress(cmd)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.testdata["virtual_machine"],
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
securitygroupids=[defaultSecurityGroup.id]
)
virtual_machine_2 = VirtualMachine.create(
self.apiclient,
self.testdata["virtual_machine"],
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
securitygroupids=[defaultSecurityGroup.id]
)
try:
SshClient(
virtual_machine_1.ssh_ip,
virtual_machine_1.ssh_port,
virtual_machine_1.username,
virtual_machine_1.password
)
except Exception as e:
self.fail("SSH Access failed for %s: %s" %
(self.virtual_machine.ipaddress, e)
)
try:
SshClient(
virtual_machine_2.ssh_ip,
virtual_machine_2.ssh_port,
virtual_machine_2.username,
virtual_machine_2.password
)
except Exception as e:
self.fail("SSH Access failed for %s: %s" %
(self.virtual_machine.ipaddress, e)
)
sshClient = SshClient(
self.mgtSvrDetails["mgtSvrIp"],
22,
self.mgtSvrDetails["user"],
self.mgtSvrDetails["passwd"]
)
response = sshClient.execute("ssh %s@%s -v" %
(virtual_machine_1.username,
virtual_machine_1.ssh_ip))
self.debug("Response is :%s" % response)
self.assertTrue("connection established" in str(response).lower(),
"SSH to VM at %s failed from external machine ip %s other than test machine" %
(virtual_machine_1.ssh_ip,
self.mgtSvrDetails["mgtSvrIp"]))
response = sshClient.execute("ssh %s@%s -v" %
(virtual_machine_2.username,
virtual_machine_2.ssh_ip))
self.debug("Response is :%s" % response)
self.assertTrue("connection established" in str(response).lower(),
"SSH to VM at %s failed from external machine ip %s other than test machine" %
(virtual_machine_2.ssh_ip,
self.mgtSvrDetails["mgtSvrIp"]))
localMachineIpAddress = self.getLocalMachineIpAddress()
cidr = localMachineIpAddress + "/32"
security_group = SecurityGroup.create(
self.apiclient,
self.testdata["security_group"],
account=self.account.name,
domainid=self.account.domainid
)
# Authorize Security group to SSH to VM
cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd()
cmd.securitygroupid = security_group.id
cmd.protocol = 'TCP'
cmd.startport = 22
cmd.endport = 22
cmd.cidrlist = cidr
self.apiclient.authorizeSecurityGroupIngress(cmd)
virtual_machine_3 = VirtualMachine.create(
self.apiclient,
self.testdata["virtual_machine"],
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
securitygroupids=[security_group.id]
)
if self.testdata["configurableData"]["setHostConfigurationForIngressRule"]:
self.setHostConfiguration()
time.sleep(180)
virtual_machine_3.stop(self.apiclient)
virtual_machine_3.start(self.apiclient)
try:
sshClient = SshClient(
virtual_machine_3.ssh_ip,
virtual_machine_3.ssh_port,
virtual_machine_3.username,
virtual_machine_3.password
)
except Exception as e:
self.fail("SSH Access failed for %s: %s" %
(virtual_machine_3.ssh_ip, e)
)
sshClient = SshClient(
self.mgtSvrDetails["mgtSvrIp"],
22,
self.mgtSvrDetails["user"],
self.mgtSvrDetails["passwd"]
)
response = sshClient.execute("ssh %s@%s -v" %
(virtual_machine_3.username,
virtual_machine_3.ssh_ip))
self.debug("Response is :%s" % response)
self.assertFalse("connection established" in str(response).lower(),
"SSH to VM at %s succeeded from external machine ip %s other than test machine" %
(virtual_machine_3.ssh_ip,
self.mgtSvrDetails["mgtSvrIp"]))
return