Get rid of annoying communicating through exception between API/Job manager, decouple job manager from API manager for VMsync usage

This commit is contained in:
Kelven Yang 2013-04-09 18:05:14 -07:00
parent 34cae6349c
commit aa108fcaec
7 changed files with 68 additions and 108 deletions

View File

@ -788,14 +788,19 @@
<!-- Async management -->
<bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher" />
<bean id="asyncJobDaoImpl" class="com.cloud.async.dao.AsyncJobDaoImpl" />
<bean id="asyncJobJoinDaoImpl" class="com.cloud.api.query.dao.AsyncJobJoinDaoImpl" />
<bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl" />
<bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl">
<property name="defaultDispatcher" value="ApiAsyncJobDispatcher" />
</bean>
<bean id="syncQueueDaoImpl" class="com.cloud.async.dao.SyncQueueDaoImpl" />
<bean id="syncQueueItemDaoImpl" class="com.cloud.async.dao.SyncQueueItemDaoImpl" />
<bean id="syncQueueManagerImpl" class="com.cloud.async.SyncQueueManagerImpl" />
<bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher">
<property name="name" value="ApiAsyncJobDispatcher" />
</bean>
<!--
Baremetal components
-->

View File

@ -91,31 +91,25 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
UserContext.unregisterContext();
}
} catch(Throwable e) {
if (e instanceof AsyncCommandQueued) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("job " + job.getCmd() + " for job-" + job.getId() + " was queued, processing the queue.");
}
String errorMsg = null;
int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode();
if (!(e instanceof ServerApiException)) {
s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
errorMsg = e.getMessage();
} else {
String errorMsg = null;
int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode();
if (!(e instanceof ServerApiException)) {
s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
errorMsg = e.getMessage();
} else {
ServerApiException sApiEx = (ServerApiException)e;
errorMsg = sApiEx.getDescription();
errorCode = sApiEx.getErrorCode().getHttpCode();
}
ExceptionResponse response = new ExceptionResponse();
response.setErrorCode(errorCode);
response.setErrorText(errorMsg);
response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName());
// FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
// and we need to preserve that as much as possible here
_asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
ServerApiException sApiEx = (ServerApiException)e;
errorMsg = sApiEx.getDescription();
errorCode = sApiEx.getErrorCode().getHttpCode();
}
ExceptionResponse response = new ExceptionResponse();
response.setErrorCode(errorCode);
response.setErrorText(errorMsg);
response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName());
// FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
// and we need to preserve that as much as possible here
_asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
}
}
}

View File

@ -55,6 +55,7 @@ import org.apache.cloudstack.api.command.user.event.ListEventsCmd;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import com.cloud.async.AsyncJobExecutionContext;
import com.cloud.async.AsyncJobManager;
import com.cloud.dao.EntityManager;
import com.cloud.exception.InvalidParameterValueException;
@ -126,35 +127,38 @@ public class ApiDispatcher {
}
public void dispatch(BaseCmd cmd, Map<String, String> params) throws Exception {
processParameters(cmd, params);
UserContext ctx = UserContext.current();
ctx.setAccountId(cmd.getEntityOwnerId());
if (cmd instanceof BaseAsyncCmd) {
processParameters(cmd, params);
UserContext ctx = UserContext.current();
ctx.setAccountId(cmd.getEntityOwnerId());
if (cmd instanceof BaseAsyncCmd) {
BaseAsyncCmd asyncCmd = (BaseAsyncCmd) cmd;
String startEventId = params.get("ctxStartEventId");
ctx.setStartEventId(Long.valueOf(startEventId));
BaseAsyncCmd asyncCmd = (BaseAsyncCmd) cmd;
String startEventId = params.get("ctxStartEventId");
ctx.setStartEventId(Long.valueOf(startEventId));
// Synchronise job on the object if needed
if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null && asyncCmd.getSyncObjType() != null) {
Long queueSizeLimit = null;
if (asyncCmd.getSyncObjType() != null && asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject)) {
queueSizeLimit = _createSnapshotQueueSizeLimit;
} else {
queueSizeLimit = 1L;
}
// Synchronise job on the object if needed
if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null && asyncCmd.getSyncObjType() != null) {
Long queueSizeLimit = null;
if (asyncCmd.getSyncObjType() != null && asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject)) {
queueSizeLimit = _createSnapshotQueueSizeLimit;
} else {
queueSizeLimit = 1L;
}
if (queueSizeLimit != null) {
_asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
} else {
s_logger.trace("The queue size is unlimited, skipping the synchronizing");
}
if (queueSizeLimit != null) {
if(AsyncJobExecutionContext.getCurrentExecutionContext() == null) {
// if we are not within async-execution context, enqueue the command
_asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
return;
}
} else {
s_logger.trace("The queue size is unlimited, skipping the synchronizing");
}
}
}
cmd.execute();
cmd.execute();
}
@SuppressWarnings({ "unchecked", "rawtypes" })

View File

@ -361,10 +361,6 @@ public class ApiServer implements HttpRequestHandler, ApiServerService {
}
throw new ServerApiException(ApiErrorCode.RESOURCE_UNAVAILABLE_ERROR, errorMsg, ex);
}
catch (AsyncCommandQueued ex){
s_logger.error("unhandled exception executing api command: " + ((command == null) ? "null" : command[0]), ex);
throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, "Internal server error, unable to execute request.");
}
catch (ServerApiException ex){
s_logger.info(ex.getDescription());
throw ex;

View File

@ -1,36 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.api;
import com.cloud.async.SyncQueueVO;
import com.cloud.utils.SerialVersionUID;
import com.cloud.utils.exception.CloudRuntimeException;
public class AsyncCommandQueued extends CloudRuntimeException {
private static final long serialVersionUID = SerialVersionUID.AsyncCommandQueued;
private SyncQueueVO _queue = null;
public AsyncCommandQueued(SyncQueueVO queue, String msg) {
super(msg);
_queue = queue;
}
public SyncQueueVO getQueue() {
return _queue;
}
}

View File

@ -19,7 +19,6 @@ package com.cloud.async;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Type;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -31,7 +30,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.ejb.Local;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
@ -40,11 +38,8 @@ import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import org.springframework.stereotype.Component;
import com.cloud.api.ApiAsyncJobDispatcher;
import com.cloud.api.ApiSerializerHelper;
import com.cloud.api.AsyncCommandQueued;
import com.cloud.async.dao.AsyncJobDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
@ -87,7 +82,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Inject private ConfigurationDao _configDao;
@Inject private List<AsyncJobDispatcher> _jobDispatchers;
private long _jobExpireSeconds = 86400; // 1 day
// property
private String defaultDispatcher;
public String getDefaultDispatcher() {
return defaultDispatcher;
}
public void setDefaultDispatcher(String defaultDispatcher) {
this.defaultDispatcher = defaultDispatcher;
}
private long _jobExpireSeconds = 86400; // 1 day
private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs)
private final ScheduledExecutorService _heartbeatScheduler =
@ -239,15 +243,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Override
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
// This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router,
// when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
// This method will get called every time their business logic executes. The first time it exectues for a job
// there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first
// time the job executes we queue the job, otherwise we just return so that the business logic can execute.
if (job.getSyncSource() != null) {
return;
}
if(s_logger.isDebugEnabled()) {
s_logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId);
}
@ -270,11 +265,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
}
if (queue == null) {
if (queue == null)
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
} else {
throw new AsyncCommandQueued(queue, "job-" + job.getId() + " queued");
}
}
@Override
@ -372,7 +364,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private AsyncJobDispatcher getDispatcher(String dispatcherName) {
if(dispatcherName == null || dispatcherName.isEmpty())
dispatcherName = ApiAsyncJobDispatcher.class.getSimpleName();
dispatcherName = this.defaultDispatcher;
if(_jobDispatchers != null) {
for(AsyncJobDispatcher dispatcher : _jobDispatchers) {

View File

@ -405,3 +405,8 @@ INSERT INTO `cloud`.`vm_template` (id, unique_name, name, public, created, type,
VALUES (10, 'routing-10', 'SystemVM Template (LXC)', 0, now(), 'SYSTEM', 0, 64, 1, 'http://download.cloud.com/templates/acton/acton-systemvm-02062012.qcow2.bz2', '2755de1f9ef2ce4d6f2bee2efbb4da92', 0, 'SystemVM Template (LXC)', 'QCOW2', 15, 0, 1, 'LXC');
-- END: support for LXC
ALTER TABLE `cloud`.`async_job` DROP COLUMN `session_key`;
ALTER TABLE `cloud`.`async_job` DROP COLUMN `job_cmd_originator`;
ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_dispatcher` VARCHAR(64);