From cbc9bce6ae764747a09a1dd8c28991d834195589 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Thu, 15 Nov 2012 15:39:29 +0530 Subject: [PATCH] -adds the plugin that implements EventBus with AMQP broker as MOM provider -modify the components.xml to load the RabbitMQ plugin as EventBus --- client/tomcatconf/components.xml.in | 8 ++ framework/events/pom.xml | 14 +++- .../framework/events/EventCategory.java | 25 +++++- plugins/message-brokers/rabbitmq/pom.xml | 46 +++++++++++ .../mom/rabbitmq/RabbitMQEventBus.java | 76 +++++++++++++++++++ plugins/pom.xml | 1 + server/pom.xml | 5 ++ .../com/cloud/event/ActionEventCallback.java | 31 ++++---- .../src/com/cloud/event/AlertGenerator.java | 13 +++- .../com/cloud/event/UsageEventGenerator.java | 21 +++-- .../com/cloud/network/dao/NetworkDaoImpl.java | 47 ++++-------- .../com/cloud/vpc/dao/MockNetworkDaoImpl.java | 17 +++-- 12 files changed, 238 insertions(+), 66 deletions(-) create mode 100644 plugins/message-brokers/rabbitmq/pom.xml create mode 100644 plugins/message-brokers/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java diff --git a/client/tomcatconf/components.xml.in b/client/tomcatconf/components.xml.in index 5957b61c0fb..10bcd1ecd72 100755 --- a/client/tomcatconf/components.xml.in +++ b/client/tomcatconf/components.xml.in @@ -172,6 +172,14 @@ under the License. + + + localhost + 55672 + guest + guest + + diff --git a/framework/events/pom.xml b/framework/events/pom.xml index c536c172b45..8da4e4caa46 100644 --- a/framework/events/pom.xml +++ b/framework/events/pom.xml @@ -24,7 +24,19 @@ org.apache.cloudstack cloudstack - 4.0.0-SNAPSHOT + 4.1.0-SNAPSHOT ../../pom.xml + + + org.apache.cloudstack + cloud-utils + ${project.version} + + + + install + src + test + diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventCategory.java b/framework/events/src/org/apache/cloudstack/framework/events/EventCategory.java index f9d5e678aaf..11be29b9f5a 100644 --- a/framework/events/src/org/apache/cloudstack/framework/events/EventCategory.java +++ b/framework/events/src/org/apache/cloudstack/framework/events/EventCategory.java @@ -19,8 +19,27 @@ package org.apache.cloudstack.framework.events; +import java.util.ArrayList; +import java.util.List; + public class EventCategory { - public static final String ACTION_EVENT = "Action Event"; - public static final String USAGE_EVENT = "Usage Event"; - public static final String ALERT_EVENT = "Alert Event"; + private static List eventCategories = new ArrayList(); + private String eventCategoryName; + + public EventCategory(String categoryName) { + this.eventCategoryName = categoryName; + } + + public String getName() { + return eventCategoryName; + } + + public static List listAllEventCategory() { + return eventCategories; + } + + public static final EventCategory ACTION_EVENT = new EventCategory("Action Events"); + public static final EventCategory ALERT_EVENT = new EventCategory("Alert Event"); + public static final EventCategory USAGE_EVENT = new EventCategory("Usage Event"); + } diff --git a/plugins/message-brokers/rabbitmq/pom.xml b/plugins/message-brokers/rabbitmq/pom.xml new file mode 100644 index 00000000000..0956ee8ce9b --- /dev/null +++ b/plugins/message-brokers/rabbitmq/pom.xml @@ -0,0 +1,46 @@ + + + 4.0.0 + cloud-mom-rabbitmq + Apache CloudStack RabbitMQ MOM + + org.apache.cloudstack + cloudstack + 4.1.0-SNAPSHOT + ../../pom.xml + + + + com.rabbitmq + amqp-client + 2.8.7 + + + org.apache.cloudstack + cloud-framework-events + ${project.version} + + + + install + src + + diff --git a/plugins/message-brokers/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/message-brokers/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java new file mode 100644 index 00000000000..b9607e559f9 --- /dev/null +++ b/plugins/message-brokers/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java @@ -0,0 +1,76 @@ +package org.apache.cloudstack.mom.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventCategory; +import org.apache.cloudstack.framework.events.EventSubscriber; +import org.apache.log4j.Logger; + +import javax.naming.ConfigurationException; +import java.util.Map; + +public class RabbitMQEventBus implements EventBus { + + + public static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class); + + @Override + public boolean publish(String category, String type, Map description) { + return false; + } + + @Override + public boolean subscribe(String category, String type, EventSubscriber subscriber) { + return false; + } + + @Override + public boolean configure(String name, Map params) throws ConfigurationException { + try { + String rabbitMqHost = (String) params.get("server"); + Integer port = (Integer) params.get("port"); + String username = (String) params.get("username"); + String password = (String) params.get("password"); + + // obtain a connection to RabbitMQ server + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername(username); + factory.setPassword(password); + factory.setVirtualHost("/"); + factory.setHost(rabbitMqHost); + factory.setPort(port); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + // create the exchange for each event category + for (EventCategory category : EventCategory.listAllEventCategory()) { + try { + channel.exchangeDeclare(category.getName(), "topic", true); + } catch (java.io.IOException exception) { + s_logger.debug("Failed to create exchange on RabbitMQ server for the event category " + category.getName()); + } + } + + } catch (Exception e) { + return false; + } + return true; + } + + @Override + public String getName() { + return null; + } + + @Override + public boolean start() { + return false; + } + + @Override + public boolean stop() { + return false; + } +} \ No newline at end of file diff --git a/plugins/pom.xml b/plugins/pom.xml index 2009302423e..220b065d9a7 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -38,6 +38,7 @@ hypervisors/ovm hypervisors/xen hypervisors/kvm + message-brokers/rabbitmq network-elements/elastic-loadbalancer network-elements/ovs network-elements/nicira-nvp diff --git a/server/pom.xml b/server/pom.xml index 06cfd7c45f3..18b6bc2573a 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -75,6 +75,11 @@ tests test + + org.apache.cloudstack + cloud-framework-events + ${project.version} + diff --git a/server/src/com/cloud/event/ActionEventCallback.java b/server/src/com/cloud/event/ActionEventCallback.java index 8ec53a4ff93..95fd2fdc935 100644 --- a/server/src/com/cloud/event/ActionEventCallback.java +++ b/server/src/com/cloud/event/ActionEventCallback.java @@ -16,16 +16,21 @@ // under the License. package com.cloud.event; -import java.lang.reflect.AnnotatedElement; -import java.lang.reflect.Method; - +import com.cloud.user.UserContext; +import com.cloud.utils.component.Adapters; +import com.cloud.utils.component.AnnotationInterceptor; +import com.cloud.utils.component.ComponentLocator; import net.sf.cglib.proxy.Callback; import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodProxy; - -import com.cloud.user.UserContext; -import com.cloud.utils.component.AnnotationInterceptor; import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventCategory; + +import java.lang.reflect.AnnotatedElement; +import java.lang.reflect.Method; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; public class ActionEventCallback implements MethodInterceptor, AnnotationInterceptor { @@ -81,7 +86,7 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce eventDescription += ". "+ctx.getEventDetails(); } EventUtils.saveStartedActionEvent(userId, accountId, actionEvent.eventType(), eventDescription, startEventId); - publishOnEventBus(userId, accountId, actionEvent.eventType(), "Started", eventDescription); + publishOnEventBus(userId, accountId, actionEvent.eventType(), Event.State.Started, eventDescription); } } return event; @@ -103,11 +108,11 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce if(actionEvent.create()){ //This start event has to be used for subsequent events of this action startEventId = EventUtils.saveCreatedActionEvent(userId, accountId, EventVO.LEVEL_INFO, actionEvent.eventType(), "Successfully created entity for "+eventDescription); - publishOnEventBus(userId, accountId, actionEvent.eventType(), "Successfully created entity for "+eventDescription); + publishOnEventBus(userId, accountId, actionEvent.eventType(), Event.State.Created, "Successfully created entity for " + eventDescription); ctx.setStartEventId(startEventId); } else { EventUtils.saveActionEvent(userId, accountId, EventVO.LEVEL_INFO, actionEvent.eventType(), "Successfully completed "+eventDescription, startEventId); - publishOnEventBus(userId, accountId, actionEvent.eventType(), "Successfully completed "+eventDescription, startEventId); + publishOnEventBus(userId, accountId, actionEvent.eventType(), Event.State.Completed, "Successfully completed " + eventDescription + startEventId); } } } @@ -126,10 +131,10 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce eventDescription += ". "+ctx.getEventDetails(); } if(actionEvent.create()){ - long eventId = EventUtils.saveCreatedActionEvent(userId, accountId, EventVO.LEVEL_ERROR, actionEvent.eventType(), "Error while creating entity for "+eventDescription); + long eventId = EventUtils.saveCreatedActionEvent(userId, accountId, EventVO.LEVEL_ERROR, actionEvent.eventType(), "Error while creating entity for " + eventDescription); ctx.setStartEventId(eventId); } else { - EventUtils.saveActionEvent(userId, accountId, EventVO.LEVEL_ERROR, actionEvent.eventType(), "Error while "+eventDescription, startEventId); + EventUtils.saveActionEvent(userId, accountId, EventVO.LEVEL_ERROR, actionEvent.eventType(), "Error while " + eventDescription, startEventId); } } } @@ -139,12 +144,12 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce return this; } - void publishOnEventBus(long userId, long accountId, String type, String state, String description) { + void publishOnEventBus(long userId, long accountId, String type, Event.State state, String description) { if (getEventBus() != null) { Map eventDescription = new HashMap(); eventDescription.put("user", String.valueOf(userId)); eventDescription.put("account", String.valueOf(accountId)); - eventDescription.put("state", state); + eventDescription.put("state", state.toString()); eventDescription.put("description", description); _eventBus.publish(EventCategory.ACTION_EVENT, type, eventDescription); } diff --git a/server/src/com/cloud/event/AlertGenerator.java b/server/src/com/cloud/event/AlertGenerator.java index 08cf081806b..1d3eeea7633 100644 --- a/server/src/com/cloud/event/AlertGenerator.java +++ b/server/src/com/cloud/event/AlertGenerator.java @@ -1,6 +1,13 @@ package com.cloud.event; -import import org.apache.cloudstack.framework.events.EventBus; +import com.cloud.utils.component.Adapters; +import com.cloud.utils.component.ComponentLocator; +import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventCategory; + +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; public class AlertGenerator { @@ -14,8 +21,8 @@ public class AlertGenerator { if (getEventBus() != null) { Map eventDescription = new HashMap(); eventDescription.put("alertType", alertType); - eventDescription.put("dataCenterId", dataCenterId); - eventDescription.put("podId", podId); + eventDescription.put("dataCenterId", Long.toString(dataCenterId)); + eventDescription.put("podId", Long.toString(podId)); eventDescription.put("subject", subject); eventDescription.put("body", body); _eventBus.publish(EventCategory.ALERT_EVENT, alertType, eventDescription); diff --git a/server/src/com/cloud/event/UsageEventGenerator.java b/server/src/com/cloud/event/UsageEventGenerator.java index 40bfbbe3f82..1aab9b24602 100644 --- a/server/src/com/cloud/event/UsageEventGenerator.java +++ b/server/src/com/cloud/event/UsageEventGenerator.java @@ -1,6 +1,13 @@ package com.cloud.event; -import import org.apache.cloudstack.framework.events.EventBus; +import com.cloud.utils.component.Adapters; +import com.cloud.utils.component.ComponentLocator; +import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventCategory; + +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; public class UsageEventGenerator { @@ -29,21 +36,21 @@ public class UsageEventGenerator { public static void publishUsageEvent(String usageType, long accountId,long zoneId, long vmId, long securityGroupId) { EventUtils.saveUsageEvent(usageType, accountId, zoneId, vmId, securityGroupId); - publishOnEventBus((usageType, accountId, zoneId, vmId, null, null); + publishOnEventBus(usageType, accountId, zoneId, vmId, null, null); } - void publishOnEventBus(String usageType, Long accountId, Long zoneId, Long resourceId, String resourceName, String resourceType) { + private static void publishOnEventBus(String usageType, Long accountId, Long zoneId, Long resourceId, String resourceName, String resourceType) { if (getEventBus() != null) { Map eventDescription = new HashMap(); eventDescription.put("usage type", usageType); if (accountId != null) { - eventDescription.put("accountId", usageType) + eventDescription.put("accountId", usageType); } if (zoneId != null) { - eventDescription.put("zoneId", String.valueOf(zoneId)) + eventDescription.put("zoneId", String.valueOf(zoneId)); } if (resourceId != null) { - eventDescription.put("resourceId", String.valueOf(resourceId)) + eventDescription.put("resourceId", String.valueOf(resourceId)); } eventDescription.put("resourceName", resourceName); eventDescription.put("resourceType", resourceType); @@ -51,7 +58,7 @@ public class UsageEventGenerator { } } - private EventBus getEventBus() { + private static EventBus getEventBus() { //TODO: check if there is way of getting single adapter if (_eventBus == null) { if (!_eventBusLoaded) { diff --git a/server/src/com/cloud/network/dao/NetworkDaoImpl.java b/server/src/com/cloud/network/dao/NetworkDaoImpl.java index b3d3f71151a..4f316816470 100644 --- a/server/src/com/cloud/network/dao/NetworkDaoImpl.java +++ b/server/src/com/cloud/network/dao/NetworkDaoImpl.java @@ -16,25 +16,9 @@ // under the License. package com.cloud.network.dao; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import javax.ejb.Local; -import javax.persistence.TableGenerator; - import com.cloud.acl.ControlledEntity.ACLType; -import com.cloud.network.Network; -import com.cloud.network.Network.Event; -import com.cloud.network.Network.GuestType; -import com.cloud.network.Network.Provider; -import com.cloud.network.Network.Service; -import com.cloud.network.Network.State; -import com.cloud.network.NetworkAccountDaoImpl; -import com.cloud.network.NetworkAccountVO; -import com.cloud.network.NetworkDomainVO; -import com.cloud.network.NetworkServiceMapVO; -import com.cloud.network.NetworkVO; +import com.cloud.network.*; +import com.cloud.network.Network.*; import com.cloud.network.Networks.BroadcastDomainType; import com.cloud.network.Networks.Mode; import com.cloud.network.Networks.TrafficType; @@ -44,19 +28,18 @@ import com.cloud.offerings.dao.NetworkOfferingDaoImpl; import com.cloud.server.ResourceTag.TaggedResourceType; import com.cloud.tags.dao.ResourceTagsDaoImpl; import com.cloud.utils.component.ComponentLocator; -import com.cloud.utils.db.DB; -import com.cloud.utils.db.GenericDaoBase; -import com.cloud.utils.db.GenericSearchBuilder; -import com.cloud.utils.db.JoinBuilder; +import com.cloud.utils.db.*; import com.cloud.utils.db.JoinBuilder.JoinType; -import com.cloud.utils.db.SearchBuilder; -import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.SearchCriteria.Func; import com.cloud.utils.db.SearchCriteria.Op; -import com.cloud.utils.db.SequenceFetcher; -import com.cloud.utils.db.Transaction; import com.cloud.utils.net.NetUtils; +import javax.ejb.Local; +import javax.persistence.TableGenerator; +import java.util.List; +import java.util.Map; +import java.util.Random; + @Local(value = NetworkDao.class) @DB(txn = false) public class NetworkDaoImpl extends GenericDaoBase implements NetworkDao { @@ -558,13 +541,13 @@ public class NetworkDaoImpl extends GenericDaoBase implements N public boolean updateState(State currentState, Event event, State nextState, Network vo, Object data) { // TODO: ensure this update is correct Transaction txn = Transaction.currentTxn(); - txn.start(); + txn.start(); - NetworkVO networkVo = (NetworkVO) vo; - networkVo.setState(nextState); - super.update(networkVo.getId(), networkVo); + NetworkVO networkVo = (NetworkVO) vo; + networkVo.setState(nextState); + super.update(networkVo.getId(), networkVo); - txn.commit(); - return true; + txn.commit(); + return true; } } diff --git a/server/test/com/cloud/vpc/dao/MockNetworkDaoImpl.java b/server/test/com/cloud/vpc/dao/MockNetworkDaoImpl.java index 2a675b3a217..2a2c28e07bd 100644 --- a/server/test/com/cloud/vpc/dao/MockNetworkDaoImpl.java +++ b/server/test/com/cloud/vpc/dao/MockNetworkDaoImpl.java @@ -16,12 +16,7 @@ // under the License. package com.cloud.vpc.dao; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import javax.ejb.Local; - +import com.cloud.network.Network; import com.cloud.network.Network.GuestType; import com.cloud.network.NetworkAccountVO; import com.cloud.network.NetworkVO; @@ -31,6 +26,11 @@ import com.cloud.utils.db.DB; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; +import javax.ejb.Local; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + @Local(value = NetworkDao.class) @DB(txn = false) public class MockNetworkDaoImpl extends GenericDaoBase implements NetworkDao{ @@ -341,5 +341,8 @@ public class MockNetworkDaoImpl extends GenericDaoBase implemen // TODO Auto-generated method stub return 0; } - + @Override + public boolean updateState(Network.State currentState, Network.Event event, Network.State nextState, Network vo, Object data) { + return true; + } }