Refactoring dispatching API commands from the scheduled async job. Instead of calling an executor, the dispatcher invokes the method on the manager directly. After the command is executed the response is serialized to the async job table so it can be queried later. Also serialize a response for async create commands that includes the id of the object being created.

This commit is contained in:
Kris McQueen 2010-09-13 18:28:19 -07:00
parent a8112f65de
commit f4caf145c3
6 changed files with 288 additions and 78 deletions

View File

@ -38,7 +38,14 @@ public class ApiDispatcher {
private StorageManager _storageMgr;
private UserVmManager _userVmMgr;
public ApiDispatcher() {
// singleton class
private static ApiDispatcher s_instance = new ApiDispatcher();
public static ApiDispatcher getInstance() {
return s_instance;
}
private ApiDispatcher() {
ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name);
_mgmtServer = (ManagementServer)ComponentLocator.getComponent(ManagementServer.Name);
_configMgr = locator.getManager(ConfigurationManager.class);
@ -49,42 +56,55 @@ public class ApiDispatcher {
_userVmMgr = locator.getManager(UserVmManager.class);
}
public void dispatch(BaseCmd cmd, Map<String, String> params) {
Map<String, Object> unpackedParams = cmd.unpackParams(params);
Field[] fields = cmd.getClass().getDeclaredFields();
for (Field field : fields) {
Parameter parameterAnnotation = field.getAnnotation(Parameter.class);
if (parameterAnnotation == null) {
continue;
}
Object paramObj = unpackedParams.get(parameterAnnotation.name());
if (paramObj == null) {
if (parameterAnnotation.required()) {
throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to missing parameter " + parameterAnnotation.name());
}
continue;
}
public Long dispatchCreateCmd(BaseAsyncCreateCmd cmd, Map<String, String> params) {
setupParameters(cmd, params);
// marshall the parameter into the correct type and set the field value
try {
setFieldValue(field, cmd, paramObj, parameterAnnotation);
} catch (IllegalArgumentException argEx) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name());
}
throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name());
} catch (ParseException parseEx) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Invalid date parameter " + paramObj + " passed to command " + cmd.getName());
}
throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to parse date " + paramObj + " for command " + cmd.getName() + ", please pass dates in the format yyyy-MM-dd");
} catch (CloudRuntimeException cloudEx) {
// FIXME: Better error message? This only happens if the API command is not executable, which typically means there was
// and IllegalAccessException setting one of the parameters.
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Internal error executing API command " + cmd.getName());
}
Implementation impl = cmd.getClass().getAnnotation(Implementation.class);
String methodName = impl.createMethod();
Object mgr = _mgmtServer;
switch (impl.manager()) {
case ConfigManager:
mgr = _configMgr;
break;
case NetworkGroupManager:
mgr = _networkGroupMgr;
break;
case NetworkManager:
mgr = _networkMgr;
break;
case SnapshotManager:
mgr = _snapshotMgr;
break;
case StorageManager:
mgr = _storageMgr;
break;
case UserVmManager:
mgr = _userVmMgr;
break;
}
try {
Method method = mgr.getClass().getMethod(methodName, cmd.getClass());
method.invoke(mgr, cmd);
return cmd.getId();
} catch (NoSuchMethodException nsme) {
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), nsme);
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", unable to find implementation.");
} catch (InvocationTargetException ite) {
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), ite);
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
} catch (IllegalAccessException iae) {
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iae);
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
} catch (IllegalArgumentException iArgEx) {
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iArgEx);
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
}
}
public void dispatch(BaseCmd cmd, Map<String, String> params) {
setupParameters(cmd, params);
Implementation impl = cmd.getClass().getAnnotation(Implementation.class);
String methodName = impl.method();
Object mgr = _mgmtServer;
@ -127,6 +147,43 @@ public class ApiDispatcher {
}
}
private void setupParameters(BaseCmd cmd, Map<String, String> params) {
Map<String, Object> unpackedParams = cmd.unpackParams(params);
Field[] fields = cmd.getClass().getDeclaredFields();
for (Field field : fields) {
Parameter parameterAnnotation = field.getAnnotation(Parameter.class);
if (parameterAnnotation == null) {
continue;
}
Object paramObj = unpackedParams.get(parameterAnnotation.name());
if (paramObj == null) {
if (parameterAnnotation.required()) {
throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to missing parameter " + parameterAnnotation.name());
}
continue;
}
// marshall the parameter into the correct type and set the field value
try {
setFieldValue(field, cmd, paramObj, parameterAnnotation);
} catch (IllegalArgumentException argEx) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name());
}
throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name());
} catch (ParseException parseEx) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Invalid date parameter " + paramObj + " passed to command " + cmd.getName());
}
throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to parse date " + paramObj + " for command " + cmd.getName() + ", please pass dates in the format yyyy-MM-dd");
} catch (CloudRuntimeException cloudEx) {
// FIXME: Better error message? This only happens if the API command is not executable, which typically means there was
// and IllegalAccessException setting one of the parameters.
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Internal error executing API command " + cmd.getName());
}
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void setFieldValue(Field field, BaseCmd cmdObj, Object paramObj, Parameter annotation) throws IllegalArgumentException, ParseException {
try {

View File

@ -186,7 +186,7 @@ public class ApiServer implements HttpRequestHandler {
_ms = (ManagementServer)ComponentLocator.getComponent(ManagementServer.Name);
ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name);
_asyncMgr = locator.getManager(AsyncJobManager.class);
_dispatcher = new ApiDispatcher();
_dispatcher = ApiDispatcher.getInstance();
int apiPort = 8096; // default port
ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
@ -347,6 +347,10 @@ public class ApiServer implements HttpRequestHandler {
private String queueCommand(BaseCmd cmdObj, Map<String, String> params) {
if (cmdObj instanceof BaseAsyncCmd) {
Long objectId = null;
if (cmdObj instanceof BaseAsyncCreateCmd) {
objectId = _dispatcher.dispatchCreateCmd((BaseAsyncCreateCmd)cmdObj, params);
}
BaseAsyncCmd asyncCmd = (BaseAsyncCmd)cmdObj;
Gson gson = GsonHelper.getBuilder().create();
@ -356,6 +360,9 @@ public class ApiServer implements HttpRequestHandler {
job.setCmd(cmdObj.getClass().getName());
job.setCmdInfo(gson.toJson(params));
long jobId = _asyncMgr.submitAsyncJob(job);
if (objectId != null) {
return ((BaseAsyncCreateCmd)asyncCmd).getResponse(jobId, objectId);
}
return asyncCmd.getResponse(jobId);
} else {
_dispatcher.dispatch(cmdObj, params);

View File

@ -1,7 +1,10 @@
package com.cloud.api;
import com.cloud.api.response.CreateCmdResponse;
import com.cloud.serializer.SerializerHelper;
public abstract class BaseAsyncCreateCmd extends BaseAsyncCmd {
@Parameter(name="portforwardingserviceid")
@Parameter(name="id")
private Long id;
public Long getId() {
@ -11,4 +14,11 @@ public abstract class BaseAsyncCreateCmd extends BaseAsyncCmd {
public void setId(Long id) {
this.id = id;
}
public String getResponse(long jobId, long objectId) {
CreateCmdResponse response = new CreateCmdResponse();
response.setJobId(jobId);
response.setId(objectId);
return SerializerHelper.toSerializedString(response);
}
}

View File

@ -223,7 +223,7 @@ public abstract class BaseCmd {
}
// FIXME: move this to a utils method so that maps can be unpacked and integer/long values can be appropriately cast
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "rawtypes"})
public Map<String, Object> unpackParams(Map<String, String> params) {
Map<String, Object> lowercaseParams = new HashMap<String, Object>();
for (String key : params.keySet()) {

View File

@ -0,0 +1,28 @@
package com.cloud.api.response;
import com.cloud.api.ResponseObject;
import com.cloud.serializer.Param;
public class CreateCmdResponse implements ResponseObject {
@Param(name="jobid")
private Long jobId;
@Param(name="id")
private Long id;
public Long getJobId() {
return jobId;
}
public void setJobId(Long jobId) {
this.jobId = jobId;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
}

View File

@ -18,6 +18,7 @@
package com.cloud.async;
import java.lang.reflect.Type;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -33,10 +34,13 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import com.cloud.api.ApiDispatcher;
import com.cloud.api.BaseCmd;
import com.cloud.async.dao.AsyncJobDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.maid.StackMaid;
import com.cloud.serializer.GsonHelper;
import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
@ -47,6 +51,8 @@ import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.net.MacAddress;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
@Local(value={AsyncJobManager.class})
public class AsyncJobManagerImpl implements AsyncJobManager {
@ -63,7 +69,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
private SyncQueueManager _queueMgr;
private ClusterManager _clusterMgr;
private AsyncJobDao _jobDao;
private long _jobExpireSeconds = 86400; // 1 day
private long _jobExpireSeconds = 86400; // 1 day
private ApiDispatcher _dispatcher;
private final ScheduledExecutorService _heartbeatScheduler =
Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
@ -94,26 +101,28 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
if(s_logger.isDebugEnabled())
s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
AsyncJobExecutor executor = getJobExecutor(job);
if(executor == null) {
s_logger.error("Unable to find executor to execute command " + job.getCmd() + " for job-" + job.getId());
} else {
// AsyncJobExecutor executor = getJobExecutor(job);
// if(executor == null) {
// s_logger.error("Unable to find executor to execute command " + job.getCmd() + " for job-" + job.getId());
// } else {
Transaction txt = Transaction.currentTxn();
try {
txt.start();
job.setInitMsid(getMsid());
_jobDao.persist(job);
txt.commit();
// no sync source originally
executor.setSyncSource(null);
executor.setJob(job);
scheduleExecution(executor, scheduleJobExecutionInContext);
// no sync source originally
// FIXME: sync source for commands? how does new API framework handle that?
// executor.setSyncSource(null);
// executor.setJob(job);
// scheduleExecution(executor, scheduleJobExecutionInContext);
scheduleExecution(job, scheduleJobExecutionInContext);
return job.getId();
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
}
// }
return 0L;
}
@ -143,7 +152,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
// reset attached object
job.setInstanceType(null);
job.setInstanceId(null);
// FIXME: do we need to re-serialize here?
if(resultObject != null)
job.setResult(SerializerHelper.toSerializedString(resultObject));
job.setLastUpdated(DateUtil.currentGMTTime());
@ -287,7 +297,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
return jobResult;
}
/* old code...remove for new API framework
private AsyncJobExecutor getJobExecutor(AsyncJobVO job) {
String executorClazzName = "com.cloud.async.executor." + job.getCmd() + "Executor";
@ -302,12 +313,14 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
s_logger.error("Unable to load async-job executor class: " + executorClazzName, e);
}
return null;
}
}
// old code...remove for new API framework
private void scheduleExecution(final AsyncJobExecutor executor) {
scheduleExecution(executor, false);
}
// old code...remove for new API framework
private void scheduleExecution(final AsyncJobExecutor executor, boolean executeInContext) {
Runnable runnable = getExecutorRunnable(executor);
if(executeInContext)
@ -315,7 +328,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
else
_executor.submit(runnable);
}
// old code...remove for new API framework
private Runnable getExecutorRunnable(final AsyncJobExecutor executor) {
return new Runnable() {
public void run() {
@ -369,31 +383,122 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
}
};
}
private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
AsyncJobVO job = _jobDao.findById(item.getContentId());
if(job != null) {
AsyncJobExecutor executor = getJobExecutor(job);
if(executor == null) {
s_logger.error("Unable to find job exectutor for job-" + job.getId());
_queueMgr.purgeItem(item.getId());
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Schedule queued job-" + job.getId());
executor.setFromPreviousSession(fromPreviousSession);
executor.setSyncSource(item);
executor.setJob(job);
scheduleExecution(executor);
}
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Unable to find related job for queue item: " + item.toString());
_queueMgr.purgeItem(item.getId());
}
}
*/
private void scheduleExecution(final AsyncJobVO job) {
scheduleExecution(job, false);
}
private void scheduleExecution(final AsyncJobVO job, boolean executeInContext) {
Runnable runnable = getExecutorRunnable(job);
if (executeInContext) {
runnable.run();
} else {
_executor.submit(runnable);
}
}
private Runnable getExecutorRunnable(final AsyncJobVO job) {
return new Runnable() {
public void run() {
long jobId = 0;
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
jobId = job.getId();
NDC.push("job-" + jobId);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Executing " + job.getCmd() + " for job-" + jobId);
}
Class<?> cmdClass = Class.forName(job.getCmd());
BaseCmd cmdObj = (BaseCmd)cmdClass.newInstance();
Type mapType = new TypeToken<Map<String, String>>() {}.getType();
Gson gson = GsonHelper.getBuilder().create();
Map<String, String> params = gson.fromJson(job.getCmdInfo(), mapType);
// FIXME: whenever we deserialize, the UserContext needs to be updated
//UserContext.registerContext(userId, accountObject, accountName, accountId, domainId, sessionId, apiServer);
// FIXME: things might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue
// mechanism...
_dispatcher.dispatch(cmdObj, params);
// serialize this to the async job table
completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponse());
if(s_logger.isDebugEnabled())
s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId);
} catch(Throwable e) {
s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
/* FIXME: need to clean up any queue that happened as part of the dispatching
try {
if(executor.getSyncSource() != null) {
_queueMgr.purgeItem(executor.getSyncSource().getId());
checkQueue(executor.getSyncSource().getQueueId());
}
} catch(Throwable ex) {
s_logger.fatal("Exception on exception, log it for record", ex);
}
*/
} finally {
StackMaid.current().exitCleanup();
txn.close();
NDC.pop();
}
// leave no trace out after execution for security reason
// BaseAsyncJobExecutor.setCurrentExecutor(null);
}
};
}
/* Old method...remove as part of API refactoring...
private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
AsyncJobVO job = _jobDao.findById(item.getContentId());
if(job != null) {
AsyncJobExecutor executor = getJobExecutor(job);
if(executor == null) {
s_logger.error("Unable to find job exectutor for job-" + job.getId());
_queueMgr.purgeItem(item.getId());
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Schedule queued job-" + job.getId());
executor.setFromPreviousSession(fromPreviousSession);
executor.setSyncSource(item);
executor.setJob(job);
scheduleExecution(executor);
}
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Unable to find related job for queue item: " + item.toString());
_queueMgr.purgeItem(item.getId());
}
}
*/
private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
AsyncJobVO job = _jobDao.findById(item.getContentId());
if (job != null) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Schedule queued job-" + job.getId());
}
scheduleExecution(job);
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Unable to find related job for queue item: " + item.toString());
_queueMgr.purgeItem(item.getId());
}
}
@Override
public void releaseSyncSource(AsyncJobExecutor executor) {
if(executor.getSyncSource() != null) {
@ -551,7 +656,10 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
+ SyncQueueManager.class.getName());
}
_clusterMgr = locator.getManager(ClusterManager.class);
_clusterMgr = locator.getManager(ClusterManager.class);
_dispatcher = ApiDispatcher.getInstance();
return true;
}