registry of message busses

This commit is contained in:
Daan Hoogland 2023-09-21 10:22:46 +02:00
parent 5c7e4b7edc
commit e596b10296
18 changed files with 304 additions and 145 deletions

View File

@ -39,6 +39,11 @@
<property name="typeClass" value="com.cloud.ha.FenceBuilder" />
</bean>
<bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
<property name="registry" ref="eventBussesRegistry" />
<property name="typeClass" value="org.apache.cloudstack.framework.events.EventBus" />
</bean>
<bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
<property name="registry" ref="hypervisorGurusRegistry" />
<property name="typeClass" value="com.cloud.hypervisor.HypervisorGuru" />

View File

@ -287,11 +287,16 @@
<property name="excludeKey" value="api.commands.exclude" />
</bean>
<bean id="eventBussesRegistry"
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="event.busses.exclude" />
</bean>
<bean id="hypervisorGurusRegistry"
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="hypervisor.gurus.exclude" />
</bean>
<bean id="vpcProvidersRegistry"
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="vpc.providers.exclude" />

View File

@ -24,16 +24,13 @@ import java.util.Map;
import javax.inject.Inject;
import com.cloud.utils.component.ComponentContext;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.apache.cloudstack.framework.events.EventDistributor;
import com.cloud.event.EventCategory;
import com.cloud.network.Network.Event;
import com.cloud.network.Network.State;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
@ -42,14 +39,16 @@ public class NetworkStateListener implements StateListener<State, Event, Network
@Inject
private ConfigurationDao _configDao;
private static EventBus s_eventBus = null;
private static final Logger s_logger = Logger.getLogger(NetworkStateListener.class);
private EventDistributor eventDistributor;
public NetworkStateListener(ConfigurationDao configDao) {
_configDao = configDao;
}
public void setEventDistributor(EventDistributor eventDistributor) {
this.eventDistributor = eventDistributor;
}
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, Network vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@ -66,36 +65,30 @@ public class NetworkStateListener implements StateListener<State, Event, Network
}
private void pubishOnEventBus(String event, String status, Network vo, State oldState, State newState) {
String configKey = "publish.resource.state.events";
String value = _configDao.getValue(configKey);
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
if (eventDistributor == null) {
setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
String configKey = "publish.resource.state.events";
String value = _configDao.getValue(configKey);
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
try {
s_eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}
String resourceName = getEntityFromClassName(Network.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
eventDescription.put("new-state", newState.name());
String resourceName = getEntityFromClassName(Network.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
eventDescription.put("new-state", newState.name());
String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
eventDescription.put("eventDateTime", eventDate);
String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
eventMsg.setDescription(eventDescription);
try {
s_eventBus.publish(eventMsg);
} catch (EventBusException e) {
s_logger.warn("Failed to publish state change event on the event bus.");
}
eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {

View File

@ -31,11 +31,11 @@ public class Event {
String description;
public Event(String eventSource, String eventCategory, String eventType, String resourceType, String resourceUUID) {
this.eventCategory = eventCategory;
this.eventType = eventType;
this.eventSource = eventSource;
this.resourceType = resourceType;
this.resourceUUID = resourceUUID;
setEventCategory(eventCategory);
setEventType(eventType);
setEventSource(eventSource);
setResourceType(resourceType);
setResourceUUID(resourceUUID);
}
public String getEventCategory() {
@ -68,7 +68,7 @@ public class Event {
public void setDescription(Object message) {
Gson gson = new Gson();
this.description = gson.toJson(message).toString();
this.description = gson.toJson(message);
}
public void setDescription(String description) {

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.events;
import com.cloud.utils.component.Manager;
import java.util.List;
public interface EventDistributor extends Manager {
/**
* publish an event on to the event busses
*
* @param event event that needs to be published on the event bus
*/
List<EventBusException> publish(Event event);
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.events;
import com.cloud.utils.component.ManagerBase;
import org.apache.log4j.Logger;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
public class EventDistributorImpl extends ManagerBase implements EventDistributor {
private static final Logger LOGGER = Logger.getLogger(EventDistributorImpl.class);
public void setEventBusses(List<EventBus> eventBusses) {
this.eventBusses = eventBusses;
}
List<EventBus> eventBusses;
@PostConstruct
public void init() {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(String.format("testing %d event busses", eventBusses.size()));
}
publish(new Event("server", "NONE","starting", "server", "NONE"));
}
@Override
public List<EventBusException> publish(Event event) {
LOGGER.info(String.format("publishing %s to %d event busses", (event == null ? "<none>" : event.getDescription()), eventBusses.size()));
List<EventBusException> exceptions = new ArrayList<>();
if (event == null) {
return exceptions;
}
for (EventBus bus : eventBusses) {
try {
bus.publish(event);
} catch (EventBusException e) {
LOGGER.warn(String.format("no publish for bus %s of event %s", bus.getClass().getName(), event.getDescription()));
exceptions.add(e);
}
}
return exceptions;
}
}

View File

@ -0,0 +1,34 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd"
>
<bean id="eventDistributor"
class="org.apache.cloudstack.framework.events.EventDistributorImpl" >
<property name="eventBusses"
value="#{eventBussesRegistry.registered}" />
</bean>
</beans>

View File

@ -62,6 +62,10 @@ public class InMemoryEventBus extends ManagerBase implements EventBus {
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
}
UUID subscriberId = UUID.randomUUID();
subscribers.put(subscriberId, new Pair<EventTopic, EventSubscriber>(topic, subscriber));
@ -70,6 +74,9 @@ public class InMemoryEventBus extends ManagerBase implements EventBus {
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
}
if (subscriberId == null) {
throw new EventBusException("Cannot unregister a null subscriberId.");
}
@ -87,7 +94,11 @@ public class InMemoryEventBus extends ManagerBase implements EventBus {
@Override
public void publish(Event event) throws EventBusException {
if (s_logger.isTraceEnabled()) {
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
}
if (subscribers == null || subscribers.isEmpty()) {
s_logger.trace("no subscribers, no publish");
return; // no subscriber to publish to, so just return
}

View File

@ -89,19 +89,29 @@ public class KafkaEventBus extends ManagerBase implements EventBus {
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
}
/* NOOP */
return UUID.randomUUID();
}
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
}
/* NOOP */
}
@Override
public void publish(Event event) throws EventBusException {
ProducerRecord<String, String> record = new ProducerRecord<String,String>(_topic, event.getResourceUUID(), event.getDescription());
_producer.send(record);
if (s_logger.isTraceEnabled()) {
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
}
ProducerRecord<String, String> newRecord = new ProducerRecord<>(_topic, event.getResourceUUID(), event.getDescription());
_producer.send(newRecord);
}
@Override

View File

@ -187,11 +187,14 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
*/
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
}
// create a UUID, that will be used for managing subscriptions and also used as queue name
// for on the queue used for the subscriber on the AMQP broker
UUID queueId = UUID.randomUUID();
@ -252,6 +255,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
if (s_logger.isDebugEnabled()) {
s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
}
try {
String classname = subscriber.getClass().getName();
String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString();
@ -267,6 +273,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
// publish event on to the exchange created on AMQP server
@Override
public void publish(Event event) throws EventBusException {
if (s_logger.isTraceEnabled()) {
s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
}
String routingKey = createRoutingKey(event);
String eventDescription = event.getDescription();

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.ArrayList;
import java.util.List;
import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
@ -42,15 +43,21 @@ import com.cloud.server.ManagementService;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ComponentMethodInterceptor;
@Component
public class EventUtils {
private static final Logger s_logger = Logger.getLogger(EventUtils.class);
private static EventDistributor eventDistributor;
protected static EventBus s_eventBus = null;
public EventUtils() {
}
public static void setEventDistributor(EventDistributor eventDistributorImpl) {
eventDistributor = eventDistributorImpl;
}
private static void publishOnMessageBus(String eventCategory, String eventType, String details, Event.State state) {
if (state != com.cloud.event.Event.State.Completed) {
@ -58,6 +65,7 @@ public class EventUtils {
}
try {
setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
s_eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
@ -66,18 +74,16 @@ public class EventUtils {
org.apache.cloudstack.framework.events.Event event =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, eventCategory, eventType, EventTypes.getEntityForEvent(eventType), null);
Map<String, String> eventDescription = new HashMap<String, String>();
Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("event", eventType);
eventDescription.put("status", state.toString());
eventDescription.put("details", details);
event.setDescription(eventDescription);
try {
s_eventBus.publish(event);
} catch (EventBusException evx) {
String errMsg = "Failed to publish contrail event.";
s_logger.warn(errMsg, evx);
List<EventBusException> exceptions = eventDistributor.publish(event);
for (EventBusException ex : exceptions) {
String errMsg = "Failed to publish event.";
s_logger.warn(errMsg, ex);
}
}
public static class EventInterceptor implements ComponentMethodInterceptor, MethodInterceptor {
@ -118,7 +124,7 @@ public class EventUtils {
}
protected List<ActionEvent> getActionEvents(Method m) {
List<ActionEvent> result = new ArrayList<ActionEvent>();
List<ActionEvent> result = new ArrayList<>();
ActionEvents events = m.getAnnotation(ActionEvents.class);

View File

@ -95,8 +95,8 @@ import org.apache.cloudstack.config.ApiServiceConfiguration;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
@ -134,7 +134,6 @@ import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.log4j.Logger;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import com.cloud.api.dispatch.DispatchChainFactory;
@ -197,26 +196,26 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
*/
private static final String CONTROL_CHARACTERS = "[\000-\011\013-\014\016-\037\177]";
@Inject
private AccountManager accountMgr;
@Inject
private APIAuthenticationManager authManager;
@Inject
private ApiDispatcher dispatcher;
@Inject
private DispatchChainFactory dispatchChainFactory;
private AsyncJobManager asyncMgr;
@Inject
private AccountManager accountMgr;
private DispatchChainFactory dispatchChainFactory;
@Inject
private DomainManager domainMgr;
@Inject
private DomainDao domainDao;
@Inject
private UUIDManager uuidMgr;
@Inject
private AsyncJobManager asyncMgr;
@Inject
private EntityManager entityMgr;
@Inject
private APIAuthenticationManager authManager;
@Inject
private ProjectDao projectDao;
@Inject
private UUIDManager uuidMgr;
private List<PluggableService> pluggableServices;
@ -225,6 +224,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
@Inject
private ApiAsyncJobDispatcher asyncDispatcher;
private EventDistributor eventDistributor = null;
private static int s_workerCount = 0;
private static Map<String, List<Class<?>>> s_apiNameCmdClassMap = new HashMap<String, List<Class<?>>>();
@ -291,6 +291,10 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
return true;
}
public void setEventDistributor(EventDistributor eventDistributor) {
this.eventDistributor = eventDistributor;
}
@MessageHandler(topic = AsyncJob.Topics.JOB_EVENT_PUBLISH)
public void handleAsyncJobPublishEvent(String subject, String senderAddress, Object args) {
assert (args != null);
@ -302,12 +306,8 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
if (s_logger.isTraceEnabled())
s_logger.trace("Handle asyjob publish event " + jobEvent);
EventBus eventBus = null;
try {
eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
if (eventDistributor == null) {
setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
if (!job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher")) {
@ -320,7 +320,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
// Get the event type from the cmdInfo json string
String info = job.getCmdInfo();
String cmdEventType = "unknown";
Map<String, Object> cmdInfoObj = new HashMap<String, Object>();
Map<String, Object> cmdInfoObj = new HashMap<>();
if (info != null) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, String> cmdInfo = ApiGsonHelper.getBuilder().create().fromJson(info, type);
@ -348,7 +348,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
org.apache.cloudstack.framework.events.Event event = new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.ASYNC_JOB_CHANGE_EVENT.getName(),
jobEvent, instanceType, instanceUuid);
Map<String, Object> eventDescription = new HashMap<String, Object>();
Map<String, Object> eventDescription = new HashMap<>();
eventDescription.put("command", job.getCmd());
eventDescription.put("user", userJobOwner.getUuid());
eventDescription.put("account", jobOwner.getUuid());
@ -369,12 +369,10 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
eventDescription.put("domainname", domain.getName());
}
event.setDescription(eventDescription);
try {
eventBus.publish(event);
} catch (EventBusException evx) {
String errMsg = "Failed to publish async job event on the event bus.";
s_logger.warn(errMsg, evx);
List<EventBusException> exceptions = eventDistributor.publish(event);
for (EventBusException ex : exceptions) {
String errMsg = "Failed to publish event.";
s_logger.warn(errMsg, ex);
}
}

View File

@ -40,7 +40,7 @@ public class HypervisorGuruManagerImpl extends ManagerBase implements Hypervisor
HostDao _hostDao;
List<HypervisorGuru> _hvGuruList;
Map<HypervisorType, HypervisorGuru> _hvGurus = new ConcurrentHashMap<HypervisorType, HypervisorGuru>();
Map<HypervisorType, HypervisorGuru> _hvGurus = new ConcurrentHashMap<>();
@PostConstruct
public void init() {

View File

@ -25,13 +25,11 @@ import java.util.Map;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.events.EventDistributor;
import com.cloud.configuration.Config;
import com.cloud.event.EventCategory;
import com.cloud.server.ManagementService;
@ -46,13 +44,12 @@ import com.cloud.utils.fsm.StateMachine2;
@Component
public class SnapshotStateListener implements StateListener<State, Event, SnapshotVO> {
protected static EventBus s_eventBus = null;
protected static ConfigurationDao s_configDao;
@Inject
private ConfigurationDao configDao;
private static final Logger s_logger = Logger.getLogger(SnapshotStateListener.class);
private EventDistributor eventDistributor = null;
public SnapshotStateListener() {
@ -63,6 +60,10 @@ public class SnapshotStateListener implements StateListener<State, Event, Snapsh
s_configDao = configDao;
}
public void setEventDistributor(EventDistributor eventDistributor) {
this.eventDistributor = eventDistributor;
}
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, SnapshotVO vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@ -83,17 +84,15 @@ public class SnapshotStateListener implements StateListener<State, Event, Snapsh
if(!configValue) {
return;
}
try {
s_eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
if (eventDistributor == null) {
setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
String resourceName = getEntityFromClassName(Snapshot.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName,
vo.getUuid());
Map<String, String> eventDescription = new HashMap<String, String>();
Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
@ -103,11 +102,7 @@ public class SnapshotStateListener implements StateListener<State, Event, Snapsh
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
try {
s_eventBus.publish(eventMsg);
} catch (EventBusException e) {
s_logger.warn("Failed to publish state change event on the event bus.");
}
eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {

View File

@ -22,41 +22,40 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import com.cloud.event.EventTypes;
import com.cloud.event.UsageEventUtils;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.dao.VMInstanceDao;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import com.cloud.configuration.Config;
import com.cloud.event.EventCategory;
import com.cloud.event.EventTypes;
import com.cloud.event.UsageEventUtils;
import com.cloud.server.ManagementService;
import com.cloud.storage.Volume;
import com.cloud.storage.Volume.Event;
import com.cloud.storage.Volume.State;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.dao.VMInstanceDao;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.events.EventDistributor;
public class VolumeStateListener implements StateListener<State, Event, Volume> {
protected static EventBus s_eventBus = null;
protected ConfigurationDao _configDao;
protected VMInstanceDao _vmInstanceDao;
private static final Logger s_logger = Logger.getLogger(VolumeStateListener.class);
private EventDistributor eventDistributor;
public VolumeStateListener(ConfigurationDao configDao, VMInstanceDao vmInstanceDao) {
this._configDao = configDao;
this._vmInstanceDao = vmInstanceDao;
}
public void setEventDistributor(EventDistributor eventDistributor) {
this.eventDistributor = eventDistributor;
}
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, Volume vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@ -92,23 +91,21 @@ public class VolumeStateListener implements StateListener<State, Event, Volume>
return true;
}
private void pubishOnEventBus(String event, String status, Volume vo, State oldState, State newState) {
private void pubishOnEventBus(String event, String status, Volume vo, State oldState, State newState) {
String configKey = Config.PublishResourceStateEvent.key();
String value = _configDao.getValue(configKey);
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
try {
s_eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
if (eventDistributor == null) {
setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
String resourceName = getEntityFromClassName(Volume.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName,
vo.getUuid());
vo.getUuid());
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
@ -119,11 +116,7 @@ public class VolumeStateListener implements StateListener<State, Event, Volume>
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
try {
s_eventBus.publish(eventMsg);
} catch (EventBusException e) {
s_logger.warn("Failed to state change event on the event bus.");
}
eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {

View File

@ -24,15 +24,6 @@ import java.util.Map;
import javax.inject.Inject;
import com.cloud.server.ManagementService;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.dao.UserVmDao;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.events.EventBus;
import com.cloud.configuration.Config;
import com.cloud.event.EventCategory;
import com.cloud.event.EventTypes;
@ -41,11 +32,17 @@ import com.cloud.event.dao.UsageEventDao;
import com.cloud.network.dao.NetworkDao;
import com.cloud.network.dao.NetworkVO;
import com.cloud.service.dao.ServiceOfferingDao;
import com.cloud.server.ManagementService;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.VirtualMachine.Event;
import com.cloud.vm.VirtualMachine.State;
import com.cloud.vm.dao.NicDao;
import com.cloud.vm.dao.UserVmDao;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.events.EventDistributor;
public class UserVmStateListener implements StateListener<State, VirtualMachine.Event, VirtualMachine> {
@ -56,9 +53,7 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine.
@Inject protected UserVmDao _userVmDao;
@Inject protected UserVmManager _userVmMgr;
@Inject protected ConfigurationDao _configDao;
private static final Logger s_logger = Logger.getLogger(UserVmStateListener.class);
protected static EventBus s_eventBus = null;
private EventDistributor eventDistributor;
public UserVmStateListener(UsageEventDao usageEventDao, NetworkDao networkDao, NicDao nicDao, ServiceOfferingDao offeringDao, UserVmDao userVmDao, UserVmManager userVmMgr,
ConfigurationDao configDao) {
@ -71,6 +66,10 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine.
this._configDao = configDao;
}
public void setEventDistributor(EventDistributor eventDistributor) {
this.eventDistributor = eventDistributor;
}
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, VirtualMachine vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@ -128,17 +127,15 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine.
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
try {
s_eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
if (eventDistributor == null) {
setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
String resourceName = getEntityFromClassName(VirtualMachine.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName,
vo.getUuid());
Map<String, String> eventDescription = new HashMap<String, String>();
Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
@ -149,12 +146,7 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine.
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
try {
s_eventBus.publish(eventMsg);
} catch (org.apache.cloudstack.framework.events.EventBusException e) {
s_logger.warn("Failed to publish state change event on the event bus.");
}
eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {

View File

@ -144,6 +144,10 @@
<property name="staticNatElements" value="#{staticNatServiceProvidersRegistry.registered}" />
</bean>
<bean id="eventDistributor" class="org.apache.cloudstack.framework.events.EventDistributorImpl" >
<property name="eventBusses" value="#{eventBussesRegistry.registered}" />
</bean>
<bean id="hypervisorGuruManagerImpl" class="com.cloud.hypervisor.HypervisorGuruManagerImpl" >
<property name="hvGuruList" value="#{hypervisorGurusRegistry.registered}" />
</bean>

View File

@ -100,7 +100,7 @@ public class ComponentContext implements ApplicationContextAware {
s_logger.info("Running SystemIntegrityChecker " + entry.getKey());
try {
entry.getValue().check();
} catch (Throwable e) {
} catch (RuntimeException e) {
s_logger.error("System integrity check failed. Refuse to startup", e);
System.exit(1);
}
@ -178,6 +178,13 @@ public class ComponentContext implements ApplicationContextAware {
return (T)s_appContext.getBean(name);
}
/**
* only ever used to get the event bus
*
* @param beanType the component type to return
* @return one of the component registered for the requested type
* @param <T>
*/
public static <T> T getComponent(Class<T> beanType) {
assert (s_appContext != null);
Map<String, T> matchedTypes = getComponentsOfType(beanType);