-adds the plugin that implements EventBus with AMQP broker as MOM provider

-modify the components.xml to load the RabbitMQ plugin as EventBus
This commit is contained in:
Murali Reddy 2012-11-15 15:39:29 +05:30
parent 9bb34be7a9
commit cbc9bce6ae
12 changed files with 238 additions and 66 deletions

View File

@ -172,6 +172,14 @@ under the License.
<adapters key="com.cloud.agent.StartupCommandProcessor">
<adapter name="BasicAgentAuthorizer" class="com.cloud.agent.manager.authn.impl.BasicAgentAuthManager"/>
</adapters>
<adapters key="org.apache.cloudstack.framework.events.EventBus">
<adapter name="RabbitMQ Message Broker" class="org.apache.cloudstack.mom.rabbitmq.RabbitMQEventBus">
<param name="server">localhost</param>
<param name="port">55672</param>
<param name="username">guest</param>
<param name="password">guest</param>
</adapter>
</adapters>
<manager name="OvsTunnelManager" key="com.cloud.network.ovs.OvsTunnelManager" class="com.cloud.network.ovs.OvsTunnelManagerImpl"/>
<manager name="ElasticLoadBalancerManager" key="com.cloud.network.lb.ElasticLoadBalancerManager" class="com.cloud.network.lb.ElasticLoadBalancerManagerImpl"/>
<pluggableservice name="VirtualRouterElementService" key="com.cloud.network.element.VirtualRouterElementService" class="com.cloud.network.element.VirtualRouterElement"/>

View File

@ -24,7 +24,19 @@
<parent>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloudstack</artifactId>
<version>4.0.0-SNAPSHOT</version>
<version>4.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-utils</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<sourceDirectory>src</sourceDirectory>
<testSourceDirectory>test</testSourceDirectory>
</build>
</project>

View File

@ -19,8 +19,27 @@
package org.apache.cloudstack.framework.events;
import java.util.ArrayList;
import java.util.List;
public class EventCategory {
public static final String ACTION_EVENT = "Action Event";
public static final String USAGE_EVENT = "Usage Event";
public static final String ALERT_EVENT = "Alert Event";
private static List<EventCategory> eventCategories = new ArrayList<EventCategory>();
private String eventCategoryName;
public EventCategory(String categoryName) {
this.eventCategoryName = categoryName;
}
public String getName() {
return eventCategoryName;
}
public static List<EventCategory> listAllEventCategory() {
return eventCategories;
}
public static final EventCategory ACTION_EVENT = new EventCategory("Action Events");
public static final EventCategory ALERT_EVENT = new EventCategory("Alert Event");
public static final EventCategory USAGE_EVENT = new EventCategory("Usage Event");
}

View File

@ -0,0 +1,46 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-mom-rabbitmq</artifactId>
<name>Apache CloudStack RabbitMQ MOM</name>
<parent>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloudstack</artifactId>
<version>4.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>2.8.7</version>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-framework-events</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
<sourceDirectory>src</sourceDirectory>
</build>
</project>

View File

@ -0,0 +1,76 @@
package org.apache.cloudstack.mom.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventCategory;
import org.apache.cloudstack.framework.events.EventSubscriber;
import org.apache.log4j.Logger;
import javax.naming.ConfigurationException;
import java.util.Map;
public class RabbitMQEventBus implements EventBus {
public static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
@Override
public boolean publish(String category, String type, Map<String, String> description) {
return false;
}
@Override
public boolean subscribe(String category, String type, EventSubscriber subscriber) {
return false;
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
try {
String rabbitMqHost = (String) params.get("server");
Integer port = (Integer) params.get("port");
String username = (String) params.get("username");
String password = (String) params.get("password");
// obtain a connection to RabbitMQ server
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost("/");
factory.setHost(rabbitMqHost);
factory.setPort(port);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// create the exchange for each event category
for (EventCategory category : EventCategory.listAllEventCategory()) {
try {
channel.exchangeDeclare(category.getName(), "topic", true);
} catch (java.io.IOException exception) {
s_logger.debug("Failed to create exchange on RabbitMQ server for the event category " + category.getName());
}
}
} catch (Exception e) {
return false;
}
return true;
}
@Override
public String getName() {
return null;
}
@Override
public boolean start() {
return false;
}
@Override
public boolean stop() {
return false;
}
}

View File

@ -38,6 +38,7 @@
<module>hypervisors/ovm</module>
<module>hypervisors/xen</module>
<module>hypervisors/kvm</module>
<module>message-brokers/rabbitmq</module>
<module>network-elements/elastic-loadbalancer</module>
<module>network-elements/ovs</module>
<module>network-elements/nicira-nvp</module>

View File

@ -75,6 +75,11 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-framework-events</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -16,16 +16,21 @@
// under the License.
package com.cloud.event;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import com.cloud.user.UserContext;
import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.AnnotationInterceptor;
import com.cloud.utils.component.ComponentLocator;
import net.sf.cglib.proxy.Callback;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import com.cloud.user.UserContext;
import com.cloud.utils.component.AnnotationInterceptor;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventCategory;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
public class ActionEventCallback implements MethodInterceptor, AnnotationInterceptor<EventVO> {
@ -81,7 +86,7 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
eventDescription += ". "+ctx.getEventDetails();
}
EventUtils.saveStartedActionEvent(userId, accountId, actionEvent.eventType(), eventDescription, startEventId);
publishOnEventBus(userId, accountId, actionEvent.eventType(), "Started", eventDescription);
publishOnEventBus(userId, accountId, actionEvent.eventType(), Event.State.Started, eventDescription);
}
}
return event;
@ -103,11 +108,11 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
if(actionEvent.create()){
//This start event has to be used for subsequent events of this action
startEventId = EventUtils.saveCreatedActionEvent(userId, accountId, EventVO.LEVEL_INFO, actionEvent.eventType(), "Successfully created entity for "+eventDescription);
publishOnEventBus(userId, accountId, actionEvent.eventType(), "Successfully created entity for "+eventDescription);
publishOnEventBus(userId, accountId, actionEvent.eventType(), Event.State.Created, "Successfully created entity for " + eventDescription);
ctx.setStartEventId(startEventId);
} else {
EventUtils.saveActionEvent(userId, accountId, EventVO.LEVEL_INFO, actionEvent.eventType(), "Successfully completed "+eventDescription, startEventId);
publishOnEventBus(userId, accountId, actionEvent.eventType(), "Successfully completed "+eventDescription, startEventId);
publishOnEventBus(userId, accountId, actionEvent.eventType(), Event.State.Completed, "Successfully completed " + eventDescription + startEventId);
}
}
}
@ -126,10 +131,10 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
eventDescription += ". "+ctx.getEventDetails();
}
if(actionEvent.create()){
long eventId = EventUtils.saveCreatedActionEvent(userId, accountId, EventVO.LEVEL_ERROR, actionEvent.eventType(), "Error while creating entity for "+eventDescription);
long eventId = EventUtils.saveCreatedActionEvent(userId, accountId, EventVO.LEVEL_ERROR, actionEvent.eventType(), "Error while creating entity for " + eventDescription);
ctx.setStartEventId(eventId);
} else {
EventUtils.saveActionEvent(userId, accountId, EventVO.LEVEL_ERROR, actionEvent.eventType(), "Error while "+eventDescription, startEventId);
EventUtils.saveActionEvent(userId, accountId, EventVO.LEVEL_ERROR, actionEvent.eventType(), "Error while " + eventDescription, startEventId);
}
}
}
@ -139,12 +144,12 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
return this;
}
void publishOnEventBus(long userId, long accountId, String type, String state, String description) {
void publishOnEventBus(long userId, long accountId, String type, Event.State state, String description) {
if (getEventBus() != null) {
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("user", String.valueOf(userId));
eventDescription.put("account", String.valueOf(accountId));
eventDescription.put("state", state);
eventDescription.put("state", state.toString());
eventDescription.put("description", description);
_eventBus.publish(EventCategory.ACTION_EVENT, type, eventDescription);
}

View File

@ -1,6 +1,13 @@
package com.cloud.event;
import import org.apache.cloudstack.framework.events.EventBus;
import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventCategory;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
public class AlertGenerator {
@ -14,8 +21,8 @@ public class AlertGenerator {
if (getEventBus() != null) {
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("alertType", alertType);
eventDescription.put("dataCenterId", dataCenterId);
eventDescription.put("podId", podId);
eventDescription.put("dataCenterId", Long.toString(dataCenterId));
eventDescription.put("podId", Long.toString(podId));
eventDescription.put("subject", subject);
eventDescription.put("body", body);
_eventBus.publish(EventCategory.ALERT_EVENT, alertType, eventDescription);

View File

@ -1,6 +1,13 @@
package com.cloud.event;
import import org.apache.cloudstack.framework.events.EventBus;
import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventCategory;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
public class UsageEventGenerator {
@ -29,21 +36,21 @@ public class UsageEventGenerator {
public static void publishUsageEvent(String usageType, long accountId,long zoneId, long vmId, long securityGroupId) {
EventUtils.saveUsageEvent(usageType, accountId, zoneId, vmId, securityGroupId);
publishOnEventBus((usageType, accountId, zoneId, vmId, null, null);
publishOnEventBus(usageType, accountId, zoneId, vmId, null, null);
}
void publishOnEventBus(String usageType, Long accountId, Long zoneId, Long resourceId, String resourceName, String resourceType) {
private static void publishOnEventBus(String usageType, Long accountId, Long zoneId, Long resourceId, String resourceName, String resourceType) {
if (getEventBus() != null) {
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("usage type", usageType);
if (accountId != null) {
eventDescription.put("accountId", usageType)
eventDescription.put("accountId", usageType);
}
if (zoneId != null) {
eventDescription.put("zoneId", String.valueOf(zoneId))
eventDescription.put("zoneId", String.valueOf(zoneId));
}
if (resourceId != null) {
eventDescription.put("resourceId", String.valueOf(resourceId))
eventDescription.put("resourceId", String.valueOf(resourceId));
}
eventDescription.put("resourceName", resourceName);
eventDescription.put("resourceType", resourceType);
@ -51,7 +58,7 @@ public class UsageEventGenerator {
}
}
private EventBus getEventBus() {
private static EventBus getEventBus() {
//TODO: check if there is way of getting single adapter
if (_eventBus == null) {
if (!_eventBusLoaded) {

View File

@ -16,25 +16,9 @@
// under the License.
package com.cloud.network.dao;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.ejb.Local;
import javax.persistence.TableGenerator;
import com.cloud.acl.ControlledEntity.ACLType;
import com.cloud.network.Network;
import com.cloud.network.Network.Event;
import com.cloud.network.Network.GuestType;
import com.cloud.network.Network.Provider;
import com.cloud.network.Network.Service;
import com.cloud.network.Network.State;
import com.cloud.network.NetworkAccountDaoImpl;
import com.cloud.network.NetworkAccountVO;
import com.cloud.network.NetworkDomainVO;
import com.cloud.network.NetworkServiceMapVO;
import com.cloud.network.NetworkVO;
import com.cloud.network.*;
import com.cloud.network.Network.*;
import com.cloud.network.Networks.BroadcastDomainType;
import com.cloud.network.Networks.Mode;
import com.cloud.network.Networks.TrafficType;
@ -44,19 +28,18 @@ import com.cloud.offerings.dao.NetworkOfferingDaoImpl;
import com.cloud.server.ResourceTag.TaggedResourceType;
import com.cloud.tags.dao.ResourceTagsDaoImpl;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.JoinBuilder;
import com.cloud.utils.db.*;
import com.cloud.utils.db.JoinBuilder.JoinType;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Func;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.SequenceFetcher;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.net.NetUtils;
import javax.ejb.Local;
import javax.persistence.TableGenerator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@Local(value = NetworkDao.class)
@DB(txn = false)
public class NetworkDaoImpl extends GenericDaoBase<NetworkVO, Long> implements NetworkDao {
@ -558,13 +541,13 @@ public class NetworkDaoImpl extends GenericDaoBase<NetworkVO, Long> implements N
public boolean updateState(State currentState, Event event, State nextState, Network vo, Object data) {
// TODO: ensure this update is correct
Transaction txn = Transaction.currentTxn();
txn.start();
txn.start();
NetworkVO networkVo = (NetworkVO) vo;
networkVo.setState(nextState);
super.update(networkVo.getId(), networkVo);
NetworkVO networkVo = (NetworkVO) vo;
networkVo.setState(nextState);
super.update(networkVo.getId(), networkVo);
txn.commit();
return true;
txn.commit();
return true;
}
}

View File

@ -16,12 +16,7 @@
// under the License.
package com.cloud.vpc.dao;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.ejb.Local;
import com.cloud.network.Network;
import com.cloud.network.Network.GuestType;
import com.cloud.network.NetworkAccountVO;
import com.cloud.network.NetworkVO;
@ -31,6 +26,11 @@ import com.cloud.utils.db.DB;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import javax.ejb.Local;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Local(value = NetworkDao.class)
@DB(txn = false)
public class MockNetworkDaoImpl extends GenericDaoBase<NetworkVO, Long> implements NetworkDao{
@ -341,5 +341,8 @@ public class MockNetworkDaoImpl extends GenericDaoBase<NetworkVO, Long> implemen
// TODO Auto-generated method stub
return 0;
}
@Override
public boolean updateState(Network.State currentState, Network.Event event, Network.State nextState, Network vo, Object data) {
return true;
}
}