diff --git a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
index fb0e8780ecc..ef6adab9dd9 100644
--- a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
+++ b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
@@ -39,6 +39,11 @@
+
+
+
+
+
diff --git a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
index a36d1243155..bfe722fad55 100644
--- a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
+++ b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
@@ -287,11 +287,16 @@
+
+
+
+
+ class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
-
+
diff --git a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
index 1e1251d8cdc..1686935c3ac 100644
--- a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
+++ b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
@@ -24,16 +24,13 @@ import java.util.Map;
import javax.inject.Inject;
+import com.cloud.utils.component.ComponentContext;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.apache.cloudstack.framework.events.EventDistributor;
import com.cloud.event.EventCategory;
import com.cloud.network.Network.Event;
import com.cloud.network.Network.State;
-import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
@@ -42,14 +39,16 @@ public class NetworkStateListener implements StateListener eventDescription = new HashMap<>();
+ eventDescription.put("resource", resourceName);
+ eventDescription.put("id", vo.getUuid());
+ eventDescription.put("old-state", oldState.name());
+ eventDescription.put("new-state", newState.name());
- String resourceName = getEntityFromClassName(Network.class.getName());
- org.apache.cloudstack.framework.events.Event eventMsg =
- new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
- Map eventDescription = new HashMap();
- eventDescription.put("resource", resourceName);
- eventDescription.put("id", vo.getUuid());
- eventDescription.put("old-state", oldState.name());
- eventDescription.put("new-state", newState.name());
+ String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
+ eventDescription.put("eventDateTime", eventDate);
- String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
- eventDescription.put("eventDateTime", eventDate);
+ eventMsg.setDescription(eventDescription);
- eventMsg.setDescription(eventDescription);
- try {
- s_eventBus.publish(eventMsg);
- } catch (EventBusException e) {
- s_logger.warn("Failed to publish state change event on the event bus.");
- }
+ eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {
diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
index 4a3eaf9e68c..bd1ea2aa5f9 100644
--- a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
+++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
@@ -31,11 +31,11 @@ public class Event {
String description;
public Event(String eventSource, String eventCategory, String eventType, String resourceType, String resourceUUID) {
- this.eventCategory = eventCategory;
- this.eventType = eventType;
- this.eventSource = eventSource;
- this.resourceType = resourceType;
- this.resourceUUID = resourceUUID;
+ setEventCategory(eventCategory);
+ setEventType(eventType);
+ setEventSource(eventSource);
+ setResourceType(resourceType);
+ setResourceUUID(resourceUUID);
}
public String getEventCategory() {
@@ -68,7 +68,7 @@ public class Event {
public void setDescription(Object message) {
Gson gson = new Gson();
- this.description = gson.toJson(message).toString();
+ this.description = gson.toJson(message);
}
public void setDescription(String description) {
diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
new file mode 100644
index 00000000000..4f477531710
--- /dev/null
+++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.Manager;
+
+import java.util.List;
+
+public interface EventDistributor extends Manager {
+ /**
+ * publish an event on to the event busses
+ *
+ * @param event event that needs to be published on the event bus
+ */
+ List publish(Event event);
+}
diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
new file mode 100644
index 00000000000..e92d36b4541
--- /dev/null
+++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.ManagerBase;
+import org.apache.log4j.Logger;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventDistributorImpl extends ManagerBase implements EventDistributor {
+ private static final Logger LOGGER = Logger.getLogger(EventDistributorImpl.class);
+
+ public void setEventBusses(List eventBusses) {
+ this.eventBusses = eventBusses;
+ }
+
+ List eventBusses;
+
+ @PostConstruct
+ public void init() {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(String.format("testing %d event busses", eventBusses.size()));
+ }
+ publish(new Event("server", "NONE","starting", "server", "NONE"));
+ }
+
+ @Override
+ public List publish(Event event) {
+ LOGGER.info(String.format("publishing %s to %d event busses", (event == null ? "" : event.getDescription()), eventBusses.size()));
+ List exceptions = new ArrayList<>();
+ if (event == null) {
+ return exceptions;
+ }
+ for (EventBus bus : eventBusses) {
+ try {
+ bus.publish(event);
+ } catch (EventBusException e) {
+ LOGGER.warn(String.format("no publish for bus %s of event %s", bus.getClass().getName(), event.getDescription()));
+ exceptions.add(e);
+ }
+ }
+ return exceptions;
+ }
+
+}
diff --git a/framework/events/src/main/resources/META-INF.cloudstack.core/spring-framework-event-core-context.xml b/framework/events/src/main/resources/META-INF.cloudstack.core/spring-framework-event-core-context.xml
new file mode 100644
index 00000000000..45eb666cb1c
--- /dev/null
+++ b/framework/events/src/main/resources/META-INF.cloudstack.core/spring-framework-event-core-context.xml
@@ -0,0 +1,34 @@
+
+
+
+
+
+
diff --git a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
index b7d74df980f..5538a50988d 100644
--- a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
+++ b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
@@ -62,6 +62,10 @@ public class InMemoryEventBus extends ManagerBase implements EventBus {
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
+ }
+
UUID subscriberId = UUID.randomUUID();
subscribers.put(subscriberId, new Pair(topic, subscriber));
@@ -70,6 +74,9 @@ public class InMemoryEventBus extends ManagerBase implements EventBus {
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
+ }
if (subscriberId == null) {
throw new EventBusException("Cannot unregister a null subscriberId.");
}
@@ -87,7 +94,11 @@ public class InMemoryEventBus extends ManagerBase implements EventBus {
@Override
public void publish(Event event) throws EventBusException {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
+ }
if (subscribers == null || subscribers.isEmpty()) {
+ s_logger.trace("no subscribers, no publish");
return; // no subscriber to publish to, so just return
}
diff --git a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
index 17a58a5d232..7d48a391025 100644
--- a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
+++ b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
@@ -89,19 +89,29 @@ public class KafkaEventBus extends ManagerBase implements EventBus {
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
+ }
+
/* NOOP */
return UUID.randomUUID();
}
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
+ }
/* NOOP */
}
@Override
public void publish(Event event) throws EventBusException {
- ProducerRecord record = new ProducerRecord(_topic, event.getResourceUUID(), event.getDescription());
- _producer.send(record);
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
+ }
+ ProducerRecord newRecord = new ProducerRecord<>(_topic, event.getResourceUUID(), event.getDescription());
+ _producer.send(newRecord);
}
@Override
diff --git a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
index f54c769908d..5e5589aca5c 100644
--- a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
+++ b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
@@ -187,11 +187,14 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
*/
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
-
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
+ }
+
// create a UUID, that will be used for managing subscriptions and also used as queue name
// for on the queue used for the subscriber on the AMQP broker
UUID queueId = UUID.randomUUID();
@@ -252,6 +255,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
+ }
try {
String classname = subscriber.getClass().getName();
String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString();
@@ -267,6 +273,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
// publish event on to the exchange created on AMQP server
@Override
public void publish(Event event) throws EventBusException {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
+ }
String routingKey = createRoutingKey(event);
String eventDescription = event.getDescription();
diff --git a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
index 78ec01344ca..d6a28659631 100644
--- a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
+++ b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.ArrayList;
import java.util.List;
+import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
@@ -42,15 +43,21 @@ import com.cloud.server.ManagementService;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ComponentMethodInterceptor;
+
@Component
public class EventUtils {
private static final Logger s_logger = Logger.getLogger(EventUtils.class);
+ private static EventDistributor eventDistributor;
protected static EventBus s_eventBus = null;
public EventUtils() {
}
+ public static void setEventDistributor(EventDistributor eventDistributorImpl) {
+ eventDistributor = eventDistributorImpl;
+ }
+
private static void publishOnMessageBus(String eventCategory, String eventType, String details, Event.State state) {
if (state != com.cloud.event.Event.State.Completed) {
@@ -58,6 +65,7 @@ public class EventUtils {
}
try {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
s_eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
@@ -66,18 +74,16 @@ public class EventUtils {
org.apache.cloudstack.framework.events.Event event =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, eventCategory, eventType, EventTypes.getEntityForEvent(eventType), null);
- Map eventDescription = new HashMap();
+ Map eventDescription = new HashMap<>();
eventDescription.put("event", eventType);
eventDescription.put("status", state.toString());
eventDescription.put("details", details);
event.setDescription(eventDescription);
- try {
- s_eventBus.publish(event);
- } catch (EventBusException evx) {
- String errMsg = "Failed to publish contrail event.";
- s_logger.warn(errMsg, evx);
+ List exceptions = eventDistributor.publish(event);
+ for (EventBusException ex : exceptions) {
+ String errMsg = "Failed to publish event.";
+ s_logger.warn(errMsg, ex);
}
-
}
public static class EventInterceptor implements ComponentMethodInterceptor, MethodInterceptor {
@@ -118,7 +124,7 @@ public class EventUtils {
}
protected List getActionEvents(Method m) {
- List result = new ArrayList();
+ List result = new ArrayList<>();
ActionEvents events = m.getAnnotation(ActionEvents.class);
diff --git a/server/src/main/java/com/cloud/api/ApiServer.java b/server/src/main/java/com/cloud/api/ApiServer.java
index b602ed2edbc..0310fa3881b 100644
--- a/server/src/main/java/com/cloud/api/ApiServer.java
+++ b/server/src/main/java/com/cloud/api/ApiServer.java
@@ -95,8 +95,8 @@ import org.apache.cloudstack.config.ApiServiceConfiguration;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
-import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
@@ -134,7 +134,6 @@ import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.log4j.Logger;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import com.cloud.api.dispatch.DispatchChainFactory;
@@ -197,26 +196,26 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
*/
private static final String CONTROL_CHARACTERS = "[\000-\011\013-\014\016-\037\177]";
+ @Inject
+ private AccountManager accountMgr;
+ @Inject
+ private APIAuthenticationManager authManager;
@Inject
private ApiDispatcher dispatcher;
@Inject
- private DispatchChainFactory dispatchChainFactory;
+ private AsyncJobManager asyncMgr;
@Inject
- private AccountManager accountMgr;
+ private DispatchChainFactory dispatchChainFactory;
@Inject
private DomainManager domainMgr;
@Inject
private DomainDao domainDao;
@Inject
- private UUIDManager uuidMgr;
- @Inject
- private AsyncJobManager asyncMgr;
- @Inject
private EntityManager entityMgr;
@Inject
- private APIAuthenticationManager authManager;
- @Inject
private ProjectDao projectDao;
+ @Inject
+ private UUIDManager uuidMgr;
private List pluggableServices;
@@ -225,6 +224,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
@Inject
private ApiAsyncJobDispatcher asyncDispatcher;
+ private EventDistributor eventDistributor = null;
private static int s_workerCount = 0;
private static Map>> s_apiNameCmdClassMap = new HashMap>>();
@@ -291,6 +291,10 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
return true;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@MessageHandler(topic = AsyncJob.Topics.JOB_EVENT_PUBLISH)
public void handleAsyncJobPublishEvent(String subject, String senderAddress, Object args) {
assert (args != null);
@@ -302,12 +306,8 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
if (s_logger.isTraceEnabled())
s_logger.trace("Handle asyjob publish event " + jobEvent);
-
- EventBus eventBus = null;
- try {
- eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so just return
+ if (eventDistributor == null) {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
if (!job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher")) {
@@ -320,7 +320,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
// Get the event type from the cmdInfo json string
String info = job.getCmdInfo();
String cmdEventType = "unknown";
- Map cmdInfoObj = new HashMap();
+ Map cmdInfoObj = new HashMap<>();
if (info != null) {
Type type = new TypeToken
+
+
+
+
diff --git a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
index 8486dbf4bd4..a03d21db656 100644
--- a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
+++ b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
@@ -100,7 +100,7 @@ public class ComponentContext implements ApplicationContextAware {
s_logger.info("Running SystemIntegrityChecker " + entry.getKey());
try {
entry.getValue().check();
- } catch (Throwable e) {
+ } catch (RuntimeException e) {
s_logger.error("System integrity check failed. Refuse to startup", e);
System.exit(1);
}
@@ -178,6 +178,13 @@ public class ComponentContext implements ApplicationContextAware {
return (T)s_appContext.getBean(name);
}
+ /**
+ * only ever used to get the event bus
+ *
+ * @param beanType the component type to return
+ * @return one of the component registered for the requested type
+ * @param
+ */
public static T getComponent(Class beanType) {
assert (s_appContext != null);
Map matchedTypes = getComponentsOfType(beanType);