diff --git a/test/integration/component/test_blocker_bugs.py b/test/integration/component/test_blocker_bugs.py index 7ff6315954c..0e2b2c0db11 100644 --- a/test/integration/component/test_blocker_bugs.py +++ b/test/integration/component/test_blocker_bugs.py @@ -81,7 +81,8 @@ class Services: "isextractable": True, "passwordenabled": True, }, - "static_nat": { + "firewall_rule": { + "cidrlist" : "0.0.0.0/0", "startport": 22, "endport": 22, "protocol": "TCP" @@ -320,111 +321,117 @@ class TestNATRules(cloudstackTestCase): ) self.debug("Enabled static NAT for public IP ID: %s" % public_ip.id) - #Create Static NAT rule + + #Create Static NAT rule, in fact it's firewall rule nat_rule = StaticNATRule.create( self.apiclient, - self.services["static_nat"], + self.services["firewall_rule"], public_ip.id ) self.debug("Created Static NAT rule for public IP ID: %s" % public_ip.id) - list_rules_repsonse = StaticNATRule.list( - self.apiclient, - id=nat_rule.id - ) + self.debug("Checking IP address") + ip_response = PublicIPAddress.list( + self.apiclient, + id = public_ip.id + ) self.assertEqual( - isinstance(list_rules_repsonse, list), + isinstance(ip_response, list), True, - "Check list response returns a valid list" + "Check ip response returns a valid list" ) self.assertNotEqual( - len(list_rules_repsonse), + len(ip_response), 0, - "Check IP Forwarding Rule is created" + "Check static NAT Rule is created" ) - self.assertEqual( - list_rules_repsonse[0].id, - nat_rule.id, - "Check Correct IP forwarding Rule is returned" - ) - # Verify the entries made in firewall_rules tables - self.debug( - "select id from user_ip_address where uuid = '%s';" \ - % public_ip.id - ) - qresultset = self.dbclient.execute( - "select id from user_ip_address where uuid = '%s';" \ - % public_ip.id + self.assertTrue( + ip_response[0].isstaticnat, + "IP is not static nat enabled" ) self.assertEqual( - isinstance(qresultset, list), + ip_response[0].virtualmachineid, + self.virtual_machine.id, + "IP is not binding with the VM" + ) + + self.debug("Checking Firewall rule") + firewall_response = FireWallRule.list( + self.apiclient, + ipaddressid = public_ip.id, + listall = True + ) + self.assertEqual( + isinstance(firewall_response, list), True, - "Check database query returns a valid data" + "Check firewall response returns a valid list" ) - self.assertNotEqual( - len(qresultset), + len(firewall_response), 0, - "Check DB Query result set" + "Check firewall rule is created" ) - qresult = qresultset[0] - public_ip_id = qresult[0] - # Verify the entries made in firewall_rules tables - self.debug( - "select id, state from firewall_rules where ip_address_id = '%s';" \ - % public_ip_id - ) - qresultset = self.dbclient.execute( - "select id, state from firewall_rules where ip_address_id = '%s';" \ - % public_ip_id + self.assertEqual( + firewall_response[0].state, + "Active", + "Firewall rule is not active" ) self.assertEqual( - isinstance(qresultset, list), - True, - "Check database query returns a valid data for firewall rules" + firewall_response[0].ipaddressid, + public_ip.id, + "Firewall rule is not static nat related" + ) + self.assertEqual( + firewall_response[0].startport, + str(self.services["firewall_rule"]["startport"]), + "Firewall rule is not with specific port" ) - self.assertNotEqual( - len(qresultset), - 0, - "Check DB Query result set" - ) - - for qresult in qresultset: - self.assertEqual( - qresult[1], - 'Active', - "Check state of the static NAT rule in database" - ) - + self.debug("Removed the firewall rule") nat_rule.delete(self.apiclient) - list_rules_repsonse = StaticNATRule.list( - self.apiclient, - id=nat_rule.id - ) - + self.debug("Checking IP address, it should still existed") + ip_response = PublicIPAddress.list( + self.apiclient, + id = public_ip.id + ) self.assertEqual( - list_rules_repsonse, - None, - "Check Port Forwarding Rule is deleted" + isinstance(ip_response, list), + True, + "Check ip response returns a valid list" + ) + self.assertNotEqual( + len(ip_response), + 0, + "Check static NAT Rule is created" ) - - # Verify the entries made in firewall_rules tables - self.debug( - "select id, state from firewall_rules where ip_address_id = '%s';" \ - % public_ip.id - ) - qresultset = self.dbclient.execute( - "select id, state from firewall_rules where ip_address_id = '%s';" \ - % public_ip.id + self.assertTrue( + ip_response[0].isstaticnat, + "IP is not static nat enabled" + ) + self.assertEqual( + ip_response[0].virtualmachineid, + self.virtual_machine.id, + "IP is not binding with the VM" ) + self.debug("Checking Firewall rule, it should be removed") + firewall_response = FireWallRule.list( + self.apiclient, + ipaddressid = public_ip.id, + listall = True + ) self.assertEqual( - len(qresultset), - 0, - "Check DB Query result set" - ) + isinstance(firewall_response, list), + True, + "Check firewall response returns a valid list" + ) + if len(firewall_response) != 0 : + self.assertEqual( + firewall_response[0].state, + "Deleting", + "Firewall rule should be deleted or in deleting state" + ) return