-Added recoonect logic using shutdown listner in RabbitMQEventBus

-changed EventBus interface method signtures to return/take UUID for
subscriber management
This commit is contained in:
Murali Reddy 2013-01-25 14:18:49 +05:30
parent 28738e4e22
commit 2dd40c2823
15 changed files with 671 additions and 234 deletions

View File

@ -48,7 +48,8 @@ public class EventCategory {
return null;
}
public static final EventCategory ACTION_EVENT = new EventCategory("Action Events");
public static final EventCategory ALERT_EVENT = new EventCategory("Alert Event");
public static final EventCategory USAGE_EVENT = new EventCategory("Usage Event");
public static final EventCategory ACTION_EVENT = new EventCategory("ActionEvent");
public static final EventCategory ALERT_EVENT = new EventCategory("AlertEvent");
public static final EventCategory USAGE_EVENT = new EventCategory("UsageEvent");
public static final EventCategory RESOURCE_STATE_CHANGE_EVENT = new EventCategory("ResourceStateEvent");
}

View File

@ -202,6 +202,7 @@ public interface Network extends ControlledEntity, StateObject<Network.State>, I
}
public enum State {
Allocated("Indicates the network configuration is in allocated but not setup"),
Setup("Indicates the network configuration is setup"),
Implementing("Indicates the network configuration is being implemented"),
@ -210,6 +211,7 @@ public interface Network extends ControlledEntity, StateObject<Network.State>, I
Destroy("Indicates that the network is destroyed");
protected static final StateMachine2<State, Network.Event, Network> s_fsm = new StateMachine2<State, Network.Event, Network>();
static {
s_fsm.addTransition(State.Allocated, Event.ImplementNetwork, State.Implementing);
s_fsm.addTransition(State.Implementing, Event.OperationSucceeded, State.Implemented);

View File

@ -234,6 +234,7 @@ under the License.
<param name="port">5672</param>
<param name="username">guest</param>
<param name="password">guest</param>
<param name="exchangename">cloudstack-evetns</param>
</adapter>
</adapters>
<manager name="OvsTunnelManager" key="com.cloud.network.ovs.OvsTunnelManager" class="com.cloud.network.ovs.OvsTunnelManagerImpl"/>

View File

@ -23,34 +23,44 @@ import com.google.gson.Gson;
public class Event {
String category;
String type;
String routingKey;
String description;
String publisher;
String date;
String eventCategory;
String eventType;
String eventSource;
String resourceType;
String resourceUUID;
String description;
public Event(String category, String type, String routingKey) {
this.category = category;
this.type = type;
this.routingKey = routingKey;
public Event(String eventSource, String eventCategory, String eventType, String resourceType,
String resourceUUID) {
this.eventCategory = eventCategory;
this.eventType = eventType;
this.eventSource = eventSource;
this.resourceType = resourceType;
this.resourceUUID = resourceUUID;
}
public String getCategory() {
return category;
public String getEventCategory() {
return eventCategory;
}
public String getType() {
return type;
public void setEventCategory(String category) {
eventCategory = category;
}
public String getRoutingKey() {
return routingKey;
public String getEventType() {
return eventType;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
public void setEventType(String type) {
eventType = type;
}
public String getEventSource() {
return eventSource;
}
void setEventSource(String source) {
eventSource = source;
}
public String getDescription() {
@ -62,19 +72,23 @@ public class Event {
this.description = gson.toJson(message).toString();
}
public String getEventPublisher() {
return publisher;
public void setDescription(String description) {
this.description = description;
}
void setEventPublisher(String source) {
this.publisher = source;
public String getResourceType() {
return resourceType;
}
public String getDate() {
return date;
public void setResourceType(String resourceType) {
this.resourceType = resourceType;
}
void setDate(String date) {
this.date = date;
public void setResourceUUID(String uuid) {
this.resourceUUID = uuid;
}
}
public String getResourceUUID () {
return resourceUUID;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.cloudstack.framework.events;
import com.cloud.utils.component.Adapter;
import java.util.UUID;
/**
* Interface to publish and subscribe to CloudStack events
*
@ -28,29 +30,26 @@ import com.cloud.utils.component.Adapter;
public interface EventBus extends Adapter{
/**
* publish an event
* publish an event on to the event bus
*
* @param event event that needs to be published
* @return true if the event has been successfully published on event bus
* @param event event that needs to be published on the event bus
*/
boolean publish(Event event);
void publish(Event event) throws EventBusException;
/**
* subscribe to events of a category and a type
* subscribe to events that matches specified event topics
*
* @param topic defines category and type of the events being subscribed to
* @param subscriber subscriber that intends to receive event notification
* @return true if the subscriber has been successfully registered.
* @return UUID returns the subscription ID
*/
boolean subscribe(EventTopic topic, EventSubscriber subscriber);
UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException;
/**
* unsubscribe to events of a category and a type
*
* @param topic defines category and type of the events to unsubscribe
* @param subscriber subscriber that intends to unsubscribe from the event notification
* @return true if the subscriber has been successfully unsubscribed.
*/
boolean unsubscribe(EventTopic topic, EventSubscriber subscriber);
void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException;
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.events;
public class EventBusException extends Exception{
public EventBusException (String msg) {
super(msg);
}
}

View File

@ -24,9 +24,7 @@ public interface EventSubscriber {
/**
* Callback method. EventBus calls this method on occurrence of subscribed event
*
* @param category category of the event being subscribed (e.g. action, usage, alert etc)
* @param type type of the event (e.g. vm stop, volume delete etc)
* @param description description of the event
* @param event details of the event
*/
void recieve(Event event);
void onEvent(Event event);
}

View File

@ -23,12 +23,16 @@ public class EventTopic {
String eventCategory;
String eventType;
String bindingKey;
String resourceType;
String resourceUUID;
String eventSource;
public EventTopic(String eventCategory, String eventType, String bindingKey) {
public EventTopic(String eventCategory, String eventType, String resourceType, String resourceUUID, String eventSource) {
this.eventCategory = eventCategory;
this.eventType = eventType;
this.bindingKey = bindingKey;
this.resourceType = resourceType;
this.resourceUUID = resourceUUID;
this.eventSource = eventSource;
}
public String getEventCategory() {
@ -39,7 +43,15 @@ public class EventTopic {
return eventType;
}
public String getBindingKey() {
return bindingKey;
public String getResourceType() {
return resourceType;
}
public String getEventSource() {
return eventSource;
}
public String getResourceUUID() {
return resourceUUID;
}
}

View File

@ -19,15 +19,15 @@
package org.apache.cloudstack.framework.events;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Target({METHOD })
@Retention(RUNTIME)
public @interface Subscribe {
public @interface Subscribe {
String eventCategory();

View File

@ -1,126 +1,533 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.mom.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.cloudstack.framework.events.Event;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventSubscriber;
import org.apache.cloudstack.framework.events.EventTopic;
import com.rabbitmq.client.*;
import org.apache.cloudstack.framework.events.*;
import org.apache.log4j.Logger;
import com.cloud.utils.Ternary;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import java.io.IOException;
import java.net.ConnectException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Local(value=EventBus.class)
public class RabbitMQEventBus implements EventBus {
// details of AMQP server
private static String _amqpHost;
private static Integer _port;
private static String _username;
private static String _password;
public static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
public Connection _connection = null;
public Channel _channel = null;
private String _rabbitMqHost;
private Integer _port;
private String _username;
private String _password;
// AMQP exchange name where all CloudStack events will be published
private static String _amqpExchangeName;
// hashmap to book keep the registered subscribers
private static ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>> _subscribers;
// connection to AMQP server,
private static Connection _connection=null;
// AMQP server should consider messages acknowledged once delivered if _autoAck is true
private static boolean _autoAck = true;
private ExecutorService executorService;
private String _name;
private static DisconnectHandler disconnectHandler;
private static Integer _retryInterval;
private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_rabbitMqHost = (String) params.get("server");
_port = Integer.parseInt((String) params.get("port"));
_amqpHost = (String) params.get("server");
if (_amqpHost == null || _amqpHost.isEmpty()) {
throw new ConfigurationException("Unable to get the AMQP server details");
}
_username = (String) params.get("username");
if (_username == null || _username.isEmpty()) {
throw new ConfigurationException("Unable to get the username details");
}
_password = (String) params.get("password");
if (_password == null || _password.isEmpty()) {
throw new ConfigurationException("Unable to get the password details");
}
_amqpExchangeName = (String) params.get("exchangename");
if (_amqpExchangeName == null || _amqpExchangeName.isEmpty()) {
throw new ConfigurationException("Unable to get the _exchange details on the AMQP server");
}
try {
String portStr = (String) params.get("port");
if (portStr == null || portStr.isEmpty()) {
throw new ConfigurationException("Unable to get the port details of AMQP server");
}
_port = Integer.parseInt(portStr);
String retryIntervalStr = (String) params.get("retryinterval");
if (retryIntervalStr == null || retryIntervalStr.isEmpty()) {
// default to 10s to try out reconnect
retryIntervalStr = "10000";
}
_retryInterval = Integer.parseInt(retryIntervalStr);
} catch (NumberFormatException e) {
throw new ConfigurationException("Invalid port number/retry interval");
}
_subscribers = new ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>>();
executorService = Executors.newCachedThreadPool();
disconnectHandler = new DisconnectHandler();
_name = name;
return true;
}
/** Call to subscribe to interested set of events
*
* @param topic defines category and type of the events being subscribed to
* @param subscriber subscriber that intends to receive event notification
* @return UUID that represents the subscription with event bus
* @throws EventBusException
*/
@Override
public String getName() {
return null;
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}
// create a UUID, that will be used for managing subscriptions and also used as queue name
// for on the queue used for the subscriber on the AMQP broker
UUID queueId = UUID.randomUUID();
String queueName = queueId.toString();
try {
String bindingKey = createBindingKey(topic);
// store the subscriber details before creating channel
_subscribers.put(queueName, new Ternary(bindingKey, null, subscriber));
// create a channel dedicated for this subscription
Connection connection = getConnection();
Channel channel = createChannel(connection);
// create a queue and bind it to the exchange with binding key formed from event topic
createExchange(channel, _amqpExchangeName);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, _amqpExchangeName, bindingKey);
// register a callback handler to receive the events that a subscriber subscribed to
channel.basicConsume(queueName, _autoAck, queueName,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String queueName,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
if (queueDetails != null) {
EventSubscriber subscriber = queueDetails.third();
String routingKey = envelope.getRoutingKey();
Event event = new Event(null, getEventCategoryFromRoutingKey(routingKey),
getEventTypeFromRoutingKey(routingKey), null, null);
event.setDescription(body.toString());
// deliver the event to call back object provided by subscriber
subscriber.onEvent(event);
}
}
}
);
// update the channel details for the subscription
Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
queueDetails.second(channel);
_subscribers.put(queueName, queueDetails);
} catch (AlreadyClosedException closedException) {
s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
" will be active after reconnection");
} catch (ConnectException connectException) {
s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
" will be active after reconnection");
} catch (Exception e) {
throw new EventBusException("Failed to subscribe to event due to " + e.getMessage());
}
return queueId;
}
@Override
public boolean start() {
return true;
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
try {
String classname = subscriber.getClass().getName();
String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString();
Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
Channel channel = queueDetails.second();
channel.basicCancel(queueName);
_subscribers.remove(queueName, queueDetails);
} catch (Exception e) {
throw new EventBusException("Failed to unsubscribe from event bus due to " + e.getMessage());
}
}
// publish event on to the exchange created on AMQP server
@Override
public boolean stop() {
return true;
}
public void publish(Event event) throws EventBusException {
@Override
public boolean publish(Event event) {
String exchangeName = getExchangeName(event.getCategory());
String routingKey = getRoutingKey(event.getType());
String routingKey = createRoutingKey(event);
String eventDescription = event.getDescription();
try {
createConnection();
createExchange(exchangeName);
publishEventToExchange(exchangeName, routingKey, eventDescription);
Connection connection = getConnection();
Channel channel = createChannel(connection);
createExchange(channel, _amqpExchangeName);
publishEventToExchange(channel, _amqpExchangeName, routingKey, eventDescription);
channel.close();
} catch (AlreadyClosedException e) {
closeConnection();
throw new EventBusException("Failed to publish event to message broker as connection to AMQP broker in lost");
} catch (Exception e) {
s_logger.error("Failed to publish event to message broker due to " + e.getMessage());
return false;
throw new EventBusException("Failed to publish event to message broker due to " + e.getMessage());
}
return true;
}
@Override
public boolean subscribe(EventTopic topic, EventSubscriber subscriber) {
return true;
/** creates a routing key from the event details.
* created routing key will be used while publishing the message to exchange on AMQP server
*/
private String createRoutingKey(Event event) {
StringBuilder routingKey = new StringBuilder();
String eventSource = replaceNullWithWildcard(event.getEventSource());
eventSource = eventSource.replace(".", "-");
String eventCategory = replaceNullWithWildcard(event.getEventCategory());
eventCategory = eventCategory.replace(".", "-");
String eventType = replaceNullWithWildcard(event.getEventType());
eventType = eventType.replace(".", "-");
String resourceType = replaceNullWithWildcard(event.getResourceType());
resourceType = resourceType.replace(".", "-");
String resourceUuid = replaceNullWithWildcard(event.getResourceUUID());
resourceUuid = resourceUuid.replace(".", "-");
// routing key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid
routingKey.append(eventSource);
routingKey.append(".");
routingKey.append(eventCategory);
routingKey.append(".");
routingKey.append(eventType);
routingKey.append(".");
routingKey.append(resourceType);
routingKey.append(".");
routingKey.append(resourceUuid);
return routingKey.toString();
}
@Override
public boolean unsubscribe(EventTopic topic, EventSubscriber subscriber) {
return true;
/** creates a binding key from the event topic that subscriber specified
* binding key will be used to bind the queue created for subscriber to exchange on AMQP server
*/
private String createBindingKey(EventTopic topic) {
StringBuilder bindingKey = new StringBuilder();
String eventSource = replaceNullWithWildcard(topic.getEventSource());
eventSource = eventSource.replace(".", "-");
String eventCategory = replaceNullWithWildcard(topic.getEventCategory());
eventCategory = eventCategory.replace(".", "-");
String eventType = replaceNullWithWildcard(topic.getEventType());
eventType = eventType.replace(".", "-");
String resourceType = replaceNullWithWildcard(topic.getResourceType());
resourceType = resourceType.replace(".", "-");
String resourceUuid = replaceNullWithWildcard(topic.getResourceUUID());
resourceUuid = resourceUuid.replace(".", "-");
// binding key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid
bindingKey.append(eventSource);
bindingKey.append(".");
bindingKey.append(eventCategory);
bindingKey.append(".");
bindingKey.append(eventType);
bindingKey.append(".");
bindingKey.append(resourceType);
bindingKey.append(".");
bindingKey.append(resourceUuid);
return bindingKey.toString();
}
private String getExchangeName(String eventCategory) {
return "CloudStack " + eventCategory;
private synchronized Connection getConnection() throws Exception {
if (_connection == null) {
try {
return createConnection();
} catch (Exception e) {
s_logger.error("Failed to create a connection to AMQP server due to " + e.getMessage());
throw e;
}
} else {
return _connection;
}
}
private String getRoutingKey(String eventType) {
return eventType;
}
private void createConnection() throws Exception {
private synchronized Connection createConnection() throws Exception {
try {
// obtain a connection to RabbitMQ server
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(_username);
factory.setPassword(_password);
factory.setVirtualHost("/");
factory.setHost(_rabbitMqHost);
factory.setHost(_amqpHost);
factory.setPort(_port);
_connection = factory.newConnection();
_channel = _connection.createChannel();
Connection connection = factory.newConnection();
connection.addShutdownListener(disconnectHandler);
_connection = connection;
return _connection;
} catch (Exception e) {
s_logger.error("Failed to create a connection to RabbitMQ server due to " + e.getMessage());
throw e;
}
}
private void createExchange(String exchangeName) throws Exception {
private synchronized void closeConnection() {
try {
_channel.exchangeDeclare(exchangeName, "topic", true);
if (_connection != null) {
_connection.close();
}
} catch (Exception e) {
s_logger.warn("Failed to close connection to AMQP server due to " + e.getMessage());
}
_connection = null;
}
private synchronized void abortConnection () {
if (_connection == null)
return;
try {
_connection.abort();
} catch (Exception e) {
s_logger.warn("Failed to abort connection due to " + e.getMessage());
}
_connection = null;
}
private String replaceNullWithWildcard(String key) {
if (key == null || key.isEmpty()) {
return "*";
} else {
return key;
}
}
private Channel createChannel(Connection connection) throws Exception {
try {
return connection.createChannel();
} catch (java.io.IOException exception) {
s_logger.warn("Failed to create a channel due to " + exception.getMessage());
throw exception;
}
}
private void createExchange(Channel channel, String exchangeName) throws Exception {
try {
channel.exchangeDeclare(exchangeName, "topic", true);
} catch (java.io.IOException exception) {
s_logger.error("Failed to create exchange" + exchangeName + " on RabbitMQ server");
throw exception;
}
}
private void publishEventToExchange(String exchangeName, String routingKey, String eventDescription) throws Exception {
private void publishEventToExchange(Channel channel, String exchangeName,
String routingKey, String eventDescription) throws Exception {
try {
_channel.txSelect();
byte[] messageBodyBytes = eventDescription.getBytes();
_channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
_channel.txCommit();
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
} catch (Exception e) {
s_logger.error("Failed to publish event " + routingKey + " on exchange " + exchangeName +
" of message broker due to " + e.getMessage());
throw e;
}
}
private String getEventCategoryFromRoutingKey(String routingKey) {
String[] keyParts = routingKey.split("\\.");
return keyParts[1];
}
private String getEventTypeFromRoutingKey(String routingKey) {
String[] keyParts = routingKey.split("\\.");
return keyParts[2];
}
private String getEventSourceFromRoutingKey(String routingKey) {
String[] keyParts = routingKey.split("\\.");
return keyParts[0];
}
@Override
public String getName() {
return _name;
}
@Override
public boolean start() {
ReconnectionTask reconnect = new ReconnectionTask(); // initiate connection to AMQP server
executorService.submit(reconnect);
return true;
}
@Override
public boolean stop() {
if (_connection.isOpen()) {
for (String subscriberId : _subscribers.keySet()) {
Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
Channel channel = subscriberDetails.second();
String queueName = subscriberId;
try {
channel.queueDelete(queueName);
channel.abort();
} catch (IOException ioe) {
s_logger.warn("Failed to delete queue: " + queueName + " on AMQP server due to " + ioe.getMessage() );
}
}
}
closeConnection();
return true;
}
// logic to deal with loss of connection to AMQP server
private class DisconnectHandler implements ShutdownListener {
@Override
public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
if (!shutdownSignalException.isInitiatedByApplication()) {
for (String subscriberId : _subscribers.keySet()) {
Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
subscriberDetails.second(null);
_subscribers.put(subscriberId, subscriberDetails);
}
abortConnection(); // disconnected to AMQP server, so abort the connection and channels
s_logger.warn("Connection has been shutdown by AMQP server. Attempting to reconnect.");
// initiate re-connect process
ReconnectionTask reconnect = new ReconnectionTask();
executorService.submit(reconnect);
}
}
}
// retry logic to connect back to AMQP server after loss of connection
private class ReconnectionTask implements Runnable {
boolean connected = false;
Connection connection = null;
public void run() {
while (!connected) {
try {
Thread.sleep(_retryInterval);
} catch (InterruptedException ie) {
// ignore timer interrupts
}
try {
try {
connection = createConnection();
connected = true;
} catch (IOException ie) {
continue; // can't establish connection to AMQP server yet, so continue
}
// prepare consumer on AMQP server for each of subscriber
for (String subscriberId : _subscribers.keySet()) {
Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
String bindingKey = subscriberDetails.first();
EventSubscriber subscriber = subscriberDetails.third();
/** create a queue with subscriber ID as queue name and bind it to the exchange
* with binding key formed from event topic
*/
Channel channel = createChannel(connection);
createExchange(channel, _amqpExchangeName);
channel.queueDeclare(subscriberId, false, false, false, null);
channel.queueBind(subscriberId, _amqpExchangeName, bindingKey);
// register a callback handler to receive the events that a subscriber subscribed to
channel.basicConsume(subscriberId, _autoAck, subscriberId,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String queueName,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
Ternary<String, Channel, EventSubscriber> subscriberDetails
= _subscribers.get(queueName); // queue name == subscriber ID
if (subscriberDetails != null) {
EventSubscriber subscriber = subscriberDetails.third();
String routingKey = envelope.getRoutingKey();
Event event = new Event(null, getEventCategoryFromRoutingKey(routingKey),
getEventTypeFromRoutingKey(routingKey), null, null);
event.setDescription(body.toString());
// deliver the event to call back object provided by subscriber
subscriber.onEvent(event);
}
}
}
);
// update the channel details for the subscription
subscriberDetails.second(channel);
_subscribers.put(subscriberId, subscriberDetails);
}
} catch (Exception e) {
s_logger.warn("Failed to recreate queues and binding for the subscribers due to " + e.getMessage());
}
}
return;
}
}
}

View File

@ -263,7 +263,7 @@ public class AlertManagerImpl implements AlertManager {
public void sendAlert(short alertType, long dataCenterId, Long podId, String subject, String body) {
// publish alert
AlertGenerator.publishAlert(getAlertType(alertType), dataCenterId, podId, subject, body);
AlertGenerator.publishAlertOnEventBus(getAlertType(alertType), dataCenterId, podId, subject, body);
// TODO: queue up these messages and send them as one set of issues once a certain number of issues is reached? If that's the case,
// shouldn't we have a type/severity as part of the API so that severe errors get sent right away?

View File

@ -25,6 +25,8 @@ import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import org.apache.cloudstack.framework.events.Event;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
@ -36,6 +38,7 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
protected static EventBus _eventBus = null;
protected static boolean _eventBusLoaded = false;
private static final Logger s_logger = Logger.getLogger(ActionEventCallback.class);
@Override
public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
@ -144,21 +147,34 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce
return this;
}
void publishOnEventBus(long userId, long accountId, String type, com.cloud.event.Event.State state, String description) {
void publishOnEventBus(long userId, long accountId, String eventType, com.cloud.event.Event.State state, String description) {
if (getEventBus() != null) {
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("user", String.valueOf(userId));
eventDescription.put("account", String.valueOf(accountId));
eventDescription.put("state", state.toString());
eventDescription.put("description", description);
Event event = new Event(EventCategory.ACTION_EVENT.getName(), type, type);
int index = eventType.lastIndexOf("\\.");
String resourceType = null;
if (index != -1 ) {
resourceType = eventType.substring(0, index);
}
Event event = new Event(null, EventCategory.ACTION_EVENT.getName(), eventType,
resourceType, null);
event.setDescription(eventDescription);
_eventBus.publish(event);
try {
_eventBus.publish(event);
} catch (EventBusException e) {
s_logger.warn("Failed to publish action event on the the event bus.");
}
}
}
private EventBus getEventBus() {
//TODO: check if there is way of getting single adapter
if (_eventBus == null) {
if (!_eventBusLoaded) {
ComponentLocator locator = ComponentLocator.getLocator("management-server");

View File

@ -1,9 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.event;
import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.Event;
import org.apache.cloudstack.framework.events.*;
import org.apache.log4j.Logger;
import java.util.Enumeration;
import java.util.HashMap;
@ -11,13 +28,11 @@ import java.util.Map;
public class AlertGenerator {
private static final Logger s_logger = Logger.getLogger(AlertGenerator.class);
protected static EventBus _eventBus = null;
protected static boolean _eventBusLoaded = false;
public static void publishAlert(String alertType, long dataCenterId, Long podId, String subject, String body) {
}
void publishOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) {
public static void publishAlertOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) {
if (getEventBus() != null) {
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("alertType", alertType);
@ -25,14 +40,22 @@ public class AlertGenerator {
eventDescription.put("podId", Long.toString(podId));
eventDescription.put("subject", subject);
eventDescription.put("body", body);
Event event = new Event(EventCategory.ALERT_EVENT.getName(), alertType, alertType);
org.apache.cloudstack.framework.events.Event event =
new org.apache.cloudstack.framework.events.Event(null,
EventCategory.ALERT_EVENT.getName(),
alertType,
null,
null);
event.setDescription(eventDescription);
_eventBus.publish(event);
try {
_eventBus.publish(event);
} catch (EventBusException e) {
s_logger.warn("Failed to publish alert on the the event bus.");
}
}
}
private EventBus getEventBus() {
//TODO: check if there is way of getting single adapter
private static EventBus getEventBus() {
if (_eventBus == null) {
if (!_eventBusLoaded) {
ComponentLocator locator = ComponentLocator.getLocator("management-server");

View File

@ -4,6 +4,8 @@ import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.Event;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import java.util.Enumeration;
import java.util.HashMap;
@ -11,6 +13,7 @@ import java.util.Map;
public class UsageEventGenerator {
private static final Logger s_logger = Logger.getLogger(UsageEventGenerator.class);
protected static EventBus _eventBus = null;
protected static boolean _eventBusLoaded = false;
@ -54,14 +57,17 @@ public class UsageEventGenerator {
}
eventDescription.put("resourceName", resourceName);
eventDescription.put("resourceType", resourceType);
Event event = new Event(EventCategory.USAGE_EVENT.getName(), usageType, usageType);
Event event = new Event(null, EventCategory.USAGE_EVENT.getName(), usageType, null, null);
event.setDescription(eventDescription);
_eventBus.publish(event);
try {
_eventBus.publish(event);
} catch (EventBusException e) {
s_logger.warn("Failed to publish usage event on the the event bus.");
}
}
}
private static EventBus getEventBus() {
//TODO: check if there is way of getting single adapter
if (_eventBus == null) {
if (!_eventBusLoaded) {
ComponentLocator locator = ComponentLocator.getLocator("management-server");

View File

@ -16,39 +16,9 @@
// under the License.
package com.cloud.network;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.acl.ControlledEntity.ACLType;
import org.apache.cloudstack.acl.SecurityChecker.AccessType;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CheckNetworkAnswer;
import com.cloud.agent.api.CheckNetworkCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.StartupCommand;
import com.cloud.agent.api.StartupRoutingCommand;
import com.cloud.agent.api.*;
import com.cloud.agent.api.to.NicTO;
import com.cloud.alert.AlertManager;
import com.cloud.api.ApiDBUtils;
@ -56,15 +26,9 @@ import com.cloud.configuration.Config;
import com.cloud.configuration.ConfigurationManager;
import com.cloud.configuration.Resource.ResourceType;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.dc.AccountVlanMapVO;
import com.cloud.dc.DataCenter;
import com.cloud.dc.*;
import com.cloud.dc.DataCenter.NetworkType;
import com.cloud.dc.DataCenterVO;
import com.cloud.dc.Pod;
import com.cloud.dc.PodVlanMapVO;
import com.cloud.dc.Vlan;
import com.cloud.dc.Vlan.VlanType;
import com.cloud.dc.VlanVO;
import com.cloud.dc.dao.AccountVlanMapDao;
import com.cloud.dc.dao.DataCenterDao;
import com.cloud.dc.dao.PodVlanMapDao;
@ -77,64 +41,28 @@ import com.cloud.domain.dao.DomainDao;
import com.cloud.event.EventTypes;
import com.cloud.event.UsageEventGenerator;
import com.cloud.event.dao.UsageEventDao;
import com.cloud.exception.AccountLimitException;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.ConnectionException;
import com.cloud.exception.InsufficientAddressCapacityException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.InsufficientVirtualNetworkCapcityException;
import com.cloud.exception.InvalidParameterValueException;
import com.cloud.exception.PermissionDeniedException;
import com.cloud.exception.ResourceAllocationException;
import com.cloud.exception.ResourceUnavailableException;
import com.cloud.exception.UnsupportedServiceException;
import com.cloud.exception.*;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.network.IpAddress.State;
import com.cloud.network.Network.Capability;
import com.cloud.network.Network.GuestType;
import com.cloud.network.Network.Provider;
import com.cloud.network.Network.Service;
import com.cloud.network.Network.*;
import com.cloud.network.Networks.AddressFormat;
import com.cloud.network.Networks.BroadcastDomainType;
import com.cloud.network.Networks.IsolationType;
import com.cloud.network.Networks.TrafficType;
import com.cloud.network.addr.PublicIp;
import com.cloud.network.Network.Event;
import com.cloud.network.dao.*;
import com.cloud.network.element.*;
import com.cloud.network.dao.FirewallRulesDao;
import com.cloud.network.dao.IPAddressDao;
import com.cloud.network.dao.LoadBalancerDao;
import com.cloud.network.dao.NetworkDao;
import com.cloud.network.dao.NetworkServiceMapDao;
import com.cloud.network.dao.PhysicalNetworkDao;
import com.cloud.network.dao.PhysicalNetworkServiceProviderDao;
import com.cloud.network.dao.PhysicalNetworkTrafficTypeDao;
import com.cloud.network.dao.PhysicalNetworkTrafficTypeVO;
import com.cloud.network.element.DhcpServiceProvider;
import com.cloud.network.element.IpDeployer;
import com.cloud.network.element.LoadBalancingServiceProvider;
import com.cloud.network.element.NetworkElement;
import com.cloud.network.element.StaticNatServiceProvider;
import com.cloud.network.element.UserDataServiceProvider;
import com.cloud.network.guru.NetworkGuru;
import com.cloud.network.lb.LoadBalancingRule;
import com.cloud.network.lb.LoadBalancingRule.LbDestination;
import com.cloud.network.lb.LoadBalancingRule.LbStickinessPolicy;
import com.cloud.network.lb.LoadBalancingRulesManager;
import com.cloud.network.rules.FirewallManager;
import com.cloud.network.rules.FirewallRule;
import com.cloud.network.rules.*;
import com.cloud.network.rules.FirewallRule.Purpose;
import com.cloud.network.rules.FirewallRuleVO;
import com.cloud.network.rules.PortForwardingRuleVO;
import com.cloud.network.rules.RulesManager;
import com.cloud.network.rules.StaticNat;
import com.cloud.network.rules.StaticNatRule;
import com.cloud.network.rules.StaticNatRuleImpl;
import com.cloud.network.rules.dao.PortForwardingRulesDao;
import com.cloud.network.vpc.NetworkACLManager;
import com.cloud.network.vpc.VpcManager;
@ -147,11 +75,7 @@ import com.cloud.offerings.NetworkOfferingVO;
import com.cloud.offerings.dao.NetworkOfferingDao;
import com.cloud.offerings.dao.NetworkOfferingServiceMapDao;
import com.cloud.org.Grouping;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.ResourceLimitService;
import com.cloud.user.User;
import com.cloud.user.UserContext;
import com.cloud.user.*;
import com.cloud.user.dao.AccountDao;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Pair;
@ -159,32 +83,30 @@ import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.Inject;
import com.cloud.utils.component.Manager;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.*;
import com.cloud.utils.db.JoinBuilder.JoinType;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.fsm.NoTransitionException;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.utils.net.Ip;
import com.cloud.utils.net.NetUtils;
import com.cloud.vm.Nic;
import com.cloud.vm.NicProfile;
import com.cloud.vm.NicVO;
import com.cloud.vm.ReservationContext;
import com.cloud.vm.ReservationContextImpl;
import com.cloud.vm.UserVmVO;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.*;
import com.cloud.vm.VirtualMachine.Type;
import com.cloud.vm.VirtualMachineProfile;
import com.cloud.vm.VirtualMachineProfileImpl;
import com.cloud.vm.dao.NicDao;
import com.cloud.vm.dao.UserVmDao;
import com.cloud.vm.dao.VMInstanceDao;
import org.apache.cloudstack.acl.ControlledEntity.ACLType;
import org.apache.cloudstack.acl.SecurityChecker.AccessType;
import org.apache.log4j.Logger;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* NetworkManagerImpl implements NetworkManager.
@ -2059,9 +1981,12 @@ public class NetworkManagerImpl implements NetworkManager, Manager, Listener {
s_logger.debug("Network is not implemented: " + network);
return false;
}
network.setState(Network.State.Shutdown);
_networksDao.update(network.getId(), network);
try {
stateTransitTo(network, Event.DestroyNetwork);
} catch (NoTransitionException e) {
network.setState(Network.State.Shutdown);
_networksDao.update(network.getId(), network);
}
boolean success = shutdownNetworkElementsAndResources(context, cleanupElements, network);
@ -2076,15 +2001,22 @@ public class NetworkManagerImpl implements NetworkManager, Manager, Listener {
guru.shutdown(profile, _networkOfferingDao.findById(network.getNetworkOfferingId()));
applyProfileToNetwork(network, profile);
network.setState(Network.State.Allocated);
network.setRestartRequired(false);
try {
stateTransitTo(network, Event.OperationSucceeded);
} catch (NoTransitionException e) {
network.setState(Network.State.Allocated);
network.setRestartRequired(false);
}
_networksDao.update(network.getId(), network);
_networksDao.clearCheckForGc(networkId);
result = true;
} else {
network.setState(Network.State.Implemented);
_networksDao.update(network.getId(), network);
try {
stateTransitTo(network, Event.OperationFailed);
} catch (NoTransitionException e) {
network.setState(Network.State.Implemented);
_networksDao.update(network.getId(), network);
}
result = false;
}
txn.commit();