Refactor message bus facitlity to avoid confusing with event bus for external notification, planning to use it in VMSync

This commit is contained in:
Kelven Yang 2013-04-11 15:57:19 -07:00
parent e7e862db2f
commit 85e73d18f5
14 changed files with 61 additions and 61 deletions

View File

@ -82,7 +82,7 @@
<property name="messageSerializer" ref="messageSerializer" />
</bean>
<bean id="eventBus" class = "org.apache.cloudstack.framework.eventbus.EventBusBase" />
<bean id="messageBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase" />
<!--
DAO with customized configuration

View File

@ -153,7 +153,6 @@ public class VMInstanceVO implements VirtualMachine, FiniteStateObject<State, Vi
@Column(name="uuid")
protected String uuid = UUID.randomUUID().toString();
;
@Column(name="disk_offering_id")
protected Long diskOfferingId;

View File

@ -18,10 +18,10 @@
*/
package org.apache.cloudstack.framework.client;
import org.apache.cloudstack.framework.eventbus.EventBusBase;
import org.apache.cloudstack.framework.messagebus.MessageBusBase;
import org.apache.cloudstack.framework.transport.TransportMultiplexier;
public class ClientEventBus extends EventBusBase implements TransportMultiplexier {
public class ClientEventBus extends MessageBusBase implements TransportMultiplexier {
@Override
public void onTransportMessage(String senderEndpointAddress,

View File

@ -17,16 +17,16 @@
* under the License.
*/
package org.apache.cloudstack.framework.eventbus;
package org.apache.cloudstack.framework.messagebus;
import org.apache.cloudstack.framework.serializer.MessageSerializer;
public interface EventBus {
public interface MessageBus {
void setMessageSerializer(MessageSerializer messageSerializer);
MessageSerializer getMessageSerializer();
void subscribe(String subject, Subscriber subscriber);
void unsubscribe(String subject, Subscriber subscriber);
void subscribe(String subject, MessageSubscriber subscriber);
void unsubscribe(String subject, MessageSubscriber subscriber);
void publish(String senderAddress, String subject, PublishScope scope, Object args);
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.cloudstack.framework.eventbus;
package org.apache.cloudstack.framework.messagebus;
import java.util.ArrayList;
import java.util.Arrays;
@ -28,7 +28,7 @@ import java.util.Map;
import org.apache.cloudstack.framework.serializer.MessageSerializer;
public class EventBusBase implements EventBus {
public class MessageBusBase implements MessageBus {
private Gate _gate;
private List<ActionRecord> _pendingActions;
@ -36,7 +36,7 @@ public class EventBusBase implements EventBus {
private SubscriptionNode _subscriberRoot;
private MessageSerializer _messageSerializer;
public EventBusBase() {
public MessageBusBase() {
_gate = new Gate();
_pendingActions = new ArrayList<ActionRecord>();
@ -54,7 +54,7 @@ public class EventBusBase implements EventBus {
}
@Override
public void subscribe(String subject, Subscriber subscriber) {
public void subscribe(String subject, MessageSubscriber subscriber) {
assert(subject != null);
assert(subscriber != null);
if(_gate.enter()) {
@ -70,7 +70,7 @@ public class EventBusBase implements EventBus {
}
@Override
public void unsubscribe(String subject, Subscriber subscriber) {
public void unsubscribe(String subject, MessageSubscriber subscriber) {
if(_gate.enter()) {
SubscriptionNode current = locate(subject, null, false);
if(current != null)
@ -186,9 +186,9 @@ public class EventBusBase implements EventBus {
private static class ActionRecord {
private ActionType _type;
private String _subject;
private Subscriber _subscriber;
private MessageSubscriber _subscriber;
public ActionRecord(ActionType type, String subject, Subscriber subscriber) {
public ActionRecord(ActionType type, String subject, MessageSubscriber subscriber) {
_type = type;
_subject = subject;
_subscriber = subscriber;
@ -202,7 +202,7 @@ public class EventBusBase implements EventBus {
return _subject;
}
public Subscriber getSubscriber() {
public MessageSubscriber getSubscriber() {
return _subscriber;
}
}
@ -264,13 +264,13 @@ public class EventBusBase implements EventBus {
private static class SubscriptionNode {
@SuppressWarnings("unused")
private String _nodeKey;
private List<Subscriber> _subscribers;
private List<MessageSubscriber> _subscribers;
private Map<String, SubscriptionNode> _children;
public SubscriptionNode(String nodeKey, Subscriber subscriber) {
public SubscriptionNode(String nodeKey, MessageSubscriber subscriber) {
assert(nodeKey != null);
_nodeKey = nodeKey;
_subscribers = new ArrayList<Subscriber>();
_subscribers = new ArrayList<MessageSubscriber>();
if(subscriber != null)
_subscribers.add(subscriber);
@ -279,15 +279,15 @@ public class EventBusBase implements EventBus {
}
@SuppressWarnings("unused")
public List<Subscriber> getSubscriber() {
public List<MessageSubscriber> getSubscriber() {
return _subscribers;
}
public void addSubscriber(Subscriber subscriber) {
public void addSubscriber(MessageSubscriber subscriber) {
_subscribers.add(subscriber);
}
public void removeSubscriber(Subscriber subscriber) {
public void removeSubscriber(MessageSubscriber subscriber) {
_subscribers.remove(subscriber);
}
@ -300,8 +300,8 @@ public class EventBusBase implements EventBus {
}
public void notifySubscribers(String senderAddress, String subject, Object args) {
for(Subscriber subscriber : _subscribers) {
subscriber.onPublishEvent(senderAddress, subject, args);
for(MessageSubscriber subscriber : _subscribers) {
subscriber.onPublishMessage(senderAddress, subject, args);
}
}
}

View File

@ -17,26 +17,26 @@
* under the License.
*/
package org.apache.cloudstack.framework.eventbus;
package org.apache.cloudstack.framework.messagebus;
public class EventBusEndpoint {
private EventBus _eventBus;
public class MessageBusEndpoint {
private MessageBus _eventBus;
private String _sender;
private PublishScope _scope;
public EventBusEndpoint(EventBus eventBus, String sender, PublishScope scope) {
public MessageBusEndpoint(MessageBus eventBus, String sender, PublishScope scope) {
_eventBus = eventBus;
_sender = sender;
_scope = scope;
}
public EventBusEndpoint setEventBus(EventBus eventBus) {
public MessageBusEndpoint setEventBus(MessageBus eventBus) {
_eventBus = eventBus;
return this;
}
public EventBusEndpoint setScope(PublishScope scope) {
public MessageBusEndpoint setScope(PublishScope scope) {
_scope = scope;
return this;
}
@ -45,7 +45,7 @@ public class EventBusEndpoint {
return _scope;
}
public EventBusEndpoint setSender(String sender) {
public MessageBusEndpoint setSender(String sender) {
_sender = sender;
return this;
}

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.eventbus;
package org.apache.cloudstack.framework.messagebus;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@ -24,27 +24,27 @@ import java.util.HashMap;
import java.util.Map;
public class EventDispatcher implements Subscriber {
public class MessageDispatcher implements MessageSubscriber {
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
private static Map<Object, EventDispatcher> s_targetMap = new HashMap<Object, EventDispatcher>();
private static Map<Object, MessageDispatcher> s_targetMap = new HashMap<Object, MessageDispatcher>();
private Object _targetObject;
public EventDispatcher(Object targetObject) {
public MessageDispatcher(Object targetObject) {
_targetObject = targetObject;
}
@Override
public void onPublishEvent(String senderAddress, String subject, Object args) {
public void onPublishMessage(String senderAddress, String subject, Object args) {
dispatch(_targetObject, subject, senderAddress, args);
}
public static EventDispatcher getDispatcher(Object targetObject) {
EventDispatcher dispatcher;
public static MessageDispatcher getDispatcher(Object targetObject) {
MessageDispatcher dispatcher;
synchronized(s_targetMap) {
dispatcher = s_targetMap.get(targetObject);
if(dispatcher == null) {
dispatcher = new EventDispatcher(targetObject);
dispatcher = new MessageDispatcher(targetObject);
s_targetMap.put(targetObject, dispatcher);
}
}
@ -85,7 +85,7 @@ public class EventDispatcher implements Subscriber {
return handler;
for(Method method : handlerClz.getMethods()) {
EventHandler annotation = method.getAnnotation(EventHandler.class);
MessageHandler annotation = method.getAnnotation(MessageHandler.class);
if(annotation != null) {
if(match(annotation.topic(), subject)) {
s_handlerCache.put(handlerClz, method);

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.eventbus;
package org.apache.cloudstack.framework.messagebus;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@ -25,6 +25,6 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface EventHandler {
public @interface MessageHandler {
public String topic();
}

View File

@ -17,8 +17,8 @@
* under the License.
*/
package org.apache.cloudstack.framework.eventbus;
package org.apache.cloudstack.framework.messagebus;
public interface Subscriber {
void onPublishEvent(String senderAddress, String subject, Object args);
public interface MessageSubscriber {
void onPublishMessage(String senderAddress, String subject, Object args);
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.cloudstack.framework.eventbus;
package org.apache.cloudstack.framework.messagebus;
public enum PublishScope {
LOCAL, GLOBAL

View File

@ -18,10 +18,10 @@
*/
package org.apache.cloudstack.framework.server;
import org.apache.cloudstack.framework.eventbus.EventBusBase;
import org.apache.cloudstack.framework.messagebus.MessageBusBase;
import org.apache.cloudstack.framework.transport.TransportMultiplexier;
public class ServerEventBus extends EventBusBase implements TransportMultiplexier {
public class ServerEventBus extends MessageBusBase implements TransportMultiplexier {
@Override
public void onTransportMessage(String senderEndpointAddress,

View File

@ -24,9 +24,9 @@ import java.util.TimerTask;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import org.apache.cloudstack.framework.eventbus.EventBus;
import org.apache.cloudstack.framework.eventbus.EventDispatcher;
import org.apache.cloudstack.framework.eventbus.EventHandler;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.framework.rpc.RpcCallbackListener;
import org.apache.cloudstack.framework.rpc.RpcException;
import org.apache.cloudstack.framework.rpc.RpcProvider;
@ -41,7 +41,7 @@ public class SampleManagerComponent {
private static final Logger s_logger = Logger.getLogger(SampleManagerComponent.class);
@Inject
private EventBus _eventBus;
private MessageBus _eventBus;
@Inject
private RpcProvider _rpcProvider;
@ -58,7 +58,7 @@ public class SampleManagerComponent {
// subscribe to all network events (for example)
_eventBus.subscribe("network",
EventDispatcher.getDispatcher(this));
MessageDispatcher.getDispatcher(this));
_timer.schedule(new TimerTask() {
public void run() {
@ -72,7 +72,7 @@ public class SampleManagerComponent {
call.completeCall("NetworkPrepare completed");
}
@EventHandler(topic="network.prepare")
@MessageHandler(topic="network.prepare")
void onPrepareNetwork(String sender, String topic, Object args) {
}

View File

@ -21,9 +21,9 @@ package org.apache.cloudstack.framework.sampleserver;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import org.apache.cloudstack.framework.eventbus.EventBus;
import org.apache.cloudstack.framework.eventbus.EventDispatcher;
import org.apache.cloudstack.framework.eventbus.EventHandler;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.framework.rpc.RpcProvider;
import org.apache.cloudstack.framework.rpc.RpcServerCall;
import org.apache.cloudstack.framework.rpc.RpcServiceDispatcher;
@ -36,7 +36,7 @@ public class SampleManagerComponent2 {
private static final Logger s_logger = Logger.getLogger(SampleManagerComponent2.class);
@Inject
private EventBus _eventBus;
private MessageBus _eventBus;
@Inject
private RpcProvider _rpcProvider;
@ -51,7 +51,7 @@ public class SampleManagerComponent2 {
// subscribe to all network events (for example)
_eventBus.subscribe("storage",
EventDispatcher.getDispatcher(this));
MessageDispatcher.getDispatcher(this));
}
@RpcServiceHandler(command="StoragePrepare")
@ -66,7 +66,7 @@ public class SampleManagerComponent2 {
call.completeCall(answer);
}
@EventHandler(topic="storage.prepare")
@MessageHandler(topic="storage.prepare")
void onPrepareNetwork(String sender, String topic, Object args) {
}

View File

@ -494,7 +494,8 @@ public class VmwareManagerImpl extends ManagerBase implements VmwareManager, Vmw
s_logger.info("Inject SSH key pairs before copying systemvm.iso into secondary storage");
_configServer.updateKeyPairs();
s_logger.info("Copy System VM patch ISO file to secondary storage. source ISO: " + srcIso.getAbsolutePath() +
", destination: " + destIso.getAbsolutePath());
try {
FileUtil.copyfile(srcIso, destIso);
} catch(IOException e) {