Add AsyncMethod support

This commit is contained in:
Kelven Yang 2012-12-06 15:14:18 -08:00
parent 1b91641397
commit 6fd6b38b43
17 changed files with 422 additions and 80 deletions

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
public class AsyncCallbackDispatcher {
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
public static boolean dispatch(Object target, AsyncCompletionCallback callback) {
assert(callback != null);
assert(target != null);
Method handler = resolveHandler(target.getClass(), callback.getOperationName());
if(handler == null)
return false;
try {
handler.invoke(target, callback);
} catch (IllegalArgumentException e) {
throw new RuntimeException("IllegalArgumentException when invoking RPC callback for command: " + callback.getOperationName());
} catch (IllegalAccessException e) {
throw new RuntimeException("IllegalAccessException when invoking RPC callback for command: " + callback.getOperationName());
} catch (InvocationTargetException e) {
throw new RuntimeException("InvocationTargetException when invoking RPC callback for command: " + callback.getOperationName());
}
return true;
}
public static Method resolveHandler(Class<?> handlerClz, String operationName) {
synchronized(s_handlerCache) {
Method handler = s_handlerCache.get(handlerClz);
if(handler != null)
return handler;
for(Method method : handlerClz.getMethods()) {
AsyncCallbackHandler annotation = method.getAnnotation(AsyncCallbackHandler.class);
if(annotation != null) {
if(annotation.operationName().equals(operationName)) {
s_handlerCache.put(handlerClz, method);
return method;
}
}
}
}
return null;
}
}

View File

@ -18,6 +18,6 @@
*/
package org.apache.cloudstack.framework.messaging;
public interface ComponentContainer {
ComponentEndpoint wire(ComponentEndpoint endpoint, String predefinedAddress);
public interface AsyncCallbackDriver {
public void performCompletionCallback(AsyncCompletionCallback callback);
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public @interface AsyncCallbackHandler {
String operationName();
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.util.HashMap;
import java.util.Map;
public class AsyncCompletionCallback {
private Map<String, Object> _contextMap = new HashMap<String, Object>();
private String _operationName;
private Object _targetObject;
public AsyncCompletionCallback(Object target) {
_targetObject = target;
}
public AsyncCompletionCallback setContextParam(String key, Object param) {
// ???
return this;
}
public AsyncCompletionCallback attachDriver(AsyncCallbackDriver driver) {
// ???
return this;
}
public AsyncCompletionCallback setOperationName(String name) {
_operationName = name;
return this;
}
public String getOperationName() {
return _operationName;
}
public <T> T getContextParam(String key) {
// ???
return null;
}
public void complete(Object resultObject) {
///
}
public <T> T getResult() {
// ???
return null;
}
}

View File

@ -1,65 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import org.apache.log4j.Logger;
public class ComponentEndpoint implements RpcServiceEndpoint, Subscriber {
private static final Logger s_logger = Logger.getLogger(ComponentEndpoint.class);
private TransportEndpoint transportEndpoint;
private RpcProvider rpcProvider;
public ComponentEndpoint() {
}
public TransportEndpoint getTransportEndpoint() {
return transportEndpoint;
}
public void setTransportEndpoint(TransportEndpoint transportEndpoint) {
this.transportEndpoint = transportEndpoint;
}
public RpcProvider getRpcProvider() {
return rpcProvider;
}
public void setRpcProvider(RpcProvider rpcProvider) {
this.rpcProvider = rpcProvider;
}
public void initialize() {
rpcProvider.registerRpcServiceEndpoint(this);
}
@Override
public boolean onCallReceive(RpcServerCall call) {
return RpcServiceDispatcher.dispatch(this, call);
}
@Override
public void onPublishEvent(String subject, String senderAddress, Object args) {
try {
EventDispatcher.dispatch(this, subject, senderAddress, args);
} catch(RuntimeException e) {
s_logger.error("Unhandled exception", e);
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public class EventBusEndpoint {
private EventBus _eventBus;
private String _sender;
private PublishScope _scope;
public EventBusEndpoint(EventBus eventBus, String sender, PublishScope scope) {
_eventBus = eventBus;
_sender = sender;
_scope = scope;
}
public EventBusEndpoint setEventBus(EventBus eventBus) {
_eventBus = eventBus;
return this;
}
public EventBusEndpoint setScope(PublishScope scope) {
_scope = scope;
return this;
}
public PublishScope getScope() {
return _scope;
}
public EventBusEndpoint setSender(String sender) {
_sender = sender;
return this;
}
public String getSender() {
return _sender;
}
public void Publish(String subject, Object args) {
assert(_eventBus != null);
_eventBus.publish(_sender, subject, _scope, args);
}
}

View File

@ -23,9 +23,39 @@ import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
public class EventDispatcher {
public class EventDispatcher implements Subscriber {
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
private static Map<Object, EventDispatcher> s_targetMap = new HashMap<Object, EventDispatcher>();
private Object _targetObject;
public EventDispatcher(Object targetObject) {
_targetObject = targetObject;
}
@Override
public void onPublishEvent(String senderAddress, String subject, Object args) {
dispatch(_targetObject, subject, senderAddress, args);
}
public static EventDispatcher getDispatcher(Object targetObject) {
EventDispatcher dispatcher;
synchronized(s_targetMap) {
dispatcher = s_targetMap.get(targetObject);
if(dispatcher == null) {
dispatcher = new EventDispatcher(targetObject);
s_targetMap.put(targetObject, dispatcher);
}
}
return dispatcher;
}
public static void removeDispatcher(Object targetObject) {
synchronized(s_targetMap) {
s_targetMap.remove(targetObject);
}
}
public static boolean dispatch(Object target, String subject, String senderAddress, Object args) {
assert(subject != null);
assert(target != null);

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public class InplaceAsyncCallbackDriver implements AsyncCallbackDriver {
@Override
public void performCompletionCallback(AsyncCompletionCallback callback) {
// TODO Auto-generated method stub
}
}

View File

@ -86,9 +86,10 @@ public class RpcClientCallImpl implements RpcClientCall {
return this;
}
@SuppressWarnings("unchecked")
@Override
public Object getContextParam(String key) {
return _contextParams.get(key);
public <T> T getContextParam(String key) {
return (T)_contextParams.get(key);
}
@Override

View File

@ -24,14 +24,17 @@ public interface RpcProvider extends TransportMultiplexier {
void setMessageSerializer(MessageSerializer messageSerializer);
MessageSerializer getMessageSerializer();
void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
RpcClientCall newCall(TransportEndpoint sourceEndpoint, RpcAddressable targetAddress);
RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress);
void registerRpcServiceEndpoint(String serviceAddress, RpcServiceEndpoint rpcEndpoint);
void unregisteRpcServiceEndpoint(RpcAddressable serviceAddress, RpcServiceEndpoint rpcEndpoint);
RpcClientCall newCall(String targetAddress);
RpcClientCall newCall(RpcAddressable targetAddress);
//
// low-level public API
//
RpcClientCall newCall(TransportEndpoint sourceEndpoint, RpcAddressable targetAddress);
RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress);
void registerCall(RpcClientCall call);
void cancelCall(RpcClientCall call);

View File

@ -69,14 +69,14 @@ public class RpcProviderImpl implements RpcProvider {
}
@Override
public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
public void registerRpcServiceEndpoint(String serviceAddress, RpcServiceEndpoint rpcEndpoint) {
synchronized(_serviceEndpoints) {
_serviceEndpoints.add(rpcEndpoint);
}
}
@Override
public void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
public void unregisteRpcServiceEndpoint(RpcAddressable serviceAddress, RpcServiceEndpoint rpcEndpoint) {
synchronized(_serviceEndpoints) {
_serviceEndpoints.remove(rpcEndpoint);
}

View File

@ -42,7 +42,7 @@ public class RpcServerCallImpl implements RpcServerCall {
}
@Override
public Object getCommandArgument() {
public <T> T getCommandArgument() {
if(_requestPdu.getSerializedCommandArg() == null)
return null;

View File

@ -23,9 +23,34 @@ import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
public class RpcServiceDispatcher {
public class RpcServiceDispatcher implements RpcServiceEndpoint {
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
private static Map<Object, RpcServiceDispatcher> s_targetMap = new HashMap<Object, RpcServiceDispatcher>();
private Object _targetObject;
public RpcServiceDispatcher(Object targetObject) {
_targetObject = targetObject;
}
public static RpcServiceDispatcher getDispatcher(Object targetObject) {
RpcServiceDispatcher dispatcher;
synchronized(s_targetMap) {
dispatcher = s_targetMap.get(targetObject);
if(dispatcher == null) {
dispatcher = new RpcServiceDispatcher(targetObject);
s_targetMap.put(targetObject, dispatcher);
}
}
return dispatcher;
}
public static void removeDispatcher(Object targetObject) {
synchronized(s_targetMap) {
s_targetMap.remove(targetObject);
}
}
public static boolean dispatch(Object target, RpcServerCall serviceCall) {
assert(serviceCall != null);
@ -67,4 +92,9 @@ public class RpcServiceDispatcher {
return null;
}
@Override
public boolean onCallReceive(RpcServerCall call) {
return dispatch(_targetObject, call);
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public class AsyncSampleCallee {
AsyncSampleCallee _driver;
public TestVolume createVolume(Object realParam, AsyncCompletionCallback callback) {
_driver.createVolume(realParam,
new AsyncCompletionCallback(this)
.setOperationName("volume.driver.create")
.setContextParam("dsCompletion", callback)
);
return null;
}
@AsyncCallbackHandler(operationName="volume.driver.create")
public void onDriverCreateVolumeCallback(AsyncCompletionCallback driverCompletion) {
AsyncCompletionCallback dsCompletionCallback = driverCompletion.getContextParam("dsCompletion");
String str = driverCompletion.getResult();
dsCompletionCallback.complete(str);
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public class AsyncSampleCaller {
AsyncSampleCallee _ds;
public void MethodThatWillCallAsyncMethod() {
TestVolume vol = new TestVolume();
_ds.createVolume(vol,
new AsyncCompletionCallback(this).setContextParam("vol", vol));
}
@AsyncCallbackHandler(operationName="volume.create")
public void onCreateVolumeCallback(AsyncCompletionCallback callback) {
TestVolume contextVol = callback.getContextParam("vol");
TestVolume result = callback.getResult();
// do something
}
}

View File

@ -18,11 +18,24 @@
*/
package org.apache.cloudstack.framework.messaging;
public class SampleComponent extends ComponentEndpoint {
public class SampleComponent {
RpcProvider _rpcProvider;
EventBus _eventBus;
public SampleComponent() {
}
public void init() {
_rpcProvider.registerRpcServiceEndpoint("AgentManager",
RpcServiceDispatcher.getDispatcher(this));
// subscribe to all network events (for example)
_eventBus.subscribe("network",
EventDispatcher.getDispatcher(this));
}
@RpcServiceHandler(command="StartCommand")
void onStartCommand(RpcServerCall call) {
call.completeCall("Call response");

View File

@ -0,0 +1,5 @@
package org.apache.cloudstack.framework.messaging;
public class TestVolume {
}