diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java index 477b200add0..5aa0545e886 100644 --- a/server/src/com/cloud/api/ApiDispatcher.java +++ b/server/src/com/cloud/api/ApiDispatcher.java @@ -38,7 +38,14 @@ public class ApiDispatcher { private StorageManager _storageMgr; private UserVmManager _userVmMgr; - public ApiDispatcher() { + // singleton class + private static ApiDispatcher s_instance = new ApiDispatcher(); + + public static ApiDispatcher getInstance() { + return s_instance; + } + + private ApiDispatcher() { ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name); _mgmtServer = (ManagementServer)ComponentLocator.getComponent(ManagementServer.Name); _configMgr = locator.getManager(ConfigurationManager.class); @@ -49,42 +56,55 @@ public class ApiDispatcher { _userVmMgr = locator.getManager(UserVmManager.class); } - public void dispatch(BaseCmd cmd, Map params) { - Map unpackedParams = cmd.unpackParams(params); - Field[] fields = cmd.getClass().getDeclaredFields(); - for (Field field : fields) { - Parameter parameterAnnotation = field.getAnnotation(Parameter.class); - if (parameterAnnotation == null) { - continue; - } - Object paramObj = unpackedParams.get(parameterAnnotation.name()); - if (paramObj == null) { - if (parameterAnnotation.required()) { - throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to missing parameter " + parameterAnnotation.name()); - } - continue; - } + public Long dispatchCreateCmd(BaseAsyncCreateCmd cmd, Map params) { + setupParameters(cmd, params); - // marshall the parameter into the correct type and set the field value - try { - setFieldValue(field, cmd, paramObj, parameterAnnotation); - } catch (IllegalArgumentException argEx) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name()); - } - throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name()); - } catch (ParseException parseEx) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Invalid date parameter " + paramObj + " passed to command " + cmd.getName()); - } - throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to parse date " + paramObj + " for command " + cmd.getName() + ", please pass dates in the format yyyy-MM-dd"); - } catch (CloudRuntimeException cloudEx) { - // FIXME: Better error message? This only happens if the API command is not executable, which typically means there was - // and IllegalAccessException setting one of the parameters. - throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Internal error executing API command " + cmd.getName()); - } + Implementation impl = cmd.getClass().getAnnotation(Implementation.class); + String methodName = impl.createMethod(); + Object mgr = _mgmtServer; + switch (impl.manager()) { + case ConfigManager: + mgr = _configMgr; + break; + case NetworkGroupManager: + mgr = _networkGroupMgr; + break; + case NetworkManager: + mgr = _networkMgr; + break; + case SnapshotManager: + mgr = _snapshotMgr; + break; + case StorageManager: + mgr = _storageMgr; + break; + case UserVmManager: + mgr = _userVmMgr; + break; } + try { + Method method = mgr.getClass().getMethod(methodName, cmd.getClass()); + method.invoke(mgr, cmd); + return cmd.getId(); + } catch (NoSuchMethodException nsme) { + s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), nsme); + throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", unable to find implementation."); + } catch (InvocationTargetException ite) { + s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), ite); + throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation."); + } catch (IllegalAccessException iae) { + s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iae); + throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation."); + } catch (IllegalArgumentException iArgEx) { + s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iArgEx); + throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation."); + } + } + + public void dispatch(BaseCmd cmd, Map params) { + setupParameters(cmd, params); + Implementation impl = cmd.getClass().getAnnotation(Implementation.class); String methodName = impl.method(); Object mgr = _mgmtServer; @@ -127,6 +147,43 @@ public class ApiDispatcher { } } + private void setupParameters(BaseCmd cmd, Map params) { + Map unpackedParams = cmd.unpackParams(params); + Field[] fields = cmd.getClass().getDeclaredFields(); + for (Field field : fields) { + Parameter parameterAnnotation = field.getAnnotation(Parameter.class); + if (parameterAnnotation == null) { + continue; + } + Object paramObj = unpackedParams.get(parameterAnnotation.name()); + if (paramObj == null) { + if (parameterAnnotation.required()) { + throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to missing parameter " + parameterAnnotation.name()); + } + continue; + } + + // marshall the parameter into the correct type and set the field value + try { + setFieldValue(field, cmd, paramObj, parameterAnnotation); + } catch (IllegalArgumentException argEx) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name()); + } + throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to execute API command " + cmd.getName() + " due to invalid value " + paramObj + " for parameter " + parameterAnnotation.name()); + } catch (ParseException parseEx) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Invalid date parameter " + paramObj + " passed to command " + cmd.getName()); + } + throw new ServerApiException(BaseCmd.PARAM_ERROR, "Unable to parse date " + paramObj + " for command " + cmd.getName() + ", please pass dates in the format yyyy-MM-dd"); + } catch (CloudRuntimeException cloudEx) { + // FIXME: Better error message? This only happens if the API command is not executable, which typically means there was + // and IllegalAccessException setting one of the parameters. + throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Internal error executing API command " + cmd.getName()); + } + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) private void setFieldValue(Field field, BaseCmd cmdObj, Object paramObj, Parameter annotation) throws IllegalArgumentException, ParseException { try { diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index 0a2c9f56446..2b2a62732dc 100644 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -186,7 +186,7 @@ public class ApiServer implements HttpRequestHandler { _ms = (ManagementServer)ComponentLocator.getComponent(ManagementServer.Name); ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name); _asyncMgr = locator.getManager(AsyncJobManager.class); - _dispatcher = new ApiDispatcher(); + _dispatcher = ApiDispatcher.getInstance(); int apiPort = 8096; // default port ConfigurationDao configDao = locator.getDao(ConfigurationDao.class); @@ -347,6 +347,10 @@ public class ApiServer implements HttpRequestHandler { private String queueCommand(BaseCmd cmdObj, Map params) { if (cmdObj instanceof BaseAsyncCmd) { + Long objectId = null; + if (cmdObj instanceof BaseAsyncCreateCmd) { + objectId = _dispatcher.dispatchCreateCmd((BaseAsyncCreateCmd)cmdObj, params); + } BaseAsyncCmd asyncCmd = (BaseAsyncCmd)cmdObj; Gson gson = GsonHelper.getBuilder().create(); @@ -356,6 +360,9 @@ public class ApiServer implements HttpRequestHandler { job.setCmd(cmdObj.getClass().getName()); job.setCmdInfo(gson.toJson(params)); long jobId = _asyncMgr.submitAsyncJob(job); + if (objectId != null) { + return ((BaseAsyncCreateCmd)asyncCmd).getResponse(jobId, objectId); + } return asyncCmd.getResponse(jobId); } else { _dispatcher.dispatch(cmdObj, params); diff --git a/server/src/com/cloud/api/BaseAsyncCreateCmd.java b/server/src/com/cloud/api/BaseAsyncCreateCmd.java index 7c22f011dd8..4936e9d4b39 100644 --- a/server/src/com/cloud/api/BaseAsyncCreateCmd.java +++ b/server/src/com/cloud/api/BaseAsyncCreateCmd.java @@ -1,7 +1,10 @@ package com.cloud.api; +import com.cloud.api.response.CreateCmdResponse; +import com.cloud.serializer.SerializerHelper; + public abstract class BaseAsyncCreateCmd extends BaseAsyncCmd { - @Parameter(name="portforwardingserviceid") + @Parameter(name="id") private Long id; public Long getId() { @@ -11,4 +14,11 @@ public abstract class BaseAsyncCreateCmd extends BaseAsyncCmd { public void setId(Long id) { this.id = id; } + + public String getResponse(long jobId, long objectId) { + CreateCmdResponse response = new CreateCmdResponse(); + response.setJobId(jobId); + response.setId(objectId); + return SerializerHelper.toSerializedString(response); + } } diff --git a/server/src/com/cloud/api/BaseCmd.java b/server/src/com/cloud/api/BaseCmd.java index a45353834a6..45f3afbb3d9 100644 --- a/server/src/com/cloud/api/BaseCmd.java +++ b/server/src/com/cloud/api/BaseCmd.java @@ -223,7 +223,7 @@ public abstract class BaseCmd { } // FIXME: move this to a utils method so that maps can be unpacked and integer/long values can be appropriately cast - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public Map unpackParams(Map params) { Map lowercaseParams = new HashMap(); for (String key : params.keySet()) { diff --git a/server/src/com/cloud/api/response/CreateCmdResponse.java b/server/src/com/cloud/api/response/CreateCmdResponse.java new file mode 100644 index 00000000000..cd7c4619ef9 --- /dev/null +++ b/server/src/com/cloud/api/response/CreateCmdResponse.java @@ -0,0 +1,28 @@ +package com.cloud.api.response; + +import com.cloud.api.ResponseObject; +import com.cloud.serializer.Param; + +public class CreateCmdResponse implements ResponseObject { + @Param(name="jobid") + private Long jobId; + + @Param(name="id") + private Long id; + + public Long getJobId() { + return jobId; + } + + public void setJobId(Long jobId) { + this.jobId = jobId; + } + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } +} diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index a0abb2628e3..41fe92a8340 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -18,6 +18,7 @@ package com.cloud.async; +import java.lang.reflect.Type; import java.util.Date; import java.util.List; import java.util.Map; @@ -33,10 +34,13 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; import org.apache.log4j.NDC; +import com.cloud.api.ApiDispatcher; +import com.cloud.api.BaseCmd; import com.cloud.async.dao.AsyncJobDao; import com.cloud.cluster.ClusterManager; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.maid.StackMaid; +import com.cloud.serializer.GsonHelper; import com.cloud.serializer.SerializerHelper; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; @@ -47,6 +51,8 @@ import com.cloud.utils.db.GlobalLock; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.net.MacAddress; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; @Local(value={AsyncJobManager.class}) public class AsyncJobManagerImpl implements AsyncJobManager { @@ -63,7 +69,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager { private SyncQueueManager _queueMgr; private ClusterManager _clusterMgr; private AsyncJobDao _jobDao; - private long _jobExpireSeconds = 86400; // 1 day + private long _jobExpireSeconds = 86400; // 1 day + private ApiDispatcher _dispatcher; private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat")); @@ -94,26 +101,28 @@ public class AsyncJobManagerImpl implements AsyncJobManager { if(s_logger.isDebugEnabled()) s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString()); - AsyncJobExecutor executor = getJobExecutor(job); - if(executor == null) { - s_logger.error("Unable to find executor to execute command " + job.getCmd() + " for job-" + job.getId()); - } else { +// AsyncJobExecutor executor = getJobExecutor(job); +// if(executor == null) { +// s_logger.error("Unable to find executor to execute command " + job.getCmd() + " for job-" + job.getId()); +// } else { Transaction txt = Transaction.currentTxn(); try { txt.start(); job.setInitMsid(getMsid()); _jobDao.persist(job); txt.commit(); - // no sync source originally - executor.setSyncSource(null); - executor.setJob(job); - scheduleExecution(executor, scheduleJobExecutionInContext); + // no sync source originally + // FIXME: sync source for commands? how does new API framework handle that? +// executor.setSyncSource(null); +// executor.setJob(job); +// scheduleExecution(executor, scheduleJobExecutionInContext); + scheduleExecution(job, scheduleJobExecutionInContext); return job.getId(); } catch(Exception e) { s_logger.error("Unexpected exception: ", e); txt.rollback(); } - } +// } return 0L; } @@ -143,7 +152,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager { // reset attached object job.setInstanceType(null); job.setInstanceId(null); - + + // FIXME: do we need to re-serialize here? if(resultObject != null) job.setResult(SerializerHelper.toSerializedString(resultObject)); job.setLastUpdated(DateUtil.currentGMTTime()); @@ -287,7 +297,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager { return jobResult; } - + + /* old code...remove for new API framework private AsyncJobExecutor getJobExecutor(AsyncJobVO job) { String executorClazzName = "com.cloud.async.executor." + job.getCmd() + "Executor"; @@ -302,12 +313,14 @@ public class AsyncJobManagerImpl implements AsyncJobManager { s_logger.error("Unable to load async-job executor class: " + executorClazzName, e); } return null; - } - + } + + // old code...remove for new API framework private void scheduleExecution(final AsyncJobExecutor executor) { scheduleExecution(executor, false); } - + + // old code...remove for new API framework private void scheduleExecution(final AsyncJobExecutor executor, boolean executeInContext) { Runnable runnable = getExecutorRunnable(executor); if(executeInContext) @@ -315,7 +328,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager { else _executor.submit(runnable); } - + + // old code...remove for new API framework private Runnable getExecutorRunnable(final AsyncJobExecutor executor) { return new Runnable() { public void run() { @@ -369,31 +383,122 @@ public class AsyncJobManagerImpl implements AsyncJobManager { } }; } - - private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) { - AsyncJobVO job = _jobDao.findById(item.getContentId()); - if(job != null) { - AsyncJobExecutor executor = getJobExecutor(job); - if(executor == null) { - s_logger.error("Unable to find job exectutor for job-" + job.getId()); - _queueMgr.purgeItem(item.getId()); - } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("Schedule queued job-" + job.getId()); - - executor.setFromPreviousSession(fromPreviousSession); - executor.setSyncSource(item); - executor.setJob(job); - scheduleExecution(executor); - } - } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("Unable to find related job for queue item: " + item.toString()); - - _queueMgr.purgeItem(item.getId()); - } - } + */ + private void scheduleExecution(final AsyncJobVO job) { + scheduleExecution(job, false); + } + + private void scheduleExecution(final AsyncJobVO job, boolean executeInContext) { + Runnable runnable = getExecutorRunnable(job); + if (executeInContext) { + runnable.run(); + } else { + _executor.submit(runnable); + } + } + + private Runnable getExecutorRunnable(final AsyncJobVO job) { + return new Runnable() { + public void run() { + long jobId = 0; + + Transaction txn = Transaction.open(Transaction.CLOUD_DB); + try { + jobId = job.getId(); + NDC.push("job-" + jobId); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Executing " + job.getCmd() + " for job-" + jobId); + } + + Class cmdClass = Class.forName(job.getCmd()); + BaseCmd cmdObj = (BaseCmd)cmdClass.newInstance(); + Type mapType = new TypeToken>() {}.getType(); + Gson gson = GsonHelper.getBuilder().create(); + Map params = gson.fromJson(job.getCmdInfo(), mapType); + + // FIXME: whenever we deserialize, the UserContext needs to be updated + //UserContext.registerContext(userId, accountObject, accountName, accountId, domainId, sessionId, apiServer); + + // FIXME: things might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue + // mechanism... + _dispatcher.dispatch(cmdObj, params); + + // serialize this to the async job table + completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponse()); + + if(s_logger.isDebugEnabled()) + s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId); + + } catch(Throwable e) { + s_logger.error("Unexpected exception while executing " + job.getCmd(), e); + + /* FIXME: need to clean up any queue that happened as part of the dispatching + try { + if(executor.getSyncSource() != null) { + _queueMgr.purgeItem(executor.getSyncSource().getId()); + + checkQueue(executor.getSyncSource().getQueueId()); + } + } catch(Throwable ex) { + s_logger.fatal("Exception on exception, log it for record", ex); + } + */ + } finally { + StackMaid.current().exitCleanup(); + txn.close(); + NDC.pop(); + } + + // leave no trace out after execution for security reason +// BaseAsyncJobExecutor.setCurrentExecutor(null); + } + }; + } + + /* Old method...remove as part of API refactoring... + private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) { + AsyncJobVO job = _jobDao.findById(item.getContentId()); + if(job != null) { + AsyncJobExecutor executor = getJobExecutor(job); + if(executor == null) { + s_logger.error("Unable to find job exectutor for job-" + job.getId()); + _queueMgr.purgeItem(item.getId()); + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("Schedule queued job-" + job.getId()); + + executor.setFromPreviousSession(fromPreviousSession); + executor.setSyncSource(item); + executor.setJob(job); + scheduleExecution(executor); + } + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("Unable to find related job for queue item: " + item.toString()); + + _queueMgr.purgeItem(item.getId()); + } + } + */ + + private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) { + AsyncJobVO job = _jobDao.findById(item.getContentId()); + if (job != null) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Schedule queued job-" + job.getId()); + } + + scheduleExecution(job); + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("Unable to find related job for queue item: " + item.toString()); + + _queueMgr.purgeItem(item.getId()); + } + } + @Override public void releaseSyncSource(AsyncJobExecutor executor) { if(executor.getSyncSource() != null) { @@ -551,7 +656,10 @@ public class AsyncJobManagerImpl implements AsyncJobManager { + SyncQueueManager.class.getName()); } - _clusterMgr = locator.getManager(ClusterManager.class); + _clusterMgr = locator.getManager(ClusterManager.class); + + _dispatcher = ApiDispatcher.getInstance(); + return true; }