adds hypervisor script to convert JSON routing polcies (ACL) config in

to flow rules and applies them on the bridge

add event subscriber in OvsTunnelManager, that listens to
replaceNetworkAcl events. On event sends the updated policy info to all
the hosts in the VPC
This commit is contained in:
Murali Reddy 2014-03-11 04:07:13 +05:30
parent 2c7786992f
commit 423a748807
8 changed files with 336 additions and 57 deletions

View File

@ -16,7 +16,6 @@
// under the License.
package com.cloud.hypervisor.xen.resource;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@ -89,6 +88,7 @@ import org.apache.cloudstack.storage.to.TemplateObjectTO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import com.cloud.agent.IAgentControl;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.AttachIsoCommand;
import com.cloud.agent.api.AttachVolumeAnswer;
@ -510,6 +510,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
return execute((OvsDeleteFlowCommand)cmd);
} else if (clazz == OvsVpcPhysicalTopologyConfigCommand.class) {
return execute((OvsVpcPhysicalTopologyConfigCommand) cmd);
} else if (clazz == OvsVpcRoutingPolicyConfigCommand.class) {
return execute((OvsVpcRoutingPolicyConfigCommand) cmd);
} else if (clazz == CleanupNetworkRulesCmd.class) {
return execute((CleanupNetworkRulesCmd)cmd);
} else if (clazz == NetworkRulesSystemVmCommand.class) {
@ -533,7 +535,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
} else if (clazz == PlugNicCommand.class) {
return execute((PlugNicCommand)cmd);
} else if (clazz == UnPlugNicCommand.class) {
return execute((UnPlugNicCommand)cmd);
return execute((UnPlugNicCommand) cmd);
} else if (cmd instanceof StorageSubSystemCommand) {
return storageHandler.handleStorageCommands((StorageSubSystemCommand) cmd);
} else if (clazz == CreateVMSnapshotCommand.class) {
@ -5283,8 +5285,24 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
Connection conn = getConnection();
try {
String result = callHostPlugin(conn, "ovstunnel", "configure_ovs_bridge_for_network_topology", "bridge",
cmd.getBridgeName(), "config", cmd.getVpcConfigInJson());
if (result.equalsIgnoreCase("SUCCESS")) {
return new Answer(cmd, true, result);
} else {
return new Answer(cmd, false, result);
}
} catch (Exception e) {
s_logger.warn("caught exception while updating host with latest routing polcies", e);
return new Answer(cmd, false, e.getMessage());
}
}
public Answer execute(OvsVpcRoutingPolicyConfigCommand cmd) {
Connection conn = getConnection();
try {
String result = callHostPlugin(conn, "ovstunnel", "configure_ovs_bridge_for_routing_policies", "bridge",
cmd.getBridgeName(), "host-id", ((Long)cmd.getHostId()).toString(), "config",
cmd.getjsonVpcConfig());
cmd.getVpcConfigInJson());
if (result.equalsIgnoreCase("SUCCESS")) {
return new Answer(cmd, true, result);
} else {

View File

@ -1,33 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent.api;
/**
* This command represents view of how a VPC is laid out (on which hosts, which VM is on which host etc)
* on the physical infrastructure.
*/
public class OvsVpcLogicalTopologyConfigCommand extends Command {
public OvsVpcLogicalTopologyConfigCommand() {
}
@Override
public boolean executeInSequence() {
return false;
}
}

View File

@ -20,8 +20,8 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
/**
* This command represents physical view of how a VPC is laid out on the physical infrastructure.
* - on which hypervisor hosts VPC spans (host is running in at least one VM from the VPC)
* This command represents physical view of how a VPC is laid out on the physical infrastructure. Contains information:
* - on which hypervisor hosts VPC spans (host running at least one VM from the VPC)
* - information of tiers, so we can figure how one VM can talk to a different VM in same tier or different tier
* - information on all the VM's in the VPC.
* - information of NIC's of each VM in the VPC
@ -102,7 +102,7 @@ public class OvsVpcPhysicalTopologyConfigCommand extends Command {
vpcConfig = new VpcConfig(vpc);
}
public String getjsonVpcConfig() {
public String getVpcConfigInJson() {
Gson gson = new GsonBuilder().create();
return gson.toJson(vpcConfig).toLowerCase();
}

View File

@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent.api;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.UUID;
/**
* This command represents logical view of VM's connectivity in VPC.
*/
public class OvsVpcRoutingPolicyConfigCommand extends Command {
VpcConfig vpcConfig =null;
long hostId;
String bridgeName;
public static class AclItem {
int number;
String uuid;
String action;
String direction;
String sourcePortStart;
String sourcePortEnd;
String protocol;
String[] sourceCidrs;
public AclItem(int number, String uuid, String action, String direction, String sourcePortStart,
String sourcePortEnd, String protocol, String[] sourceCidrs) {
this.number = number;
this.uuid =uuid;
this.action = action;
this.direction = direction;
this.sourceCidrs = sourceCidrs;
this.sourcePortStart = sourcePortStart;
this.sourcePortEnd = sourcePortEnd;
this.protocol = protocol;
}
}
public static class Acl {
String id;
AclItem[] aclItems;
public Acl(String uuid, AclItem[] aclItems) {
this.id = uuid;
this.aclItems = aclItems;
}
}
public static class Tier {
String id;
String cidr;
String aclId;
public Tier(String uuid, String cidr, String aclId) {
this.id = uuid;
this.cidr = cidr;
this.aclId = aclId;
}
}
public class Vpc {
String cidr;
String id;
Acl[] acls;
Tier[] tiers;
public Vpc(String id, String cidr, Acl[] acls, Tier[] tiers) {
this.id = id;
this.cidr = cidr;
this.acls = acls;
this.tiers = tiers;
}
}
public static class VpcConfig {
Vpc vpc;
public VpcConfig(Vpc vpc) {
this.vpc = vpc;
}
}
public OvsVpcRoutingPolicyConfigCommand(String id, String cidr, Acl[] acls, Tier[] tiers) {
Vpc vpc = new Vpc(id, cidr, acls, tiers);
vpcConfig = new VpcConfig(vpc);
}
public String getVpcConfigInJson() {
Gson gson = new GsonBuilder().create();
return gson.toJson(vpcConfig).toLowerCase();
}
public void setHostId(long hostId) {
this.hostId = hostId;
}
public long getHostId() {
return hostId;
}
public String getBridgeName() {
return bridgeName;
}
public void setBridgeName(String bridgeName) {
this.bridgeName = bridgeName;
}
@Override
public boolean executeInSequence() {
return false;
}
}

View File

@ -16,8 +16,12 @@
// under the License.
package com.cloud.network.ovs;
import com.amazonaws.services.ec2.model.NetworkAcl;
import com.cloud.agent.api.*;
import com.cloud.network.dao.NetworkDao;
import com.cloud.network.vpc.VpcManager;
import com.cloud.network.dao.NetworkVO;
import com.cloud.network.vpc.*;
import com.cloud.network.vpc.dao.NetworkACLDao;
import com.cloud.vm.dao.VMInstanceDao;
import com.cloud.vm.Nic;
import com.cloud.vm.NicVO;
@ -33,22 +37,14 @@ import javax.inject.Inject;
import javax.naming.ConfigurationException;
import javax.persistence.EntityExistsException;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageSubscriber;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.OvsCreateTunnelAnswer;
import com.cloud.agent.api.OvsCreateTunnelCommand;
import com.cloud.agent.api.OvsDestroyBridgeCommand;
import com.cloud.agent.api.OvsDestroyTunnelCommand;
import com.cloud.agent.api.OvsFetchInterfaceAnswer;
import com.cloud.agent.api.OvsFetchInterfaceCommand;
import com.cloud.agent.api.OvsSetupBridgeCommand;
import com.cloud.agent.api.OvsVpcPhysicalTopologyConfigCommand;
import com.cloud.agent.manager.Commands;
import com.cloud.configuration.Config;
import com.cloud.exception.AgentUnavailableException;
@ -68,7 +64,6 @@ import com.cloud.network.ovs.dao.OvsTunnelNetworkDao;
import com.cloud.network.ovs.dao.OvsTunnelNetworkVO;
import com.cloud.network.ovs.dao.OvsTunnel;
import com.cloud.network.vpc.dao.VpcDao;
import com.cloud.network.vpc.VpcVO;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
@ -112,6 +107,12 @@ public class OvsTunnelManagerImpl extends ManagerBase implements OvsTunnelManage
protected VMInstanceDao _vmInstanceDao;
@Inject
NetworkDao _networkDao;
@Inject
MessageBus _messageBus;
@Inject
NetworkACLDao _networkACLDao;
@Inject
NetworkACLItemDao _networkACLItemDao;
@Override
public boolean configure(String name, Map<String, Object> params)
@ -119,6 +120,7 @@ public class OvsTunnelManagerImpl extends ManagerBase implements OvsTunnelManage
_executorPool = Executors.newScheduledThreadPool(10, new NamedThreadFactory("OVS"));
_cleanupExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("OVS-Cleanup"));
_messageBus.subscribe("Network_ACL_Replaced", new NetworkAclEventsSubscriber());
return true;
}
@ -725,4 +727,82 @@ public class OvsTunnelManagerImpl extends ManagerBase implements OvsTunnelManage
}
}
}
// Subscriber to ACL replace events. On acl replace event, if the vpc is enabled for distributed routing
// send the ACL update to all the hosts on which VPC spans
public class NetworkAclEventsSubscriber implements MessageSubscriber {
@Override
public void onPublishMessage(String senderAddress, String subject, Object args) {
NetworkVO network = (NetworkVO) args;
String bridgeName=generateBridgeNameForVpc(network.getVpcId());
if (network.getVpcId() != null & isVpcEnabledForDistributedRouter(network.getVpcId())) {
long vpcId = network.getVpcId();
OvsVpcRoutingPolicyConfigCommand cmd = prepareVpcRoutingPolicyUpdate(vpcId);
List<Long> vpcSpannedHostIds = _ovsNetworkToplogyGuru.getVpcSpannedHosts(vpcId);
for (Long id: vpcSpannedHostIds) {
if (!sendVpcRoutingPolicyChangeUpdate(cmd, id, bridgeName)) {
s_logger.debug("Failed to send VPC routing policy change update to host : " + id +
". Moving on with rest of the host updates.");
}
}
}
}
}
OvsVpcRoutingPolicyConfigCommand prepareVpcRoutingPolicyUpdate(long vpcId) {
VpcVO vpc = _vpcDao.findById(vpcId);
assert (vpc != null): "invalid vpc id";
List<OvsVpcRoutingPolicyConfigCommand.Acl> acls = new ArrayList<>();
List<OvsVpcRoutingPolicyConfigCommand.Tier> tiers = new ArrayList<>();
List<? extends Network> vpcNetworks = _vpcMgr.getVpcNetworks(vpcId);
for (Network network : vpcNetworks) {
Long networkAclId = network.getNetworkACLId();
NetworkACLVO networkAcl = _networkACLDao.findById(networkAclId);
List<OvsVpcRoutingPolicyConfigCommand.AclItem> aclItems = new ArrayList<>();
List<NetworkACLItemVO> aclItemVos = _networkACLItemDao.listByACL(networkAclId);
for (NetworkACLItemVO aclItem : aclItemVos) {
String[] sourceCidrs = aclItem.getSourceCidrList().toArray(new String[aclItem.getSourceCidrList().size()]);
aclItems.add(new OvsVpcRoutingPolicyConfigCommand.AclItem(
aclItem.getNumber(), aclItem.getUuid(), aclItem.getAction().name(),
aclItem.getTrafficType().name(),
aclItem.getSourcePortStart().toString(), aclItem.getSourcePortEnd().toString(),
aclItem.getProtocol(), sourceCidrs));
}
OvsVpcRoutingPolicyConfigCommand.Acl acl = new OvsVpcRoutingPolicyConfigCommand.Acl(networkAcl.getUuid(),
aclItems.toArray(new OvsVpcRoutingPolicyConfigCommand.AclItem[aclItems.size()]));
acls.add(acl);
OvsVpcRoutingPolicyConfigCommand.Tier tier = new OvsVpcRoutingPolicyConfigCommand.Tier(network.getUuid(),
network.getCidr(), networkAcl.getUuid());
tiers.add(tier);
}
OvsVpcRoutingPolicyConfigCommand cmd = new OvsVpcRoutingPolicyConfigCommand(vpc.getUuid(), vpc.getCidr(),
acls.toArray(new OvsVpcRoutingPolicyConfigCommand.Acl[acls.size()]),
tiers.toArray(new OvsVpcRoutingPolicyConfigCommand.Tier[tiers.size()]));
return cmd;
}
public boolean sendVpcRoutingPolicyChangeUpdate(OvsVpcRoutingPolicyConfigCommand updateCmd, long hostId, String bridgeName) {
try {
s_logger.debug("Sending VPC routing policy change update to the host " + hostId);
updateCmd.setHostId(hostId);
updateCmd.setBridgeName(bridgeName);
Answer ans = _agentMgr.send(hostId, updateCmd);
if (ans.getResult()) {
s_logger.debug("Successfully updated the host " + hostId + " with latest VPC routing policies." );
return true;
} else {
s_logger.debug("Failed to update the host " + hostId + " with latest routing policy." );
return false;
}
} catch (Exception e) {
s_logger.debug("Failed to updated the host " + hostId + " with latest routing policy." );
return false;
}
}
}

View File

@ -312,7 +312,7 @@ class jsonLoader(object):
return '{%s}' % str(', '.join('%s : %s' % (k, repr(v)) for (k, v)
in self.__dict__.iteritems()))
def configure_bridge_for_topology(bridge, this_host_id, json_config):
def configure_bridge_for_network_topology(bridge, this_host_id, json_config):
vpconfig = jsonLoader(json.loads(json_config)).vpc
if vpconfig is None:
@ -372,4 +372,76 @@ def configure_bridge_for_topology(bridge, this_host_id, json_config):
# set DST MAC = VM's MAC, SRC MAC=tier gateway MAC and send to egress table
add_ip_lookup_table_entry(bridge, ip, network.gatewaymac, mac_addr)
return "SUCCESS: successfully configured bridge as per the VPC toplogy"
return "SUCCESS: successfully configured bridge as per the VPC topology"
def get_acl(vpcconfig, required_acl_id):
acls = vpcconfig.acls
for acl in acls:
if acl.id == required_acl_id:
return acl
return None
def configure_ovs_bridge_for_routing_policies(bridge, json_config):
vpconfig = jsonLoader(json.loads(json_config)).vpc
if vpconfig is None:
logging.debug("WARNING:Can't find VPC info in json config file")
return "FAILURE:IMPROPER_JSON_CONFG_FILE"
# First flush current egress ACL's before re-applying the ACL's
del_flows(bridge, table=3)
egress_rules_added = False
ingress_rules_added = False
tiers = vpconfig.tiers
for tier in tiers:
tier_cidr = tier.cidr
acl = get_acl(vpconfig, tier.aclid)
acl_items = acl.aclitems
for acl_item in acl_items:
number = acl_item.number
action = acl_item.action
direction = acl_item.direction
source_port_start = acl_item.sourceportstart
source_port_end = acl_item.sourceportend
protocol = acl_item.protocol
source_cidrs = acl_item.sourcecidrs
acl_priority = 1000 + number
for source_cidr in source_cidrs:
if direction is "ingress":
ingress_rules_added = True
# add flow rule to do action (allow/deny) for flows where source IP of the packet is in
# source_cidr and destination ip is in tier_cidr
port = source_port_start
while (port < source_port_end):
if action is "deny":
add_flow(bridge, priority= acl_priority, table=5, nw_src=source_cidr, nw_dst=tier_cidr, tp_dst=port,
nw_proto=protocol, actions='drop')
if action is "allow":
add_flow(bridge, priority= acl_priority,table=5, nw_src=source_cidr, nw_dst=tier_cidr, tp_dst=port,
nw_proto=protocol, actions='resubmit(,1)')
port = port + 1
elif direction in "egress":
egress_rules_added = True
# add flow rule to do action (allow/deny) for flows where destination IP of the packet is in
# source_cidr and source ip is in tier_cidr
port = source_port_start
while (port < source_port_end):
if action is "deny":
add_flow(bridge, priority= acl_priority, table=5, nw_src=tier_cidr, nw_dst=source_cidr, tp_dst=port,
nw_proto=protocol, actions='drop')
if action is "allow":
add_flow(bridge, priority= acl_priority, table=5, nw_src=tier_cidr, nw_dst=source_cidr, tp_dst=port,
nw_proto=protocol, actions='resubmit(,1)')
port = port + 1
if egress_rules_added is False:
# add a default rule in egress table to forward packet to L3 lookup table
add_flow(bridge, priority=0, table=3, actions='resubmit(,4)')
if ingress_rules_added is False:
# add a default rule in egress table drop packets
add_flow(bridge, priority=0, table=5, actions='drop')

View File

@ -184,7 +184,7 @@ def setup_ovs_bridge_for_distributed_routing(session, args):
# add a default rule in L3 lookup table to forward packet to L2 lookup table
lib.add_flow(bridge, priority=0, table=4, actions='resubmit(,1)')
# add a default rule in egress table to forward packet to L3 lookup table
# add a default rule in ingress table to drop in bound packets
lib.add_flow(bridge, priority=0, table=5, actions='drop')
result = "SUCCESS: successfully setup bridge with flow rules"
@ -391,7 +391,14 @@ def configure_ovs_bridge_for_network_topology(session, args):
json_config = args.pop("config")
this_host_id = args.pop("host-id")
return lib.configure_bridge_for_topology(bridge, this_host_id, json_config)
return lib.configure_bridge_for_network_topology(bridge, this_host_id, json_config)
@echo
def configure_ovs_bridge_for_routing_policies(session, args):
bridge = args.pop("bridge")
json_config = args.pop("config")
return lib.configure_ovs_bridge_for_router_policies(bridge, json_config)
if __name__ == "__main__":
XenAPIPlugin.dispatch({"create_tunnel": create_tunnel,
@ -401,4 +408,5 @@ if __name__ == "__main__":
"is_xcp": is_xcp,
"getLabel": getLabel,
"setup_ovs_bridge_for_distributed_routing": setup_ovs_bridge_for_distributed_routing,
"configure_ovs_bridge_for_network_topology": configure_ovs_bridge_for_network_topology})
"configure_ovs_bridge_for_network_topology": configure_ovs_bridge_for_network_topology,
"configure_ovs_bridge_for_routing_policies": "configure_ovs_bridge_for_routing_policies"})

View File

@ -23,6 +23,8 @@ import javax.ejb.Local;
import javax.inject.Inject;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.log4j.Logger;
import com.cloud.configuration.ConfigurationManager;
@ -82,6 +84,8 @@ public class NetworkACLManagerImpl extends ManagerBase implements NetworkACLMana
EntityManager _entityMgr;
@Inject
VpcService _vpcSvc;
@Inject
MessageBus _messageBus;
@Override
public NetworkACL createNetworkACL(String name, String description, long vpcId, Boolean forDisplay) {
@ -210,7 +214,13 @@ public class NetworkACLManagerImpl extends ManagerBase implements NetworkACLMana
if (_networkDao.update(network.getId(), network)) {
s_logger.debug("Updated network: " + network.getId() + " with Network ACL Id: " + acl.getId() + ", Applying ACL items");
//Apply ACL to network
return applyACLToNetwork(network.getId());
Boolean result = applyACLToNetwork(network.getId());
if (result) {
// public message on message bus, so that network elements implementing distributed routing capability
// can act on the event
_messageBus.publish(_name, "Network_ACL_Replaced", PublishScope.LOCAL, network);
}
return result;
}
return false;
}