From aa108fcaeca796a2649c9d88b14fe49def956dfa Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Tue, 9 Apr 2013 18:05:14 -0700 Subject: [PATCH] Get rid of annoying communicating through exception between API/Job manager, decouple job manager from API manager for VMsync usage --- client/tomcatconf/applicationContext.xml.in | 9 +++- .../com/cloud/api/ApiAsyncJobDispatcher.java | 40 +++++++-------- server/src/com/cloud/api/ApiDispatcher.java | 50 ++++++++++--------- server/src/com/cloud/api/ApiServer.java | 4 -- .../src/com/cloud/api/AsyncCommandQueued.java | 36 ------------- .../com/cloud/async/AsyncJobManagerImpl.java | 32 +++++------- setup/db/db/schema-410to420.sql | 5 ++ 7 files changed, 68 insertions(+), 108 deletions(-) delete mode 100644 server/src/com/cloud/api/AsyncCommandQueued.java diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in index 5d2fb2cc1cf..aca4dbea43d 100644 --- a/client/tomcatconf/applicationContext.xml.in +++ b/client/tomcatconf/applicationContext.xml.in @@ -788,14 +788,19 @@ - - + + + + + + + diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java index 98ed5bf4f88..98a7747e080 100644 --- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java +++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java @@ -91,31 +91,25 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat UserContext.unregisterContext(); } } catch(Throwable e) { - if (e instanceof AsyncCommandQueued) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("job " + job.getCmd() + " for job-" + job.getId() + " was queued, processing the queue."); - } + String errorMsg = null; + int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode(); + if (!(e instanceof ServerApiException)) { + s_logger.error("Unexpected exception while executing " + job.getCmd(), e); + errorMsg = e.getMessage(); } else { - String errorMsg = null; - int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode(); - if (!(e instanceof ServerApiException)) { - s_logger.error("Unexpected exception while executing " + job.getCmd(), e); - errorMsg = e.getMessage(); - } else { - ServerApiException sApiEx = (ServerApiException)e; - errorMsg = sApiEx.getDescription(); - errorCode = sApiEx.getErrorCode().getHttpCode(); - } - - ExceptionResponse response = new ExceptionResponse(); - response.setErrorCode(errorCode); - response.setErrorText(errorMsg); - response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName()); - - // FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling - // and we need to preserve that as much as possible here - _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response); + ServerApiException sApiEx = (ServerApiException)e; + errorMsg = sApiEx.getDescription(); + errorCode = sApiEx.getErrorCode().getHttpCode(); } + + ExceptionResponse response = new ExceptionResponse(); + response.setErrorCode(errorCode); + response.setErrorText(errorMsg); + response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName()); + + // FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling + // and we need to preserve that as much as possible here + _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response); } } } diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java index 9298360b470..a8594144d93 100755 --- a/server/src/com/cloud/api/ApiDispatcher.java +++ b/server/src/com/cloud/api/ApiDispatcher.java @@ -55,6 +55,7 @@ import org.apache.cloudstack.api.command.user.event.ListEventsCmd; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; +import com.cloud.async.AsyncJobExecutionContext; import com.cloud.async.AsyncJobManager; import com.cloud.dao.EntityManager; import com.cloud.exception.InvalidParameterValueException; @@ -126,35 +127,38 @@ public class ApiDispatcher { } public void dispatch(BaseCmd cmd, Map params) throws Exception { - processParameters(cmd, params); - UserContext ctx = UserContext.current(); - ctx.setAccountId(cmd.getEntityOwnerId()); - - if (cmd instanceof BaseAsyncCmd) { + processParameters(cmd, params); + UserContext ctx = UserContext.current(); + ctx.setAccountId(cmd.getEntityOwnerId()); + + if (cmd instanceof BaseAsyncCmd) { - BaseAsyncCmd asyncCmd = (BaseAsyncCmd) cmd; - String startEventId = params.get("ctxStartEventId"); - ctx.setStartEventId(Long.valueOf(startEventId)); + BaseAsyncCmd asyncCmd = (BaseAsyncCmd) cmd; + String startEventId = params.get("ctxStartEventId"); + ctx.setStartEventId(Long.valueOf(startEventId)); - // Synchronise job on the object if needed - if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null && asyncCmd.getSyncObjType() != null) { - Long queueSizeLimit = null; - if (asyncCmd.getSyncObjType() != null && asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject)) { - queueSizeLimit = _createSnapshotQueueSizeLimit; - } else { - queueSizeLimit = 1L; - } + // Synchronise job on the object if needed + if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null && asyncCmd.getSyncObjType() != null) { + Long queueSizeLimit = null; + if (asyncCmd.getSyncObjType() != null && asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject)) { + queueSizeLimit = _createSnapshotQueueSizeLimit; + } else { + queueSizeLimit = 1L; + } - if (queueSizeLimit != null) { - _asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit); - } else { - s_logger.trace("The queue size is unlimited, skipping the synchronizing"); - } + if (queueSizeLimit != null) { + if(AsyncJobExecutionContext.getCurrentExecutionContext() == null) { + // if we are not within async-execution context, enqueue the command + _asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit); + return; + } + } else { + s_logger.trace("The queue size is unlimited, skipping the synchronizing"); } } + } - cmd.execute(); - + cmd.execute(); } @SuppressWarnings({ "unchecked", "rawtypes" }) diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index d4d88b36d58..42c5a739b56 100755 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -361,10 +361,6 @@ public class ApiServer implements HttpRequestHandler, ApiServerService { } throw new ServerApiException(ApiErrorCode.RESOURCE_UNAVAILABLE_ERROR, errorMsg, ex); } - catch (AsyncCommandQueued ex){ - s_logger.error("unhandled exception executing api command: " + ((command == null) ? "null" : command[0]), ex); - throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, "Internal server error, unable to execute request."); - } catch (ServerApiException ex){ s_logger.info(ex.getDescription()); throw ex; diff --git a/server/src/com/cloud/api/AsyncCommandQueued.java b/server/src/com/cloud/api/AsyncCommandQueued.java deleted file mode 100644 index ecd38c86be5..00000000000 --- a/server/src/com/cloud/api/AsyncCommandQueued.java +++ /dev/null @@ -1,36 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.api; - -import com.cloud.async.SyncQueueVO; -import com.cloud.utils.SerialVersionUID; -import com.cloud.utils.exception.CloudRuntimeException; - -public class AsyncCommandQueued extends CloudRuntimeException { - private static final long serialVersionUID = SerialVersionUID.AsyncCommandQueued; - - private SyncQueueVO _queue = null; - - public AsyncCommandQueued(SyncQueueVO queue, String msg) { - super(msg); - _queue = queue; - } - - public SyncQueueVO getQueue() { - return _queue; - } -} diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index dfd6a016b11..586f40e668b 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -19,7 +19,6 @@ package com.cloud.async; import java.io.File; import java.io.FileInputStream; -import java.lang.reflect.Type; import java.util.Date; import java.util.List; import java.util.Map; @@ -31,7 +30,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import javax.ejb.Local; import javax.inject.Inject; import javax.naming.ConfigurationException; @@ -40,11 +38,8 @@ import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd; import org.apache.cloudstack.api.response.ExceptionResponse; import org.apache.log4j.Logger; import org.apache.log4j.NDC; -import org.springframework.stereotype.Component; -import com.cloud.api.ApiAsyncJobDispatcher; import com.cloud.api.ApiSerializerHelper; -import com.cloud.api.AsyncCommandQueued; import com.cloud.async.dao.AsyncJobDao; import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManagerListener; @@ -87,7 +82,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private ConfigurationDao _configDao; @Inject private List _jobDispatchers; - private long _jobExpireSeconds = 86400; // 1 day + // property + private String defaultDispatcher; + public String getDefaultDispatcher() { + return defaultDispatcher; + } + public void setDefaultDispatcher(String defaultDispatcher) { + this.defaultDispatcher = defaultDispatcher; + } + + private long _jobExpireSeconds = 86400; // 1 day private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs) private final ScheduledExecutorService _heartbeatScheduler = @@ -239,15 +243,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Override public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) { - // This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router, - // when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this). - // This method will get called every time their business logic executes. The first time it exectues for a job - // there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first - // time the job executes we queue the job, otherwise we just return so that the business logic can execute. - if (job.getSyncSource() != null) { - return; - } - if(s_logger.isDebugEnabled()) { s_logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId); } @@ -270,11 +265,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } } - if (queue == null) { + if (queue == null) throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?"); - } else { - throw new AsyncCommandQueued(queue, "job-" + job.getId() + " queued"); - } } @Override @@ -372,7 +364,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private AsyncJobDispatcher getDispatcher(String dispatcherName) { if(dispatcherName == null || dispatcherName.isEmpty()) - dispatcherName = ApiAsyncJobDispatcher.class.getSimpleName(); + dispatcherName = this.defaultDispatcher; if(_jobDispatchers != null) { for(AsyncJobDispatcher dispatcher : _jobDispatchers) { diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql index ab9df05103b..fecdc0fd696 100644 --- a/setup/db/db/schema-410to420.sql +++ b/setup/db/db/schema-410to420.sql @@ -405,3 +405,8 @@ INSERT INTO `cloud`.`vm_template` (id, unique_name, name, public, created, type, VALUES (10, 'routing-10', 'SystemVM Template (LXC)', 0, now(), 'SYSTEM', 0, 64, 1, 'http://download.cloud.com/templates/acton/acton-systemvm-02062012.qcow2.bz2', '2755de1f9ef2ce4d6f2bee2efbb4da92', 0, 'SystemVM Template (LXC)', 'QCOW2', 15, 0, 1, 'LXC'); -- END: support for LXC + +ALTER TABLE `cloud`.`async_job` DROP COLUMN `session_key`; +ALTER TABLE `cloud`.`async_job` DROP COLUMN `job_cmd_originator`; +ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_dispatcher` VARCHAR(64); +