From 2dd40c2823bb908ef55be7adbfaa8df7638c30c4 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Fri, 25 Jan 2013 14:18:49 +0530 Subject: [PATCH] -Added recoonect logic using shutdown listner in RabbitMQEventBus -changed EventBus interface method signtures to return/take UUID for subscriber management --- api/src/com/cloud/event/EventCategory.java | 7 +- api/src/com/cloud/network/Network.java | 2 + client/tomcatconf/components.xml.in | 1 + .../cloudstack/framework/events/Event.java | 68 ++- .../cloudstack/framework/events/EventBus.java | 19 +- .../framework/events/EventBusException.java | 26 + .../framework/events/EventSubscriber.java | 6 +- .../framework/events/EventTopic.java | 22 +- .../framework/events/Subscribe.java | 8 +- .../mom/rabbitmq/RabbitMQEventBus.java | 523 ++++++++++++++++-- .../src/com/cloud/alert/AlertManagerImpl.java | 2 +- .../com/cloud/event/ActionEventCallback.java | 24 +- .../src/com/cloud/event/AlertGenerator.java | 43 +- .../com/cloud/event/UsageEventGenerator.java | 12 +- .../com/cloud/network/NetworkManagerImpl.java | 142 ++--- 15 files changed, 671 insertions(+), 234 deletions(-) create mode 100644 framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java diff --git a/api/src/com/cloud/event/EventCategory.java b/api/src/com/cloud/event/EventCategory.java index a0042dd3cfa..cee6529b550 100644 --- a/api/src/com/cloud/event/EventCategory.java +++ b/api/src/com/cloud/event/EventCategory.java @@ -48,7 +48,8 @@ public class EventCategory { return null; } - public static final EventCategory ACTION_EVENT = new EventCategory("Action Events"); - public static final EventCategory ALERT_EVENT = new EventCategory("Alert Event"); - public static final EventCategory USAGE_EVENT = new EventCategory("Usage Event"); + public static final EventCategory ACTION_EVENT = new EventCategory("ActionEvent"); + public static final EventCategory ALERT_EVENT = new EventCategory("AlertEvent"); + public static final EventCategory USAGE_EVENT = new EventCategory("UsageEvent"); + public static final EventCategory RESOURCE_STATE_CHANGE_EVENT = new EventCategory("ResourceStateEvent"); } diff --git a/api/src/com/cloud/network/Network.java b/api/src/com/cloud/network/Network.java index f70d898a801..b3c8381f15a 100644 --- a/api/src/com/cloud/network/Network.java +++ b/api/src/com/cloud/network/Network.java @@ -202,6 +202,7 @@ public interface Network extends ControlledEntity, StateObject, I } public enum State { + Allocated("Indicates the network configuration is in allocated but not setup"), Setup("Indicates the network configuration is setup"), Implementing("Indicates the network configuration is being implemented"), @@ -210,6 +211,7 @@ public interface Network extends ControlledEntity, StateObject, I Destroy("Indicates that the network is destroyed"); protected static final StateMachine2 s_fsm = new StateMachine2(); + static { s_fsm.addTransition(State.Allocated, Event.ImplementNetwork, State.Implementing); s_fsm.addTransition(State.Implementing, Event.OperationSucceeded, State.Implemented); diff --git a/client/tomcatconf/components.xml.in b/client/tomcatconf/components.xml.in index 8af6d9b1126..5cd1985e232 100755 --- a/client/tomcatconf/components.xml.in +++ b/client/tomcatconf/components.xml.in @@ -234,6 +234,7 @@ under the License. 5672 guest guest + cloudstack-evetns diff --git a/framework/events/src/org/apache/cloudstack/framework/events/Event.java b/framework/events/src/org/apache/cloudstack/framework/events/Event.java index f35bbef6eda..eb6f48de3b8 100644 --- a/framework/events/src/org/apache/cloudstack/framework/events/Event.java +++ b/framework/events/src/org/apache/cloudstack/framework/events/Event.java @@ -23,34 +23,44 @@ import com.google.gson.Gson; public class Event { - String category; - String type; - String routingKey; - String description; - String publisher; - String date; + String eventCategory; + String eventType; + String eventSource; String resourceType; + String resourceUUID; + String description; - public Event(String category, String type, String routingKey) { - this.category = category; - this.type = type; - this.routingKey = routingKey; + public Event(String eventSource, String eventCategory, String eventType, String resourceType, + String resourceUUID) { + this.eventCategory = eventCategory; + this.eventType = eventType; + this.eventSource = eventSource; + this.resourceType = resourceType; + this.resourceUUID = resourceUUID; } - public String getCategory() { - return category; + public String getEventCategory() { + return eventCategory; } - public String getType() { - return type; + public void setEventCategory(String category) { + eventCategory = category; } - public String getRoutingKey() { - return routingKey; + public String getEventType() { + return eventType; } - public void setRoutingKey(String routingKey) { - this.routingKey = routingKey; + public void setEventType(String type) { + eventType = type; + } + + public String getEventSource() { + return eventSource; + } + + void setEventSource(String source) { + eventSource = source; } public String getDescription() { @@ -62,19 +72,23 @@ public class Event { this.description = gson.toJson(message).toString(); } - public String getEventPublisher() { - return publisher; + public void setDescription(String description) { + this.description = description; } - void setEventPublisher(String source) { - this.publisher = source; + public String getResourceType() { + return resourceType; } - public String getDate() { - return date; + public void setResourceType(String resourceType) { + this.resourceType = resourceType; } - void setDate(String date) { - this.date = date; + public void setResourceUUID(String uuid) { + this.resourceUUID = uuid; } -} + + public String getResourceUUID () { + return resourceUUID; + } +} \ No newline at end of file diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java index 3782b32aeba..c16ee6f96f4 100644 --- a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java +++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java @@ -21,6 +21,8 @@ package org.apache.cloudstack.framework.events; import com.cloud.utils.component.Adapter; +import java.util.UUID; + /** * Interface to publish and subscribe to CloudStack events * @@ -28,29 +30,26 @@ import com.cloud.utils.component.Adapter; public interface EventBus extends Adapter{ /** - * publish an event + * publish an event on to the event bus * - * @param event event that needs to be published - * @return true if the event has been successfully published on event bus + * @param event event that needs to be published on the event bus */ - boolean publish(Event event); + void publish(Event event) throws EventBusException; /** - * subscribe to events of a category and a type + * subscribe to events that matches specified event topics * * @param topic defines category and type of the events being subscribed to * @param subscriber subscriber that intends to receive event notification - * @return true if the subscriber has been successfully registered. + * @return UUID returns the subscription ID */ - boolean subscribe(EventTopic topic, EventSubscriber subscriber); + UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException; /** * unsubscribe to events of a category and a type * - * @param topic defines category and type of the events to unsubscribe * @param subscriber subscriber that intends to unsubscribe from the event notification - * @return true if the subscriber has been successfully unsubscribed. */ - boolean unsubscribe(EventTopic topic, EventSubscriber subscriber); + void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException; } diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java new file mode 100644 index 00000000000..5654ba04804 --- /dev/null +++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.framework.events; + +public class EventBusException extends Exception{ + public EventBusException (String msg) { + super(msg); + } +} diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java index d69035aede4..b1c30c21587 100644 --- a/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java +++ b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java @@ -24,9 +24,7 @@ public interface EventSubscriber { /** * Callback method. EventBus calls this method on occurrence of subscribed event * - * @param category category of the event being subscribed (e.g. action, usage, alert etc) - * @param type type of the event (e.g. vm stop, volume delete etc) - * @param description description of the event + * @param event details of the event */ - void recieve(Event event); + void onEvent(Event event); } diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java index eabcdf60772..19b727d4519 100644 --- a/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java +++ b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java @@ -23,12 +23,16 @@ public class EventTopic { String eventCategory; String eventType; - String bindingKey; + String resourceType; + String resourceUUID; + String eventSource; - public EventTopic(String eventCategory, String eventType, String bindingKey) { + public EventTopic(String eventCategory, String eventType, String resourceType, String resourceUUID, String eventSource) { this.eventCategory = eventCategory; this.eventType = eventType; - this.bindingKey = bindingKey; + this.resourceType = resourceType; + this.resourceUUID = resourceUUID; + this.eventSource = eventSource; } public String getEventCategory() { @@ -39,7 +43,15 @@ public class EventTopic { return eventType; } - public String getBindingKey() { - return bindingKey; + public String getResourceType() { + return resourceType; + } + + public String getEventSource() { + return eventSource; + } + + public String getResourceUUID() { + return resourceUUID; } } diff --git a/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java b/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java index 00aa5e571d7..74997f62a54 100644 --- a/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java +++ b/framework/events/src/org/apache/cloudstack/framework/events/Subscribe.java @@ -19,15 +19,15 @@ package org.apache.cloudstack.framework.events; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - import java.lang.annotation.Retention; import java.lang.annotation.Target; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + @Target({METHOD }) @Retention(RUNTIME) -public @interface Subscribe { +public @interface Subscribe { String eventCategory(); diff --git a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java index 3c51cd7af4c..9049fe87fb4 100644 --- a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java +++ b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java @@ -1,126 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.cloudstack.mom.rabbitmq; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.MessageProperties; -import org.apache.cloudstack.framework.events.Event; -import org.apache.cloudstack.framework.events.EventBus; -import org.apache.cloudstack.framework.events.EventSubscriber; -import org.apache.cloudstack.framework.events.EventTopic; +import com.rabbitmq.client.*; +import org.apache.cloudstack.framework.events.*; import org.apache.log4j.Logger; +import com.cloud.utils.Ternary; + import javax.ejb.Local; import javax.naming.ConfigurationException; +import java.io.IOException; +import java.net.ConnectException; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Local(value=EventBus.class) public class RabbitMQEventBus implements EventBus { + // details of AMQP server + private static String _amqpHost; + private static Integer _port; + private static String _username; + private static String _password; - public static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class); - public Connection _connection = null; - public Channel _channel = null; - private String _rabbitMqHost; - private Integer _port; - private String _username; - private String _password; + // AMQP exchange name where all CloudStack events will be published + private static String _amqpExchangeName; + + // hashmap to book keep the registered subscribers + private static ConcurrentHashMap> _subscribers; + + // connection to AMQP server, + private static Connection _connection=null; + + // AMQP server should consider messages acknowledged once delivered if _autoAck is true + private static boolean _autoAck = true; + + private ExecutorService executorService; + private String _name; + private static DisconnectHandler disconnectHandler; + private static Integer _retryInterval; + private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class); @Override public boolean configure(String name, Map params) throws ConfigurationException { - _rabbitMqHost = (String) params.get("server"); - _port = Integer.parseInt((String) params.get("port")); + + _amqpHost = (String) params.get("server"); + if (_amqpHost == null || _amqpHost.isEmpty()) { + throw new ConfigurationException("Unable to get the AMQP server details"); + } + _username = (String) params.get("username"); + if (_username == null || _username.isEmpty()) { + throw new ConfigurationException("Unable to get the username details"); + } + _password = (String) params.get("password"); + if (_password == null || _password.isEmpty()) { + throw new ConfigurationException("Unable to get the password details"); + } + + _amqpExchangeName = (String) params.get("exchangename"); + if (_amqpExchangeName == null || _amqpExchangeName.isEmpty()) { + throw new ConfigurationException("Unable to get the _exchange details on the AMQP server"); + } + + try { + String portStr = (String) params.get("port"); + if (portStr == null || portStr.isEmpty()) { + throw new ConfigurationException("Unable to get the port details of AMQP server"); + } + _port = Integer.parseInt(portStr); + + String retryIntervalStr = (String) params.get("retryinterval"); + if (retryIntervalStr == null || retryIntervalStr.isEmpty()) { + // default to 10s to try out reconnect + retryIntervalStr = "10000"; + } + _retryInterval = Integer.parseInt(retryIntervalStr); + } catch (NumberFormatException e) { + throw new ConfigurationException("Invalid port number/retry interval"); + } + + _subscribers = new ConcurrentHashMap>(); + + executorService = Executors.newCachedThreadPool(); + disconnectHandler = new DisconnectHandler(); + _name = name; return true; } + /** Call to subscribe to interested set of events + * + * @param topic defines category and type of the events being subscribed to + * @param subscriber subscriber that intends to receive event notification + * @return UUID that represents the subscription with event bus + * @throws EventBusException + */ @Override - public String getName() { - return null; + public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException { + + if (subscriber == null || topic == null) { + throw new EventBusException("Invalid EventSubscriber/EventTopic object passed."); + } + + // create a UUID, that will be used for managing subscriptions and also used as queue name + // for on the queue used for the subscriber on the AMQP broker + UUID queueId = UUID.randomUUID(); + String queueName = queueId.toString(); + + try { + String bindingKey = createBindingKey(topic); + + // store the subscriber details before creating channel + _subscribers.put(queueName, new Ternary(bindingKey, null, subscriber)); + + // create a channel dedicated for this subscription + Connection connection = getConnection(); + Channel channel = createChannel(connection); + + // create a queue and bind it to the exchange with binding key formed from event topic + createExchange(channel, _amqpExchangeName); + channel.queueDeclare(queueName, false, false, false, null); + channel.queueBind(queueName, _amqpExchangeName, bindingKey); + + // register a callback handler to receive the events that a subscriber subscribed to + channel.basicConsume(queueName, _autoAck, queueName, + new DefaultConsumer(channel) { + @Override + public void handleDelivery(String queueName, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) + throws IOException { + Ternary queueDetails = _subscribers.get(queueName); + if (queueDetails != null) { + EventSubscriber subscriber = queueDetails.third(); + String routingKey = envelope.getRoutingKey(); + Event event = new Event(null, getEventCategoryFromRoutingKey(routingKey), + getEventTypeFromRoutingKey(routingKey), null, null); + event.setDescription(body.toString()); + + // deliver the event to call back object provided by subscriber + subscriber.onEvent(event); + } + } + } + ); + + // update the channel details for the subscription + Ternary queueDetails = _subscribers.get(queueName); + queueDetails.second(channel); + _subscribers.put(queueName, queueDetails); + + } catch (AlreadyClosedException closedException) { + s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName + + " will be active after reconnection"); + } catch (ConnectException connectException) { + s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName + + " will be active after reconnection"); + } catch (Exception e) { + throw new EventBusException("Failed to subscribe to event due to " + e.getMessage()); + } + + return queueId; } @Override - public boolean start() { - return true; + public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + try { + String classname = subscriber.getClass().getName(); + String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString(); + Ternary queueDetails = _subscribers.get(queueName); + Channel channel = queueDetails.second(); + channel.basicCancel(queueName); + _subscribers.remove(queueName, queueDetails); + } catch (Exception e) { + throw new EventBusException("Failed to unsubscribe from event bus due to " + e.getMessage()); + } } + // publish event on to the exchange created on AMQP server @Override - public boolean stop() { - return true; - } + public void publish(Event event) throws EventBusException { - @Override - public boolean publish(Event event) { - String exchangeName = getExchangeName(event.getCategory()); - String routingKey = getRoutingKey(event.getType()); + String routingKey = createRoutingKey(event); String eventDescription = event.getDescription(); try { - createConnection(); - createExchange(exchangeName); - publishEventToExchange(exchangeName, routingKey, eventDescription); + Connection connection = getConnection(); + Channel channel = createChannel(connection); + createExchange(channel, _amqpExchangeName); + publishEventToExchange(channel, _amqpExchangeName, routingKey, eventDescription); + channel.close(); + } catch (AlreadyClosedException e) { + closeConnection(); + throw new EventBusException("Failed to publish event to message broker as connection to AMQP broker in lost"); } catch (Exception e) { - s_logger.error("Failed to publish event to message broker due to " + e.getMessage()); - return false; + throw new EventBusException("Failed to publish event to message broker due to " + e.getMessage()); } - return true; } - @Override - public boolean subscribe(EventTopic topic, EventSubscriber subscriber) { - return true; + /** creates a routing key from the event details. + * created routing key will be used while publishing the message to exchange on AMQP server + */ + private String createRoutingKey(Event event) { + + StringBuilder routingKey = new StringBuilder(); + + String eventSource = replaceNullWithWildcard(event.getEventSource()); + eventSource = eventSource.replace(".", "-"); + + String eventCategory = replaceNullWithWildcard(event.getEventCategory()); + eventCategory = eventCategory.replace(".", "-"); + + String eventType = replaceNullWithWildcard(event.getEventType()); + eventType = eventType.replace(".", "-"); + + String resourceType = replaceNullWithWildcard(event.getResourceType()); + resourceType = resourceType.replace(".", "-"); + + String resourceUuid = replaceNullWithWildcard(event.getResourceUUID()); + resourceUuid = resourceUuid.replace(".", "-"); + + // routing key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid + routingKey.append(eventSource); + routingKey.append("."); + routingKey.append(eventCategory); + routingKey.append("."); + routingKey.append(eventType); + routingKey.append("."); + routingKey.append(resourceType); + routingKey.append("."); + routingKey.append(resourceUuid); + + return routingKey.toString(); } - @Override - public boolean unsubscribe(EventTopic topic, EventSubscriber subscriber) { - return true; + /** creates a binding key from the event topic that subscriber specified + * binding key will be used to bind the queue created for subscriber to exchange on AMQP server + */ + private String createBindingKey(EventTopic topic) { + + StringBuilder bindingKey = new StringBuilder(); + + String eventSource = replaceNullWithWildcard(topic.getEventSource()); + eventSource = eventSource.replace(".", "-"); + + String eventCategory = replaceNullWithWildcard(topic.getEventCategory()); + eventCategory = eventCategory.replace(".", "-"); + + String eventType = replaceNullWithWildcard(topic.getEventType()); + eventType = eventType.replace(".", "-"); + + String resourceType = replaceNullWithWildcard(topic.getResourceType()); + resourceType = resourceType.replace(".", "-"); + + String resourceUuid = replaceNullWithWildcard(topic.getResourceUUID()); + resourceUuid = resourceUuid.replace(".", "-"); + + // binding key will be of format: eventSource.eventCategory.eventType.resourceType.resourceUuid + bindingKey.append(eventSource); + bindingKey.append("."); + bindingKey.append(eventCategory); + bindingKey.append("."); + bindingKey.append(eventType); + bindingKey.append("."); + bindingKey.append(resourceType); + bindingKey.append("."); + bindingKey.append(resourceUuid); + + return bindingKey.toString(); } - private String getExchangeName(String eventCategory) { - return "CloudStack " + eventCategory; + private synchronized Connection getConnection() throws Exception { + if (_connection == null) { + try { + return createConnection(); + } catch (Exception e) { + s_logger.error("Failed to create a connection to AMQP server due to " + e.getMessage()); + throw e; + } + } else { + return _connection; + } } - private String getRoutingKey(String eventType) { - return eventType; - } - - private void createConnection() throws Exception { + private synchronized Connection createConnection() throws Exception { try { - // obtain a connection to RabbitMQ server ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(_username); factory.setPassword(_password); factory.setVirtualHost("/"); - factory.setHost(_rabbitMqHost); + factory.setHost(_amqpHost); factory.setPort(_port); - _connection = factory.newConnection(); - _channel = _connection.createChannel(); + Connection connection = factory.newConnection(); + connection.addShutdownListener(disconnectHandler); + _connection = connection; + return _connection; } catch (Exception e) { - s_logger.error("Failed to create a connection to RabbitMQ server due to " + e.getMessage()); throw e; } } - private void createExchange(String exchangeName) throws Exception { + private synchronized void closeConnection() { try { - _channel.exchangeDeclare(exchangeName, "topic", true); + if (_connection != null) { + _connection.close(); + } + } catch (Exception e) { + s_logger.warn("Failed to close connection to AMQP server due to " + e.getMessage()); + } + _connection = null; + } + + private synchronized void abortConnection () { + if (_connection == null) + return; + + try { + _connection.abort(); + } catch (Exception e) { + s_logger.warn("Failed to abort connection due to " + e.getMessage()); + } + _connection = null; + } + + private String replaceNullWithWildcard(String key) { + if (key == null || key.isEmpty()) { + return "*"; + } else { + return key; + } + } + + private Channel createChannel(Connection connection) throws Exception { + try { + return connection.createChannel(); + } catch (java.io.IOException exception) { + s_logger.warn("Failed to create a channel due to " + exception.getMessage()); + throw exception; + } + } + + private void createExchange(Channel channel, String exchangeName) throws Exception { + try { + channel.exchangeDeclare(exchangeName, "topic", true); } catch (java.io.IOException exception) { s_logger.error("Failed to create exchange" + exchangeName + " on RabbitMQ server"); throw exception; } } - private void publishEventToExchange(String exchangeName, String routingKey, String eventDescription) throws Exception { + private void publishEventToExchange(Channel channel, String exchangeName, + String routingKey, String eventDescription) throws Exception { try { - _channel.txSelect(); byte[] messageBodyBytes = eventDescription.getBytes(); - _channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes); - _channel.txCommit(); + channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes); } catch (Exception e) { s_logger.error("Failed to publish event " + routingKey + " on exchange " + exchangeName + " of message broker due to " + e.getMessage()); throw e; } } + + private String getEventCategoryFromRoutingKey(String routingKey) { + String[] keyParts = routingKey.split("\\."); + return keyParts[1]; + } + + private String getEventTypeFromRoutingKey(String routingKey) { + String[] keyParts = routingKey.split("\\."); + return keyParts[2]; + } + + private String getEventSourceFromRoutingKey(String routingKey) { + String[] keyParts = routingKey.split("\\."); + return keyParts[0]; + } + + @Override + public String getName() { + return _name; + } + + @Override + public boolean start() { + ReconnectionTask reconnect = new ReconnectionTask(); // initiate connection to AMQP server + executorService.submit(reconnect); + return true; + } + + @Override + public boolean stop() { + + if (_connection.isOpen()) { + for (String subscriberId : _subscribers.keySet()) { + Ternary subscriberDetails = _subscribers.get(subscriberId); + Channel channel = subscriberDetails.second(); + String queueName = subscriberId; + try { + channel.queueDelete(queueName); + channel.abort(); + } catch (IOException ioe) { + s_logger.warn("Failed to delete queue: " + queueName + " on AMQP server due to " + ioe.getMessage() ); + } + } + } + + closeConnection(); + return true; + } + + // logic to deal with loss of connection to AMQP server + private class DisconnectHandler implements ShutdownListener { + + @Override + public void shutdownCompleted(ShutdownSignalException shutdownSignalException) { + if (!shutdownSignalException.isInitiatedByApplication()) { + + for (String subscriberId : _subscribers.keySet()) { + Ternary subscriberDetails = _subscribers.get(subscriberId); + subscriberDetails.second(null); + _subscribers.put(subscriberId, subscriberDetails); + } + + abortConnection(); // disconnected to AMQP server, so abort the connection and channels + s_logger.warn("Connection has been shutdown by AMQP server. Attempting to reconnect."); + + // initiate re-connect process + ReconnectionTask reconnect = new ReconnectionTask(); + executorService.submit(reconnect); + } + } + } + + // retry logic to connect back to AMQP server after loss of connection + private class ReconnectionTask implements Runnable { + + boolean connected = false; + Connection connection = null; + + public void run() { + + while (!connected) { + try { + Thread.sleep(_retryInterval); + } catch (InterruptedException ie) { + // ignore timer interrupts + } + + try { + try { + connection = createConnection(); + connected = true; + } catch (IOException ie) { + continue; // can't establish connection to AMQP server yet, so continue + } + + // prepare consumer on AMQP server for each of subscriber + for (String subscriberId : _subscribers.keySet()) { + Ternary subscriberDetails = _subscribers.get(subscriberId); + String bindingKey = subscriberDetails.first(); + EventSubscriber subscriber = subscriberDetails.third(); + + /** create a queue with subscriber ID as queue name and bind it to the exchange + * with binding key formed from event topic + */ + Channel channel = createChannel(connection); + createExchange(channel, _amqpExchangeName); + channel.queueDeclare(subscriberId, false, false, false, null); + channel.queueBind(subscriberId, _amqpExchangeName, bindingKey); + + // register a callback handler to receive the events that a subscriber subscribed to + channel.basicConsume(subscriberId, _autoAck, subscriberId, + new DefaultConsumer(channel) { + @Override + public void handleDelivery(String queueName, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) + throws IOException { + + Ternary subscriberDetails + = _subscribers.get(queueName); // queue name == subscriber ID + + if (subscriberDetails != null) { + EventSubscriber subscriber = subscriberDetails.third(); + String routingKey = envelope.getRoutingKey(); + Event event = new Event(null, getEventCategoryFromRoutingKey(routingKey), + getEventTypeFromRoutingKey(routingKey), null, null); + event.setDescription(body.toString()); + + // deliver the event to call back object provided by subscriber + subscriber.onEvent(event); + } + } + } + ); + + // update the channel details for the subscription + subscriberDetails.second(channel); + _subscribers.put(subscriberId, subscriberDetails); + } + } catch (Exception e) { + s_logger.warn("Failed to recreate queues and binding for the subscribers due to " + e.getMessage()); + } + } + return; + } + } } \ No newline at end of file diff --git a/server/src/com/cloud/alert/AlertManagerImpl.java b/server/src/com/cloud/alert/AlertManagerImpl.java index 8eda011b5e3..1a93f97a704 100755 --- a/server/src/com/cloud/alert/AlertManagerImpl.java +++ b/server/src/com/cloud/alert/AlertManagerImpl.java @@ -263,7 +263,7 @@ public class AlertManagerImpl implements AlertManager { public void sendAlert(short alertType, long dataCenterId, Long podId, String subject, String body) { // publish alert - AlertGenerator.publishAlert(getAlertType(alertType), dataCenterId, podId, subject, body); + AlertGenerator.publishAlertOnEventBus(getAlertType(alertType), dataCenterId, podId, subject, body); // TODO: queue up these messages and send them as one set of issues once a certain number of issues is reached? If that's the case, // shouldn't we have a type/severity as part of the API so that severe errors get sent right away? diff --git a/server/src/com/cloud/event/ActionEventCallback.java b/server/src/com/cloud/event/ActionEventCallback.java index 93c2fdc23b4..eeec2b4feae 100644 --- a/server/src/com/cloud/event/ActionEventCallback.java +++ b/server/src/com/cloud/event/ActionEventCallback.java @@ -25,6 +25,8 @@ import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodProxy; import org.apache.cloudstack.framework.events.Event; import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventBusException; +import org.apache.log4j.Logger; import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; @@ -36,6 +38,7 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce protected static EventBus _eventBus = null; protected static boolean _eventBusLoaded = false; + private static final Logger s_logger = Logger.getLogger(ActionEventCallback.class); @Override public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable { @@ -144,21 +147,34 @@ public class ActionEventCallback implements MethodInterceptor, AnnotationInterce return this; } - void publishOnEventBus(long userId, long accountId, String type, com.cloud.event.Event.State state, String description) { + void publishOnEventBus(long userId, long accountId, String eventType, com.cloud.event.Event.State state, String description) { if (getEventBus() != null) { Map eventDescription = new HashMap(); eventDescription.put("user", String.valueOf(userId)); eventDescription.put("account", String.valueOf(accountId)); eventDescription.put("state", state.toString()); eventDescription.put("description", description); - Event event = new Event(EventCategory.ACTION_EVENT.getName(), type, type); + + int index = eventType.lastIndexOf("\\."); + + String resourceType = null; + if (index != -1 ) { + resourceType = eventType.substring(0, index); + } + + Event event = new Event(null, EventCategory.ACTION_EVENT.getName(), eventType, + resourceType, null); event.setDescription(eventDescription); - _eventBus.publish(event); + + try { + _eventBus.publish(event); + } catch (EventBusException e) { + s_logger.warn("Failed to publish action event on the the event bus."); + } } } private EventBus getEventBus() { - //TODO: check if there is way of getting single adapter if (_eventBus == null) { if (!_eventBusLoaded) { ComponentLocator locator = ComponentLocator.getLocator("management-server"); diff --git a/server/src/com/cloud/event/AlertGenerator.java b/server/src/com/cloud/event/AlertGenerator.java index 16ef73e5235..5421aec12f6 100644 --- a/server/src/com/cloud/event/AlertGenerator.java +++ b/server/src/com/cloud/event/AlertGenerator.java @@ -1,9 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package com.cloud.event; import com.cloud.utils.component.Adapters; import com.cloud.utils.component.ComponentLocator; -import org.apache.cloudstack.framework.events.EventBus; -import org.apache.cloudstack.framework.events.Event; +import org.apache.cloudstack.framework.events.*; +import org.apache.log4j.Logger; import java.util.Enumeration; import java.util.HashMap; @@ -11,13 +28,11 @@ import java.util.Map; public class AlertGenerator { + private static final Logger s_logger = Logger.getLogger(AlertGenerator.class); protected static EventBus _eventBus = null; protected static boolean _eventBusLoaded = false; - public static void publishAlert(String alertType, long dataCenterId, Long podId, String subject, String body) { - } - - void publishOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) { + public static void publishAlertOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) { if (getEventBus() != null) { Map eventDescription = new HashMap(); eventDescription.put("alertType", alertType); @@ -25,14 +40,22 @@ public class AlertGenerator { eventDescription.put("podId", Long.toString(podId)); eventDescription.put("subject", subject); eventDescription.put("body", body); - Event event = new Event(EventCategory.ALERT_EVENT.getName(), alertType, alertType); + org.apache.cloudstack.framework.events.Event event = + new org.apache.cloudstack.framework.events.Event(null, + EventCategory.ALERT_EVENT.getName(), + alertType, + null, + null); event.setDescription(eventDescription); - _eventBus.publish(event); + try { + _eventBus.publish(event); + } catch (EventBusException e) { + s_logger.warn("Failed to publish alert on the the event bus."); + } } } - private EventBus getEventBus() { - //TODO: check if there is way of getting single adapter + private static EventBus getEventBus() { if (_eventBus == null) { if (!_eventBusLoaded) { ComponentLocator locator = ComponentLocator.getLocator("management-server"); diff --git a/server/src/com/cloud/event/UsageEventGenerator.java b/server/src/com/cloud/event/UsageEventGenerator.java index edf6a422c22..80353e58ae2 100644 --- a/server/src/com/cloud/event/UsageEventGenerator.java +++ b/server/src/com/cloud/event/UsageEventGenerator.java @@ -4,6 +4,8 @@ import com.cloud.utils.component.Adapters; import com.cloud.utils.component.ComponentLocator; import org.apache.cloudstack.framework.events.EventBus; import org.apache.cloudstack.framework.events.Event; +import org.apache.cloudstack.framework.events.EventBusException; +import org.apache.log4j.Logger; import java.util.Enumeration; import java.util.HashMap; @@ -11,6 +13,7 @@ import java.util.Map; public class UsageEventGenerator { + private static final Logger s_logger = Logger.getLogger(UsageEventGenerator.class); protected static EventBus _eventBus = null; protected static boolean _eventBusLoaded = false; @@ -54,14 +57,17 @@ public class UsageEventGenerator { } eventDescription.put("resourceName", resourceName); eventDescription.put("resourceType", resourceType); - Event event = new Event(EventCategory.USAGE_EVENT.getName(), usageType, usageType); + Event event = new Event(null, EventCategory.USAGE_EVENT.getName(), usageType, null, null); event.setDescription(eventDescription); - _eventBus.publish(event); + try { + _eventBus.publish(event); + } catch (EventBusException e) { + s_logger.warn("Failed to publish usage event on the the event bus."); + } } } private static EventBus getEventBus() { - //TODO: check if there is way of getting single adapter if (_eventBus == null) { if (!_eventBusLoaded) { ComponentLocator locator = ComponentLocator.getLocator("management-server"); diff --git a/server/src/com/cloud/network/NetworkManagerImpl.java b/server/src/com/cloud/network/NetworkManagerImpl.java index 1775f88ee91..04f448826ee 100755 --- a/server/src/com/cloud/network/NetworkManagerImpl.java +++ b/server/src/com/cloud/network/NetworkManagerImpl.java @@ -16,39 +16,9 @@ // under the License. package com.cloud.network; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import javax.ejb.Local; -import javax.naming.ConfigurationException; - -import org.apache.cloudstack.acl.ControlledEntity.ACLType; -import org.apache.cloudstack.acl.SecurityChecker.AccessType; -import org.apache.log4j.Logger; - import com.cloud.agent.AgentManager; import com.cloud.agent.Listener; -import com.cloud.agent.api.AgentControlAnswer; -import com.cloud.agent.api.AgentControlCommand; -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.CheckNetworkAnswer; -import com.cloud.agent.api.CheckNetworkCommand; -import com.cloud.agent.api.Command; -import com.cloud.agent.api.StartupCommand; -import com.cloud.agent.api.StartupRoutingCommand; +import com.cloud.agent.api.*; import com.cloud.agent.api.to.NicTO; import com.cloud.alert.AlertManager; import com.cloud.api.ApiDBUtils; @@ -56,15 +26,9 @@ import com.cloud.configuration.Config; import com.cloud.configuration.ConfigurationManager; import com.cloud.configuration.Resource.ResourceType; import com.cloud.configuration.dao.ConfigurationDao; -import com.cloud.dc.AccountVlanMapVO; -import com.cloud.dc.DataCenter; +import com.cloud.dc.*; import com.cloud.dc.DataCenter.NetworkType; -import com.cloud.dc.DataCenterVO; -import com.cloud.dc.Pod; -import com.cloud.dc.PodVlanMapVO; -import com.cloud.dc.Vlan; import com.cloud.dc.Vlan.VlanType; -import com.cloud.dc.VlanVO; import com.cloud.dc.dao.AccountVlanMapDao; import com.cloud.dc.dao.DataCenterDao; import com.cloud.dc.dao.PodVlanMapDao; @@ -77,64 +41,28 @@ import com.cloud.domain.dao.DomainDao; import com.cloud.event.EventTypes; import com.cloud.event.UsageEventGenerator; import com.cloud.event.dao.UsageEventDao; -import com.cloud.exception.AccountLimitException; -import com.cloud.exception.ConcurrentOperationException; -import com.cloud.exception.ConnectionException; -import com.cloud.exception.InsufficientAddressCapacityException; -import com.cloud.exception.InsufficientCapacityException; -import com.cloud.exception.InsufficientVirtualNetworkCapcityException; -import com.cloud.exception.InvalidParameterValueException; -import com.cloud.exception.PermissionDeniedException; -import com.cloud.exception.ResourceAllocationException; -import com.cloud.exception.ResourceUnavailableException; -import com.cloud.exception.UnsupportedServiceException; +import com.cloud.exception.*; import com.cloud.host.Host; import com.cloud.host.HostVO; import com.cloud.host.Status; import com.cloud.host.dao.HostDao; import com.cloud.hypervisor.Hypervisor.HypervisorType; import com.cloud.network.IpAddress.State; -import com.cloud.network.Network.Capability; -import com.cloud.network.Network.GuestType; -import com.cloud.network.Network.Provider; -import com.cloud.network.Network.Service; +import com.cloud.network.Network.*; import com.cloud.network.Networks.AddressFormat; import com.cloud.network.Networks.BroadcastDomainType; import com.cloud.network.Networks.IsolationType; import com.cloud.network.Networks.TrafficType; import com.cloud.network.addr.PublicIp; -import com.cloud.network.Network.Event; import com.cloud.network.dao.*; import com.cloud.network.element.*; -import com.cloud.network.dao.FirewallRulesDao; -import com.cloud.network.dao.IPAddressDao; -import com.cloud.network.dao.LoadBalancerDao; -import com.cloud.network.dao.NetworkDao; -import com.cloud.network.dao.NetworkServiceMapDao; -import com.cloud.network.dao.PhysicalNetworkDao; -import com.cloud.network.dao.PhysicalNetworkServiceProviderDao; -import com.cloud.network.dao.PhysicalNetworkTrafficTypeDao; -import com.cloud.network.dao.PhysicalNetworkTrafficTypeVO; -import com.cloud.network.element.DhcpServiceProvider; -import com.cloud.network.element.IpDeployer; -import com.cloud.network.element.LoadBalancingServiceProvider; -import com.cloud.network.element.NetworkElement; -import com.cloud.network.element.StaticNatServiceProvider; -import com.cloud.network.element.UserDataServiceProvider; import com.cloud.network.guru.NetworkGuru; import com.cloud.network.lb.LoadBalancingRule; import com.cloud.network.lb.LoadBalancingRule.LbDestination; import com.cloud.network.lb.LoadBalancingRule.LbStickinessPolicy; import com.cloud.network.lb.LoadBalancingRulesManager; -import com.cloud.network.rules.FirewallManager; -import com.cloud.network.rules.FirewallRule; +import com.cloud.network.rules.*; import com.cloud.network.rules.FirewallRule.Purpose; -import com.cloud.network.rules.FirewallRuleVO; -import com.cloud.network.rules.PortForwardingRuleVO; -import com.cloud.network.rules.RulesManager; -import com.cloud.network.rules.StaticNat; -import com.cloud.network.rules.StaticNatRule; -import com.cloud.network.rules.StaticNatRuleImpl; import com.cloud.network.rules.dao.PortForwardingRulesDao; import com.cloud.network.vpc.NetworkACLManager; import com.cloud.network.vpc.VpcManager; @@ -147,11 +75,7 @@ import com.cloud.offerings.NetworkOfferingVO; import com.cloud.offerings.dao.NetworkOfferingDao; import com.cloud.offerings.dao.NetworkOfferingServiceMapDao; import com.cloud.org.Grouping; -import com.cloud.user.Account; -import com.cloud.user.AccountManager; -import com.cloud.user.ResourceLimitService; -import com.cloud.user.User; -import com.cloud.user.UserContext; +import com.cloud.user.*; import com.cloud.user.dao.AccountDao; import com.cloud.utils.NumbersUtil; import com.cloud.utils.Pair; @@ -159,32 +83,30 @@ import com.cloud.utils.component.Adapters; import com.cloud.utils.component.Inject; import com.cloud.utils.component.Manager; import com.cloud.utils.concurrency.NamedThreadFactory; -import com.cloud.utils.db.DB; -import com.cloud.utils.db.Filter; +import com.cloud.utils.db.*; import com.cloud.utils.db.JoinBuilder.JoinType; -import com.cloud.utils.db.SearchBuilder; -import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.SearchCriteria.Op; -import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.fsm.NoTransitionException; import com.cloud.utils.fsm.StateMachine2; import com.cloud.utils.net.Ip; import com.cloud.utils.net.NetUtils; -import com.cloud.vm.Nic; -import com.cloud.vm.NicProfile; -import com.cloud.vm.NicVO; -import com.cloud.vm.ReservationContext; -import com.cloud.vm.ReservationContextImpl; -import com.cloud.vm.UserVmVO; -import com.cloud.vm.VMInstanceVO; -import com.cloud.vm.VirtualMachine; +import com.cloud.vm.*; import com.cloud.vm.VirtualMachine.Type; -import com.cloud.vm.VirtualMachineProfile; -import com.cloud.vm.VirtualMachineProfileImpl; import com.cloud.vm.dao.NicDao; import com.cloud.vm.dao.UserVmDao; import com.cloud.vm.dao.VMInstanceDao; +import org.apache.cloudstack.acl.ControlledEntity.ACLType; +import org.apache.cloudstack.acl.SecurityChecker.AccessType; +import org.apache.log4j.Logger; + +import javax.ejb.Local; +import javax.naming.ConfigurationException; +import java.net.URI; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * NetworkManagerImpl implements NetworkManager. @@ -2059,9 +1981,12 @@ public class NetworkManagerImpl implements NetworkManager, Manager, Listener { s_logger.debug("Network is not implemented: " + network); return false; } - - network.setState(Network.State.Shutdown); - _networksDao.update(network.getId(), network); + try { + stateTransitTo(network, Event.DestroyNetwork); + } catch (NoTransitionException e) { + network.setState(Network.State.Shutdown); + _networksDao.update(network.getId(), network); + } boolean success = shutdownNetworkElementsAndResources(context, cleanupElements, network); @@ -2076,15 +2001,22 @@ public class NetworkManagerImpl implements NetworkManager, Manager, Listener { guru.shutdown(profile, _networkOfferingDao.findById(network.getNetworkOfferingId())); applyProfileToNetwork(network, profile); - - network.setState(Network.State.Allocated); - network.setRestartRequired(false); + try { + stateTransitTo(network, Event.OperationSucceeded); + } catch (NoTransitionException e) { + network.setState(Network.State.Allocated); + network.setRestartRequired(false); + } _networksDao.update(network.getId(), network); _networksDao.clearCheckForGc(networkId); result = true; } else { - network.setState(Network.State.Implemented); - _networksDao.update(network.getId(), network); + try { + stateTransitTo(network, Event.OperationFailed); + } catch (NoTransitionException e) { + network.setState(Network.State.Implemented); + _networksDao.update(network.getId(), network); + } result = false; } txn.commit();