diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in
index 5d2fb2cc1cf..aca4dbea43d 100644
--- a/client/tomcatconf/applicationContext.xml.in
+++ b/client/tomcatconf/applicationContext.xml.in
@@ -788,14 +788,19 @@
-
-
+
+
+
+
+
+
+
diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
index 98ed5bf4f88..98a7747e080 100644
--- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
+++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
@@ -91,31 +91,25 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
UserContext.unregisterContext();
}
} catch(Throwable e) {
- if (e instanceof AsyncCommandQueued) {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("job " + job.getCmd() + " for job-" + job.getId() + " was queued, processing the queue.");
- }
+ String errorMsg = null;
+ int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode();
+ if (!(e instanceof ServerApiException)) {
+ s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
+ errorMsg = e.getMessage();
} else {
- String errorMsg = null;
- int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode();
- if (!(e instanceof ServerApiException)) {
- s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
- errorMsg = e.getMessage();
- } else {
- ServerApiException sApiEx = (ServerApiException)e;
- errorMsg = sApiEx.getDescription();
- errorCode = sApiEx.getErrorCode().getHttpCode();
- }
-
- ExceptionResponse response = new ExceptionResponse();
- response.setErrorCode(errorCode);
- response.setErrorText(errorMsg);
- response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName());
-
- // FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
- // and we need to preserve that as much as possible here
- _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
+ ServerApiException sApiEx = (ServerApiException)e;
+ errorMsg = sApiEx.getDescription();
+ errorCode = sApiEx.getErrorCode().getHttpCode();
}
+
+ ExceptionResponse response = new ExceptionResponse();
+ response.setErrorCode(errorCode);
+ response.setErrorText(errorMsg);
+ response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName());
+
+ // FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
+ // and we need to preserve that as much as possible here
+ _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
}
}
}
diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java
index 9298360b470..a8594144d93 100755
--- a/server/src/com/cloud/api/ApiDispatcher.java
+++ b/server/src/com/cloud/api/ApiDispatcher.java
@@ -55,6 +55,7 @@ import org.apache.cloudstack.api.command.user.event.ListEventsCmd;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
+import com.cloud.async.AsyncJobExecutionContext;
import com.cloud.async.AsyncJobManager;
import com.cloud.dao.EntityManager;
import com.cloud.exception.InvalidParameterValueException;
@@ -126,35 +127,38 @@ public class ApiDispatcher {
}
public void dispatch(BaseCmd cmd, Map params) throws Exception {
- processParameters(cmd, params);
- UserContext ctx = UserContext.current();
- ctx.setAccountId(cmd.getEntityOwnerId());
-
- if (cmd instanceof BaseAsyncCmd) {
+ processParameters(cmd, params);
+ UserContext ctx = UserContext.current();
+ ctx.setAccountId(cmd.getEntityOwnerId());
+
+ if (cmd instanceof BaseAsyncCmd) {
- BaseAsyncCmd asyncCmd = (BaseAsyncCmd) cmd;
- String startEventId = params.get("ctxStartEventId");
- ctx.setStartEventId(Long.valueOf(startEventId));
+ BaseAsyncCmd asyncCmd = (BaseAsyncCmd) cmd;
+ String startEventId = params.get("ctxStartEventId");
+ ctx.setStartEventId(Long.valueOf(startEventId));
- // Synchronise job on the object if needed
- if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null && asyncCmd.getSyncObjType() != null) {
- Long queueSizeLimit = null;
- if (asyncCmd.getSyncObjType() != null && asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject)) {
- queueSizeLimit = _createSnapshotQueueSizeLimit;
- } else {
- queueSizeLimit = 1L;
- }
+ // Synchronise job on the object if needed
+ if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null && asyncCmd.getSyncObjType() != null) {
+ Long queueSizeLimit = null;
+ if (asyncCmd.getSyncObjType() != null && asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject)) {
+ queueSizeLimit = _createSnapshotQueueSizeLimit;
+ } else {
+ queueSizeLimit = 1L;
+ }
- if (queueSizeLimit != null) {
- _asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
- } else {
- s_logger.trace("The queue size is unlimited, skipping the synchronizing");
- }
+ if (queueSizeLimit != null) {
+ if(AsyncJobExecutionContext.getCurrentExecutionContext() == null) {
+ // if we are not within async-execution context, enqueue the command
+ _asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
+ return;
+ }
+ } else {
+ s_logger.trace("The queue size is unlimited, skipping the synchronizing");
}
}
+ }
- cmd.execute();
-
+ cmd.execute();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java
index d4d88b36d58..42c5a739b56 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -361,10 +361,6 @@ public class ApiServer implements HttpRequestHandler, ApiServerService {
}
throw new ServerApiException(ApiErrorCode.RESOURCE_UNAVAILABLE_ERROR, errorMsg, ex);
}
- catch (AsyncCommandQueued ex){
- s_logger.error("unhandled exception executing api command: " + ((command == null) ? "null" : command[0]), ex);
- throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, "Internal server error, unable to execute request.");
- }
catch (ServerApiException ex){
s_logger.info(ex.getDescription());
throw ex;
diff --git a/server/src/com/cloud/api/AsyncCommandQueued.java b/server/src/com/cloud/api/AsyncCommandQueued.java
deleted file mode 100644
index ecd38c86be5..00000000000
--- a/server/src/com/cloud/api/AsyncCommandQueued.java
+++ /dev/null
@@ -1,36 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.api;
-
-import com.cloud.async.SyncQueueVO;
-import com.cloud.utils.SerialVersionUID;
-import com.cloud.utils.exception.CloudRuntimeException;
-
-public class AsyncCommandQueued extends CloudRuntimeException {
- private static final long serialVersionUID = SerialVersionUID.AsyncCommandQueued;
-
- private SyncQueueVO _queue = null;
-
- public AsyncCommandQueued(SyncQueueVO queue, String msg) {
- super(msg);
- _queue = queue;
- }
-
- public SyncQueueVO getQueue() {
- return _queue;
- }
-}
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index dfd6a016b11..586f40e668b 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -19,7 +19,6 @@ package com.cloud.async;
import java.io.File;
import java.io.FileInputStream;
-import java.lang.reflect.Type;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -31,7 +30,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import javax.ejb.Local;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
@@ -40,11 +38,8 @@ import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
-import org.springframework.stereotype.Component;
-import com.cloud.api.ApiAsyncJobDispatcher;
import com.cloud.api.ApiSerializerHelper;
-import com.cloud.api.AsyncCommandQueued;
import com.cloud.async.dao.AsyncJobDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
@@ -87,7 +82,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Inject private ConfigurationDao _configDao;
@Inject private List _jobDispatchers;
- private long _jobExpireSeconds = 86400; // 1 day
+ // property
+ private String defaultDispatcher;
+ public String getDefaultDispatcher() {
+ return defaultDispatcher;
+ }
+ public void setDefaultDispatcher(String defaultDispatcher) {
+ this.defaultDispatcher = defaultDispatcher;
+ }
+
+ private long _jobExpireSeconds = 86400; // 1 day
private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs)
private final ScheduledExecutorService _heartbeatScheduler =
@@ -239,15 +243,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Override
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
- // This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router,
- // when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
- // This method will get called every time their business logic executes. The first time it exectues for a job
- // there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first
- // time the job executes we queue the job, otherwise we just return so that the business logic can execute.
- if (job.getSyncSource() != null) {
- return;
- }
-
if(s_logger.isDebugEnabled()) {
s_logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId);
}
@@ -270,11 +265,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
}
- if (queue == null) {
+ if (queue == null)
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
- } else {
- throw new AsyncCommandQueued(queue, "job-" + job.getId() + " queued");
- }
}
@Override
@@ -372,7 +364,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private AsyncJobDispatcher getDispatcher(String dispatcherName) {
if(dispatcherName == null || dispatcherName.isEmpty())
- dispatcherName = ApiAsyncJobDispatcher.class.getSimpleName();
+ dispatcherName = this.defaultDispatcher;
if(_jobDispatchers != null) {
for(AsyncJobDispatcher dispatcher : _jobDispatchers) {
diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql
index ab9df05103b..fecdc0fd696 100644
--- a/setup/db/db/schema-410to420.sql
+++ b/setup/db/db/schema-410to420.sql
@@ -405,3 +405,8 @@ INSERT INTO `cloud`.`vm_template` (id, unique_name, name, public, created, type,
VALUES (10, 'routing-10', 'SystemVM Template (LXC)', 0, now(), 'SYSTEM', 0, 64, 1, 'http://download.cloud.com/templates/acton/acton-systemvm-02062012.qcow2.bz2', '2755de1f9ef2ce4d6f2bee2efbb4da92', 0, 'SystemVM Template (LXC)', 'QCOW2', 15, 0, 1, 'LXC');
-- END: support for LXC
+
+ALTER TABLE `cloud`.`async_job` DROP COLUMN `session_key`;
+ALTER TABLE `cloud`.`async_job` DROP COLUMN `job_cmd_originator`;
+ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_dispatcher` VARCHAR(64);
+