From f4caf145c32046560adcf35e1ad605689cf079fa Mon Sep 17 00:00:00 2001 From: Kris McQueen Date: Mon, 13 Sep 2010 18:28:19 -0700 Subject: [PATCH] Refactoring dispatching API commands from the scheduled async job. Instead of calling an executor, the dispatcher invokes the method on the manager directly. After the command is executed the response is serialized to the async job table so it can be queried later. Also serialize a response for async create commands that includes the id of the object being created. --- server/src/com/cloud/api/ApiDispatcher.java | 125 ++++++++---- server/src/com/cloud/api/ApiServer.java | 9 +- .../src/com/cloud/api/BaseAsyncCreateCmd.java | 12 +- server/src/com/cloud/api/BaseCmd.java | 2 +- .../cloud/api/response/CreateCmdResponse.java | 28 +++ .../com/cloud/async/AsyncJobManagerImpl.java | 190 ++++++++++++++---- 6 files changed, 288 insertions(+), 78 deletions(-) create mode 100644 server/src/com/cloud/api/response/CreateCmdResponse.java diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java index 477b200add0..5aa0545e886 100644 --- a/server/src/com/cloud/api/ApiDispatcher.java +++ b/server/src/com/cloud/api/ApiDispatcher.java @@ -38,7 +38,14 @@ public class ApiDispatcher { private StorageManager _storageMgr; private UserVmManager _userVmMgr; - public ApiDispatcher() { + // singleton class + private static ApiDispatcher s_instance = new ApiDispatcher(); + + public static ApiDispatcher getInstance() { + return s_instance; + } + + private ApiDispatcher() { ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name); _mgmtServer = (ManagementServer)ComponentLocator.getComponent(ManagementServer.Name); _configMgr = locator.getManager(ConfigurationManager.class); @@ -49,42 +56,55 @@ public class ApiDispatcher { _userVmMgr = locator.getManager(UserVmManager.class); } - public void dispatch(BaseCmd cmd, Map params) { - Map unpackedParams = cmd.unpackParams(params); - Field[] fields = cmd.getClass().getDeclaredFields(); - for (Field field : fields) { - Parameter parameterAnnotation = field.getAnnotation(Parameter.class); - if (parameterAnnotation == null) { - continue; - } - Object paramObj = unpackedParams.get(parameterAnnotation.name()); - if (paramObj == null) { - if (parameterAnnotation.required()) { - throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to missing parameter " + parameterAnnotation.name()); - } - continue; - } + public Long dispatchCreateCmd(BaseAsyncCreateCmd cmd, Map params) { + setupParameters(cmd, params); - // marshall the parameter into the correct type and set the field value - try { - setFieldValue(field, cmd, paramObj, parameterAnnotation); - } catch (IllegalArgumentException argEx) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name()); - } - throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name()); - } catch (ParseException parseEx) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Invalid date parameter " + paramObj + " passed to command " + cmd.getName()); - } - throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to parse date " + paramObj + " for command " + cmd.getName() + ", please pass dates in the format yyyy-MM-dd"); - } catch (CloudRuntimeException cloudEx) { - // FIXME: Better error message? This only happens if the API command is not executable, which typically means there was - // and IllegalAccessException setting one of the parameters. - throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Internal error executing API command " + cmd.getName()); - } + Implementation impl = cmd.getClass().getAnnotation(Implementation.class); + String methodName = impl.createMethod(); + Object mgr = _mgmtServer; + switch (impl.manager()) { + case ConfigManager: + mgr = _configMgr; + break; + case NetworkGroupManager: + mgr = _networkGroupMgr; + break; + case NetworkManager: + mgr = _networkMgr; + break; + case SnapshotManager: + mgr = _snapshotMgr; + break; + case StorageManager: + mgr = _storageMgr; + break; + case UserVmManager: + mgr = _userVmMgr; + break; } + try { + Method method = mgr.getClass().getMethod(methodName, cmd.getClass()); + method.invoke(mgr, cmd); + return cmd.getId(); + } catch (NoSuchMethodException nsme) { + s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), nsme); + throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", unable to find implementation."); + } catch (InvocationTargetException ite) { + s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), ite); + throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation."); + } catch (IllegalAccessException iae) { + s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iae); + throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation."); + } catch (IllegalArgumentException iArgEx) { + s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iArgEx); + throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation."); + } + } + + public void dispatch(BaseCmd cmd, Map params) { + setupParameters(cmd, params); + Implementation impl = cmd.getClass().getAnnotation(Implementation.class); String methodName = impl.method(); Object mgr = _mgmtServer; @@ -127,6 +147,43 @@ public class ApiDispatcher { } } + private void setupParameters(BaseCmd cmd, Map params) { + Map unpackedParams = cmd.unpackParams(params); + Field[] fields = cmd.getClass().getDeclaredFields(); + for (Field field : fields) { + Parameter parameterAnnotation = field.getAnnotation(Parameter.class); + if (parameterAnnotation == null) { + continue; + } + Object paramObj = unpackedParams.get(parameterAnnotation.name()); + if (paramObj == null) { + if (parameterAnnotation.required()) { + throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to missing parameter " + parameterAnnotation.name()); + } + continue; + } + + // marshall the parameter into the correct type and set the field value + try { + setFieldValue(field, cmd, paramObj, parameterAnnotation); + } catch (IllegalArgumentException argEx) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name()); + } + throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name()); + } catch (ParseException parseEx) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Invalid date parameter " + paramObj + " passed to command " + cmd.getName()); + } + throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to parse date " + paramObj + " for command " + cmd.getName() + ", please pass dates in the format yyyy-MM-dd"); + } catch (CloudRuntimeException cloudEx) { + // FIXME: Better error message? This only happens if the API command is not executable, which typically means there was + // and IllegalAccessException setting one of the parameters. + throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Internal error executing API command " + cmd.getName()); + } + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) private void setFieldValue(Field field, BaseCmd cmdObj, Object paramObj, Parameter annotation) throws IllegalArgumentException, ParseException { try { diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index 0a2c9f56446..2b2a62732dc 100644 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -186,7 +186,7 @@ public class ApiServer implements HttpRequestHandler { _ms = (ManagementServer)ComponentLocator.getComponent(ManagementServer.Name); ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name); _asyncMgr = locator.getManager(AsyncJobManager.class); - _dispatcher = new ApiDispatcher(); + _dispatcher = ApiDispatcher.getInstance(); int apiPort = 8096; // default port ConfigurationDao configDao = locator.getDao(ConfigurationDao.class); @@ -347,6 +347,10 @@ public class ApiServer implements HttpRequestHandler { private String queueCommand(BaseCmd cmdObj, Map params) { if (cmdObj instanceof BaseAsyncCmd) { + Long objectId = null; + if (cmdObj instanceof BaseAsyncCreateCmd) { + objectId = _dispatcher.dispatchCreateCmd((BaseAsyncCreateCmd)cmdObj, params); + } BaseAsyncCmd asyncCmd = (BaseAsyncCmd)cmdObj; Gson gson = GsonHelper.getBuilder().create(); @@ -356,6 +360,9 @@ public class ApiServer implements HttpRequestHandler { job.setCmd(cmdObj.getClass().getName()); job.setCmdInfo(gson.toJson(params)); long jobId = _asyncMgr.submitAsyncJob(job); + if (objectId != null) { + return ((BaseAsyncCreateCmd)asyncCmd).getResponse(jobId, objectId); + } return asyncCmd.getResponse(jobId); } else { _dispatcher.dispatch(cmdObj, params); diff --git a/server/src/com/cloud/api/BaseAsyncCreateCmd.java b/server/src/com/cloud/api/BaseAsyncCreateCmd.java index 7c22f011dd8..4936e9d4b39 100644 --- a/server/src/com/cloud/api/BaseAsyncCreateCmd.java +++ b/server/src/com/cloud/api/BaseAsyncCreateCmd.java @@ -1,7 +1,10 @@ package com.cloud.api; +import com.cloud.api.response.CreateCmdResponse; +import com.cloud.serializer.SerializerHelper; + public abstract class BaseAsyncCreateCmd extends BaseAsyncCmd { - @Parameter(name="portforwardingserviceid") + @Parameter(name="id") private Long id; public Long getId() { @@ -11,4 +14,11 @@ public abstract class BaseAsyncCreateCmd extends BaseAsyncCmd { public void setId(Long id) { this.id = id; } + + public String getResponse(long jobId, long objectId) { + CreateCmdResponse response = new CreateCmdResponse(); + response.setJobId(jobId); + response.setId(objectId); + return SerializerHelper.toSerializedString(response); + } } diff --git a/server/src/com/cloud/api/BaseCmd.java b/server/src/com/cloud/api/BaseCmd.java index a45353834a6..45f3afbb3d9 100644 --- a/server/src/com/cloud/api/BaseCmd.java +++ b/server/src/com/cloud/api/BaseCmd.java @@ -223,7 +223,7 @@ public abstract class BaseCmd { } // FIXME: move this to a utils method so that maps can be unpacked and integer/long values can be appropriately cast - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public Map unpackParams(Map params) { Map lowercaseParams = new HashMap(); for (String key : params.keySet()) { diff --git a/server/src/com/cloud/api/response/CreateCmdResponse.java b/server/src/com/cloud/api/response/CreateCmdResponse.java new file mode 100644 index 00000000000..cd7c4619ef9 --- /dev/null +++ b/server/src/com/cloud/api/response/CreateCmdResponse.java @@ -0,0 +1,28 @@ +package com.cloud.api.response; + +import com.cloud.api.ResponseObject; +import com.cloud.serializer.Param; + +public class CreateCmdResponse implements ResponseObject { + @Param(name="jobid") + private Long jobId; + + @Param(name="id") + private Long id; + + public Long getJobId() { + return jobId; + } + + public void setJobId(Long jobId) { + this.jobId = jobId; + } + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } +} diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index a0abb2628e3..41fe92a8340 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -18,6 +18,7 @@ package com.cloud.async; +import java.lang.reflect.Type; import java.util.Date; import java.util.List; import java.util.Map; @@ -33,10 +34,13 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; import org.apache.log4j.NDC; +import com.cloud.api.ApiDispatcher; +import com.cloud.api.BaseCmd; import com.cloud.async.dao.AsyncJobDao; import com.cloud.cluster.ClusterManager; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.maid.StackMaid; +import com.cloud.serializer.GsonHelper; import com.cloud.serializer.SerializerHelper; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; @@ -47,6 +51,8 @@ import com.cloud.utils.db.GlobalLock; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.net.MacAddress; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; @Local(value={AsyncJobManager.class}) public class AsyncJobManagerImpl implements AsyncJobManager { @@ -63,7 +69,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager { private SyncQueueManager _queueMgr; private ClusterManager _clusterMgr; private AsyncJobDao _jobDao; - private long _jobExpireSeconds = 86400; // 1 day + private long _jobExpireSeconds = 86400; // 1 day + private ApiDispatcher _dispatcher; private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat")); @@ -94,26 +101,28 @@ public class AsyncJobManagerImpl implements AsyncJobManager { if(s_logger.isDebugEnabled()) s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString()); - AsyncJobExecutor executor = getJobExecutor(job); - if(executor == null) { - s_logger.error("Unable to find executor to execute command " + job.getCmd() + " for job-" + job.getId()); - } else { +// AsyncJobExecutor executor = getJobExecutor(job); +// if(executor == null) { +// s_logger.error("Unable to find executor to execute command " + job.getCmd() + " for job-" + job.getId()); +// } else { Transaction txt = Transaction.currentTxn(); try { txt.start(); job.setInitMsid(getMsid()); _jobDao.persist(job); txt.commit(); - // no sync source originally - executor.setSyncSource(null); - executor.setJob(job); - scheduleExecution(executor, scheduleJobExecutionInContext); + // no sync source originally + // FIXME: sync source for commands? how does new API framework handle that? +// executor.setSyncSource(null); +// executor.setJob(job); +// scheduleExecution(executor, scheduleJobExecutionInContext); + scheduleExecution(job, scheduleJobExecutionInContext); return job.getId(); } catch(Exception e) { s_logger.error("Unexpected exception: ", e); txt.rollback(); } - } +// } return 0L; } @@ -143,7 +152,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager { // reset attached object job.setInstanceType(null); job.setInstanceId(null); - + + // FIXME: do we need to re-serialize here? if(resultObject != null) job.setResult(SerializerHelper.toSerializedString(resultObject)); job.setLastUpdated(DateUtil.currentGMTTime()); @@ -287,7 +297,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager { return jobResult; } - + + /* old code...remove for new API framework private AsyncJobExecutor getJobExecutor(AsyncJobVO job) { String executorClazzName = "com.cloud.async.executor." + job.getCmd() + "Executor"; @@ -302,12 +313,14 @@ public class AsyncJobManagerImpl implements AsyncJobManager { s_logger.error("Unable to load async-job executor class: " + executorClazzName, e); } return null; - } - + } + + // old code...remove for new API framework private void scheduleExecution(final AsyncJobExecutor executor) { scheduleExecution(executor, false); } - + + // old code...remove for new API framework private void scheduleExecution(final AsyncJobExecutor executor, boolean executeInContext) { Runnable runnable = getExecutorRunnable(executor); if(executeInContext) @@ -315,7 +328,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager { else _executor.submit(runnable); } - + + // old code...remove for new API framework private Runnable getExecutorRunnable(final AsyncJobExecutor executor) { return new Runnable() { public void run() { @@ -369,31 +383,122 @@ public class AsyncJobManagerImpl implements AsyncJobManager { } }; } - - private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) { - AsyncJobVO job = _jobDao.findById(item.getContentId()); - if(job != null) { - AsyncJobExecutor executor = getJobExecutor(job); - if(executor == null) { - s_logger.error("Unable to find job exectutor for job-" + job.getId()); - _queueMgr.purgeItem(item.getId()); - } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("Schedule queued job-" + job.getId()); - - executor.setFromPreviousSession(fromPreviousSession); - executor.setSyncSource(item); - executor.setJob(job); - scheduleExecution(executor); - } - } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("Unable to find related job for queue item: " + item.toString()); - - _queueMgr.purgeItem(item.getId()); - } - } + */ + private void scheduleExecution(final AsyncJobVO job) { + scheduleExecution(job, false); + } + + private void scheduleExecution(final AsyncJobVO job, boolean executeInContext) { + Runnable runnable = getExecutorRunnable(job); + if (executeInContext) { + runnable.run(); + } else { + _executor.submit(runnable); + } + } + + private Runnable getExecutorRunnable(final AsyncJobVO job) { + return new Runnable() { + public void run() { + long jobId = 0; + + Transaction txn = Transaction.open(Transaction.CLOUD_DB); + try { + jobId = job.getId(); + NDC.push("job-" + jobId); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Executing " + job.getCmd() + " for job-" + jobId); + } + + Class cmdClass = Class.forName(job.getCmd()); + BaseCmd cmdObj = (BaseCmd)cmdClass.newInstance(); + Type mapType = new TypeToken>() {}.getType(); + Gson gson = GsonHelper.getBuilder().create(); + Map params = gson.fromJson(job.getCmdInfo(), mapType); + + // FIXME: whenever we deserialize, the UserContext needs to be updated + //UserContext.registerContext(userId, accountObject, accountName, accountId, domainId, sessionId, apiServer); + + // FIXME: things might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue + // mechanism... + _dispatcher.dispatch(cmdObj, params); + + // serialize this to the async job table + completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponse()); + + if(s_logger.isDebugEnabled()) + s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId); + + } catch(Throwable e) { + s_logger.error("Unexpected exception while executing " + job.getCmd(), e); + + /* FIXME: need to clean up any queue that happened as part of the dispatching + try { + if(executor.getSyncSource() != null) { + _queueMgr.purgeItem(executor.getSyncSource().getId()); + + checkQueue(executor.getSyncSource().getQueueId()); + } + } catch(Throwable ex) { + s_logger.fatal("Exception on exception, log it for record", ex); + } + */ + } finally { + StackMaid.current().exitCleanup(); + txn.close(); + NDC.pop(); + } + + // leave no trace out after execution for security reason +// BaseAsyncJobExecutor.setCurrentExecutor(null); + } + }; + } + + /* Old method...remove as part of API refactoring... + private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) { + AsyncJobVO job = _jobDao.findById(item.getContentId()); + if(job != null) { + AsyncJobExecutor executor = getJobExecutor(job); + if(executor == null) { + s_logger.error("Unable to find job exectutor for job-" + job.getId()); + _queueMgr.purgeItem(item.getId()); + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("Schedule queued job-" + job.getId()); + + executor.setFromPreviousSession(fromPreviousSession); + executor.setSyncSource(item); + executor.setJob(job); + scheduleExecution(executor); + } + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("Unable to find related job for queue item: " + item.toString()); + + _queueMgr.purgeItem(item.getId()); + } + } + */ + + private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) { + AsyncJobVO job = _jobDao.findById(item.getContentId()); + if (job != null) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Schedule queued job-" + job.getId()); + } + + scheduleExecution(job); + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("Unable to find related job for queue item: " + item.toString()); + + _queueMgr.purgeItem(item.getId()); + } + } + @Override public void releaseSyncSource(AsyncJobExecutor executor) { if(executor.getSyncSource() != null) { @@ -551,7 +656,10 @@ public class AsyncJobManagerImpl implements AsyncJobManager { + SyncQueueManager.class.getName()); } - _clusterMgr = locator.getManager(ClusterManager.class); + _clusterMgr = locator.getManager(ClusterManager.class); + + _dispatcher = ApiDispatcher.getInstance(); + return true; }