Completely Isolated job manager from user

This commit is contained in:
Alex Huang 2013-06-15 05:15:53 -07:00
parent ea6ca5ff5c
commit aad2bc78c1
21 changed files with 205 additions and 242 deletions

View File

@ -61,6 +61,10 @@ public class ConfigKey<T> {
return _scope;
}
public boolean isDynamic() {
return _isDynamic;
}
@Override
public String toString() {
return _name;
@ -73,8 +77,10 @@ public class ConfigKey<T> {
private final String _description;
private final String _range;
private final String _scope; // Parameter can be at different levels (Zone/cluster/pool/account), by default every parameter is at global
private final boolean _isDynamic;
public ConfigKey(Class<T> type, String name, String category, Class<?> componentClass, String defaultValue, String description, String range, String scope) {
public ConfigKey(Class<T> type, String name, String category, Class<?> componentClass, String defaultValue, String description, boolean isDynamic, String range,
String scope) {
_category = category;
_componentClass = componentClass;
_type = type;
@ -83,9 +89,10 @@ public class ConfigKey<T> {
_description = description;
_range = range;
_scope = scope;
_isDynamic = isDynamic;
}
public ConfigKey(Class<T> type, String name, String category, Class<?> componentClass, String defaultValue, String description, String range) {
this(type, name, category, componentClass, defaultValue, description, range, null);
public ConfigKey(Class<T> type, String name, String category, Class<?> componentClass, String defaultValue, String description, boolean isDynamic, String range) {
this(type, name, category, componentClass, defaultValue, description, isDynamic, range, null);
}
}

View File

@ -0,0 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.config;
public interface Configurable {
ConfigKey<?>[] getConfigKeys();
}

View File

@ -20,7 +20,6 @@ package org.apache.cloudstack.config;
* ConfigRepo is a repository of configurations.
*
*/
public interface ConfigRepo {
public interface ConfigDepot {
<T> ConfigValue<T> get(ConfigKey<T> key);
}

View File

@ -22,24 +22,24 @@ import org.apache.cloudstack.engine.service.api.OrchestrationService;
public interface Configs {
public static final ConfigKey<Integer> StartRetry = new ConfigKey<Integer>(
Integer.class, "start.retry", "Advanced", OrchestrationService.class, "10", "Number of times to retry create and start commands", null);
Integer.class, "start.retry", "Advanced", OrchestrationService.class, "10", "Number of times to retry create and start commands", true, null);
public static final ConfigKey<Long> VmOpWaitInterval = new ConfigKey<Long>(
Long.class, "vm.op.wait.interval", "Advanced", OrchestrationService.class, "120", "Time (in seconds) to wait before checking if a previous operation has succeeded",
null);
true, null);
public static final ConfigKey<Integer> VmOpLockStateRetry = new ConfigKey<Integer>(
Integer.class, "vm.op.lock.state.retry", "Advanced", OrchestrationService.class, "5", "Times to retry locking the state of a VM for operations", "-1 means try forever");
Integer.class, "vm.op.lock.state.retry", "Advanced", OrchestrationService.class, "5", "Times to retry locking the state of a VM for operations",
true, "-1 means try forever");
public static final ConfigKey<Long> VmOpCleanupInterval = new ConfigKey<Long>(
Long.class, "vm.op.cleanup.interval", "Advanced", OrchestrationService.class, "86400", "Interval to run the thread that cleans up the vm operations (in seconds)",
"Seconds");
false, "Seconds");
public static final ConfigKey<Long> VmOpCleanupWait = new ConfigKey<Long>(
Long.class, "vm.op.cleanup.wait", "Advanced", OrchestrationService.class, "3600", "Time (in seconds) to wait before cleanuping up any vm work items", "Seconds");
Long.class, "vm.op.cleanup.wait", "Advanced", OrchestrationService.class, "3600", "Time (in seconds) to wait before cleanuping up any vm work items", false, "Seconds");
public static final ConfigKey<Integer> VmOpCancelInterval = new ConfigKey<Integer>(
Integer.class, "vm.op.cancel.interval", "Advanced", OrchestrationService.class, "3600", "Time (in seconds) to wait before cancelling a operation", "Seconds");
Integer.class, "vm.op.cancel.interval", "Advanced", OrchestrationService.class, "3600", "Time (in seconds) to wait before cancelling a operation", false, "Seconds");
public static final ConfigKey<Integer> Wait = new ConfigKey<Integer>(
Integer.class, "wait", "Advanced", OrchestrationService.class, "1800", "Time in seconds to wait for control commands to return", null);
Integer.class, "wait", "Advanced", OrchestrationService.class, "1800", "Time in seconds to wait for control commands to return", false, null);
public static final ConfigKey<Boolean> VmDestroyForcestop = new ConfigKey<Boolean>(
Boolean.class, "vm.destroy.forcestop", "Advanced", OrchestrationService.class, "false", "On destroy, force-stop takes this value ", null);
Boolean.class, "vm.destroy.forcestop", "Advanced", OrchestrationService.class, "false", "On destroy, force-stop takes this value ", true, null);
public static final ConfigKey<Long> PingInterval = new ConfigKey<Long>(
Long.class, "ping.interval", "Advanced", OrchestrationService.class, "60", "Ping interval in seconds", null);

View File

@ -39,19 +39,19 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
import org.apache.cloudstack.config.ConfigRepo;
import org.apache.cloudstack.config.ConfigDepot;
import org.apache.cloudstack.config.ConfigValue;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.config.Configs;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
import org.apache.cloudstack.vm.jobs.VmWorkJobVO;
@ -175,7 +175,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Inject
protected EntityManager _entityMgr;
@Inject
ConfigRepo _configRepo;
ConfigDepot _configRepo;
@Inject
DataStoreManager _dataStoreMgr;
@Inject
@ -3536,7 +3536,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// 1) no pending VmWork job
// 2) on hostId host and host is UP
//
// When host is UP, soon or later we will get a report from the host about the VM,
// When host is UP, soon or later we will get a report from the host about the VM,
// however, if VM is missing from the host report (it may happen in out of band changes
// or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic
//
@ -3580,9 +3580,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// VMs that in transitional state without recent power state report
@DB
private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) {
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
"AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)";
List<Long> l = new ArrayList<Long>();
@ -3631,9 +3631,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@DB
private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) {
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " +
String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " +
"AND i.power_state_update_time < ? AND i.host_id = h.id " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
"AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)";
List<Long> l = new ArrayList<Long>();

View File

@ -27,9 +27,9 @@ import com.google.gson.GsonBuilder;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.api.StringMapTypeAdapter;
import com.cloud.dao.EntityManager;

View File

@ -21,7 +21,7 @@ import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.vm.jobs.VmWorkJobVO.Step;
import com.cloud.utils.DateUtil;
@ -31,7 +31,6 @@ import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachine.Type;
public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implements VmWorkJobDao {
@ -63,7 +62,8 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
ExpungeWorkJobSearch.done();
}
public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId) {
@Override
public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId) {
SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
sc.setParameters("vmType", type);
@ -78,7 +78,8 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
return null;
}
public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId) {
@Override
public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId) {
SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
sc.setParameters("vmType", type);
@ -89,7 +90,8 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
return this.listBy(sc, filter);
}
public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd) {
@Override
public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd) {
SearchCriteria<VmWorkJobVO> sc = PendingWorkJobByCommandSearch.create();
sc.setParameters("vmType", type);
@ -101,17 +103,19 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
return this.listBy(sc, filter);
}
public void updateStep(long workJobId, Step step) {
@Override
public void updateStep(long workJobId, Step step) {
VmWorkJobVO jobVo = findById(workJobId);
jobVo.setStep(step);
jobVo.setLastUpdated(DateUtil.currentGMTTime());
update(workJobId, jobVo);
}
public void expungeCompletedWorkJobs(Date cutDate) {
@Override
public void expungeCompletedWorkJobs(Date cutDate) {
SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
sc.setParameters("lastUpdated",cutDate);
sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
expunge(sc);
}

View File

@ -64,12 +64,12 @@ public class AsyncJobExecutionContext {
_job = job;
}
public void completeAsyncJob(JobInfo.Status jobStatus, int resultCode, Object resultObject) {
public void completeAsyncJob(JobInfo.Status jobStatus, int resultCode, String resultObject) {
assert(_job != null);
_jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject);
}
public void updateAsyncJobStatus(int processStatus, Object resultObject) {
public void updateAsyncJobStatus(int processStatus, String resultObject) {
assert(_job != null);
_jobMgr.updateAsyncJobStatus(_job.getId(), processStatus, resultObject);
}

View File

@ -18,7 +18,6 @@ package org.apache.cloudstack.framework.jobs;
import java.util.List;
import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.jobs.JobInfo;
@ -27,15 +26,18 @@ import com.cloud.utils.component.Manager;
public interface AsyncJobManager extends Manager {
AsyncJobVO getAsyncJob(long jobId);
public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
AsyncJobVO getAsyncJob(long jobId);
List<? extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
long submitAsyncJob(AsyncJob job);
long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, Object resultObject);
void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject);
void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, String result);
void updateAsyncJobStatus(long jobId, int processStatus, String resultObject);
void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
journalText, String journalObjJson);
@ -118,11 +120,7 @@ public interface AsyncJobManager extends Manager {
@Deprecated
boolean waitAndCheck(String[] wakupTopicsOnMessageBus, long checkIntervalInMilliSeconds,
long timeoutInMiliseconds, Predicate predicate);
/**
* Queries for the status or final result of an async job.
* @param cmd the command that specifies the job id
* @return an async-call result object
*/
AsyncJob queryAsyncJobResult(QueryAsyncJobResultCmd cmd);
AsyncJob queryJob(long jobId, boolean updatePollTime);
}

View File

@ -27,7 +27,7 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
@ -114,7 +114,7 @@ public class AsyncJobMonitor extends ManagerBase {
assert(_activeTasks.get(runNumber) == null);
long threadId = Thread.currentThread().getId();
boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobManager.JOB_POOL_THREAD_PREFIX);
ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread);
_activeTasks.put(runNumber, record);
if(fromPoolThread)

View File

@ -93,7 +93,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
_dispatcher.dispatch(cmdObj, params, true);
// serialize this to the async job table
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, cmdObj.getResponseObject());
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, ApiSerializerHelper.toSerializedString(cmdObj.getResponseObject()));
} finally {
CallContext.unregister();
}
@ -116,7 +116,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
// FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
// and we need to preserve that as much as possible here
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), ApiSerializerHelper.toSerializedString(response));
}
}
}

View File

@ -34,7 +34,6 @@ import java.util.TimeZone;
import javax.inject.Inject;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import org.apache.cloudstack.acl.ControlledEntity;
import org.apache.cloudstack.acl.ControlledEntity.ACLType;
@ -139,6 +138,7 @@ import org.apache.cloudstack.api.response.VpnUsersResponse;
import org.apache.cloudstack.api.response.ZoneResponse;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.network.lb.ApplicationLoadBalancerRule;
import org.apache.cloudstack.region.PortableIp;
import org.apache.cloudstack.region.PortableIpRange;
@ -187,6 +187,8 @@ import com.cloud.dc.Vlan.VlanType;
import com.cloud.dc.VlanVO;
import com.cloud.domain.Domain;
import com.cloud.event.Event;
import com.cloud.exception.InvalidParameterValueException;
import com.cloud.exception.PermissionDeniedException;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.hypervisor.HypervisorCapabilities;
@ -276,6 +278,7 @@ import com.cloud.storage.snapshot.SnapshotPolicy;
import com.cloud.storage.snapshot.SnapshotSchedule;
import com.cloud.template.VirtualMachineTemplate;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.User;
import com.cloud.user.UserAccount;
import com.cloud.uservm.UserVm;
@ -295,7 +298,6 @@ import com.cloud.vm.VirtualMachine.Type;
import com.cloud.vm.dao.NicSecondaryIpVO;
import com.cloud.vm.snapshot.VMSnapshot;
@Component
public class ApiResponseHelper implements ResponseGenerator {
public final Logger s_logger = Logger.getLogger(ApiResponseHelper.class);
@ -303,6 +305,10 @@ public class ApiResponseHelper implements ResponseGenerator {
@Inject private final EntityManager _entityMgr = null;
@Inject private final UsageService _usageSvc = null;
@Inject NetworkModel _ntwkModel;
@Inject
AccountManager _accountMgr;
@Inject
AsyncJobManager _jobMgr;
@Override
public UserResponse createUserResponse(User user) {
@ -1967,8 +1973,27 @@ public class ApiResponseHelper implements ResponseGenerator {
@Override
public AsyncJobResponse queryJobResult(QueryAsyncJobResultCmd cmd) {
AsyncJob result = ApiDBUtils._asyncMgr.queryAsyncJobResult(cmd);
return createAsyncJobResponse(result);
Account caller = CallContext.current().getCallingAccount();
AsyncJob job = _entityMgr.findById(AsyncJob.class, cmd.getId());
if (job == null) {
throw new InvalidParameterValueException("Unable to find a job by id " + cmd.getId());
}
User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId());
Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId());
//check permissions
if (caller.getType() == Account.ACCOUNT_TYPE_NORMAL) {
//regular user can see only jobs he owns
if (caller.getId() != jobOwner.getId()) {
throw new PermissionDeniedException("Account " + caller + " is not authorized to see job id=" + job.getId());
}
} else if (caller.getType() == Account.ACCOUNT_TYPE_DOMAIN_ADMIN) {
_accountMgr.checkAccess(caller, null, true, jobOwner);
}
return createAsyncJobResponse(_jobMgr.queryJob(cmd.getId(), true));
}
@Override

View File

@ -36,10 +36,10 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.api.ApiErrorCode;
import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.cloudstack.config.ConfigRepo;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.config.ConfigDepot;
import org.apache.cloudstack.config.ConfigKey;
import org.apache.cloudstack.config.ConfigValue;
import org.apache.cloudstack.config.Configurable;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
@ -63,20 +63,10 @@ import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.jobs.JobInfo.Status;
import com.cloud.api.ApiSerializerHelper;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHost;
import com.cloud.cluster.ManagementServerNode;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.InvalidParameterValueException;
import com.cloud.exception.PermissionDeniedException;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.User;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Predicate;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.component.ManagerBase;
@ -90,10 +80,15 @@ import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.ExceptionUtil;
import com.cloud.utils.mgmt.JmxUtil;
public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener {
public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener, Configurable {
// Advanced
private static final ConfigKey<Long> JobExpireMinutes = new ConfigKey<Long>(Long.class, "job.expire.minutes", "Advanced", AsyncJobManager.class, "1440",
"Time (in minutes) for async-jobs to be kept in system", true, null);
private static final ConfigKey<Long> JobCancelThresholdMinutes = new ConfigKey<Long>(Long.class, "job.cancel.threshold.minutes", "Advanced", AsyncJobManager.class,
"60", "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, null);
private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
private static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
// Although we may have detailed masks for each individual wakeup event, i.e.
// periodical timer, matched topic from message bus, it seems that we don't
// need to distinguish them to such level. Therefore, only one wakeup signal
@ -105,26 +100,27 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private static final int GC_INTERVAL = 10000; // 10 seconds
@Inject private SyncQueueManager _queueMgr;
@Inject private ClusterManager _clusterMgr;
@Inject private AccountManager _accountMgr;
@Inject private AsyncJobDao _jobDao;
@Inject private AsyncJobJournalDao _journalDao;
@Inject private ConfigurationDao _configDao;
@Inject private AsyncJobJoinMapDao _joinMapDao;
@Inject private List<AsyncJobDispatcher> _jobDispatchers;
@Inject private MessageBus _messageBus;
@Inject private AsyncJobMonitor _jobMonitor;
@Inject
private ConfigRepo _configRepo;
private ConfigDepot _configDepot;
private long _jobExpireSeconds = 86400; // 1 day
private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs)
private ConfigValue<Long> _jobExpireSeconds; // 1 day
private ConfigValue<Long> _jobCancelThresholdSeconds; // 1 hour (for cancelling the jobs blocking other jobs)
private volatile long _executionRunNumber = 1;
private final ScheduledExecutorService _heartbeatScheduler =
Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
private ExecutorService _executor;
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
private ExecutorService _executor;
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[] {JobExpireMinutes, JobCancelThresholdMinutes};
}
@Override
public AsyncJobVO getAsyncJob(long jobId) {
@ -196,7 +192,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override @DB
public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, Object resultObject) {
public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, String resultObject) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus +
", resultCode: " + resultCode + ", result: " + resultObject);
@ -234,7 +230,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setInstanceId(null);
if (resultObject != null) {
job.setResult(ApiSerializerHelper.toSerializedString(resultObject));
job.setResult(resultObject);
}
job.setLastUpdated(DateUtil.currentGMTTime());
@ -260,7 +256,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override @DB
public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject) {
public void updateAsyncJobStatus(long jobId, int processStatus, String resultObject) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
", result: " + resultObject);
@ -281,7 +277,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setProcessStatus(processStatus);
if(resultObject != null) {
job.setResult(ApiSerializerHelper.toSerializedString(resultObject));
job.setResult(resultObject);
}
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
@ -388,84 +384,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override
public AsyncJob queryAsyncJobResult(QueryAsyncJobResultCmd cmd) {
Account caller = CallContext.current().getCallingAccount();
AsyncJobVO job = _jobDao.findById(cmd.getId());
if (job == null) {
throw new InvalidParameterValueException("Unable to find a job by id " + cmd.getId());
public AsyncJob queryJob(long jobId, boolean updatePollTime) {
AsyncJobVO job = _jobDao.findById(jobId);
if (updatePollTime) {
job.setLastPolled(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
}
User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId());
Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId());
//check permissions
if (caller.getType() == Account.ACCOUNT_TYPE_NORMAL) {
//regular user can see only jobs he owns
if (caller.getId() != jobOwner.getId()) {
throw new PermissionDeniedException("Account " + caller + " is not authorized to see job id=" + job.getId());
}
} else if (caller.getType() == Account.ACCOUNT_TYPE_DOMAIN_ADMIN) {
_accountMgr.checkAccess(caller, null, true, jobOwner);
}
//poll the job
queryAsyncJobResult(cmd.getId());
return _jobDao.findById(cmd.getId());
return job;
}
@DB
public AsyncJobResult queryAsyncJobResult(long jobId) {
if(s_logger.isTraceEnabled()) {
s_logger.trace("Query async-job status, job-" + jobId);
}
Transaction txt = Transaction.currentTxn();
AsyncJobResult jobResult = new AsyncJobResult(jobId);
try {
txt.start();
AsyncJobVO job = _jobDao.findById(jobId);
if(job != null) {
jobResult.setJobStatus(job.getStatus());
jobResult.setProcessStatus(job.getProcessStatus());
jobResult.setResult(job.getResult());
jobResult.setResultCode(job.getResultCode());
jobResult.setUuid(job.getUuid());
if(job.getStatus() == JobInfo.Status.SUCCEEDED ||
job.getStatus() == JobInfo.Status.FAILED) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async job-" + jobId + " completed");
}
} else {
job.setLastPolled(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
}
} else {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?");
}
jobResult.setJobStatus(JobInfo.Status.FAILED);
jobResult.setResult("job-" + jobId + " does not exist");
}
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception while querying async job-" + jobId + " status: ", e);
jobResult.setJobStatus(JobInfo.Status.FAILED);
jobResult.setResult("Exception: " + e.toString());
txt.rollback();
}
if(s_logger.isTraceEnabled()) {
s_logger.trace("Job status: " + jobResult.toString());
}
return jobResult;
}
private void scheduleExecution(final AsyncJobVO job) {
scheduleExecution(job, false);
@ -762,7 +690,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
try {
s_logger.trace("Begin cleanup expired async-jobs");
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds.value() * 1000);
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential growth of job table
@ -781,12 +709,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
// forcefully cancel blocking queue items if they've been staying there for too long
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds.value()
* 1000, false);
if(blockItems != null && blockItems.size() > 0) {
for(SyncQueueItemVO item : blockItems) {
if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0,
getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
}
// purge the item and resume queue processing
@ -799,8 +727,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
}
}
};
}
@ -819,32 +745,27 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item: l) {
if(s_logger.isInfoEnabled()) {
s_logger.info("Discard left-over queue item: " + item.toString());
}
String contentType = item.getContentType();
if(contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
Long jobId = item.getContentId();
if(jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
completeAsyncJob(jobId, JobInfo.Status.FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
}
}
_queueMgr.purgeItem(item.getId());
for (SyncQueueItemVO item : l) {
if (s_logger.isInfoEnabled()) {
s_logger.info("Discard left-over queue item: " + item.toString());
}
String contentType = item.getContentType();
if (contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
Long jobId = item.getContentId();
if (jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
completeAsyncJob(jobId, JobInfo.Status.FAILED, 0, "Execution was cancelled because of server shutdown");
}
}
_queueMgr.purgeItem(item.getId());
}
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
int expireMinutes = NumbersUtil.parseInt(_configDao.getValue(Config.JobExpireMinutes.key()), 24 * 60);
_jobExpireSeconds = (long)expireMinutes*60;
_jobCancelThresholdSeconds = NumbersUtil.parseInt(_configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60);
_jobCancelThresholdSeconds *= 60;
_jobExpireSeconds = _configDepot.get(JobExpireMinutes).setMultiplier(60);
_jobCancelThresholdSeconds = _configDepot.get(JobCancelThresholdMinutes).setMultiplier(60);
try {
final File dbPropsFile = PropertiesUtil.findConfigFile("db.properties");
@ -856,7 +777,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
int poolSize = (cloudMaxActive * 2) / 3;
s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
_executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(JOB_POOL_THREAD_PREFIX));
_executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobManager.JOB_POOL_THREAD_PREFIX));
} catch (final Exception e) {
throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
}
@ -879,11 +800,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
txn.start();
List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
cleanupPendingJobs(items);
_jobDao.resetJobProcess(msHost.getId(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), getSerializedErrorMessage("job cancelled because of management server restart"));
_jobDao.resetJobProcess(msHost.getId(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
txn.commit();
} catch(Throwable e) {
s_logger.warn("Unexpected exception ", e);
txn.rollback();
} finally {
txn.close();
}
@ -901,30 +821,17 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
cleanupPendingJobs(l);
_jobDao.resetJobProcess(getMsid(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), getSerializedErrorMessage("job cancelled because of management server restart"));
_jobDao.resetJobProcess(getMsid(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
} catch(Throwable e) {
s_logger.error("Unexpected exception " + e.getMessage(), e);
}
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL,
HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
_heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL,
GC_INTERVAL, TimeUnit.MILLISECONDS);
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
_heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL, GC_INTERVAL, TimeUnit.MILLISECONDS);
return true;
}
private static ExceptionResponse getResetResultResponse(String errorMessage) {
ExceptionResponse resultObject = new ExceptionResponse();
resultObject.setErrorCode(ApiErrorCode.INTERNAL_ERROR.getHttpCode());
resultObject.setErrorText(errorMessage);
return resultObject;
}
private static String getSerializedErrorMessage(String errorMessage) {
return ApiSerializerHelper.toSerializedString(getResetResultResponse(errorMessage));
}
@Override
public boolean stop() {
_heartbeatScheduler.shutdown();

View File

@ -68,9 +68,10 @@ import org.apache.cloudstack.api.command.admin.zone.CreateZoneCmd;
import org.apache.cloudstack.api.command.admin.zone.DeleteZoneCmd;
import org.apache.cloudstack.api.command.admin.zone.UpdateZoneCmd;
import org.apache.cloudstack.api.command.user.network.ListNetworkOfferingsCmd;
import org.apache.cloudstack.config.ConfigDepot;
import org.apache.cloudstack.config.ConfigKey;
import org.apache.cloudstack.config.ConfigRepo;
import org.apache.cloudstack.config.ConfigValue;
import org.apache.cloudstack.config.Configurable;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.region.PortableIp;
import org.apache.cloudstack.region.PortableIpDao;
@ -215,7 +216,7 @@ import com.cloud.vm.dao.NicSecondaryIpDao;
import edu.emory.mathcs.backport.java.util.Arrays;
@Local(value = { ConfigurationManager.class, ConfigurationService.class })
public class ConfigurationManagerImpl extends ManagerBase implements ConfigurationManager, ConfigurationService, ConfigRepo {
public class ConfigurationManagerImpl extends ManagerBase implements ConfigurationManager, ConfigurationService, ConfigDepot {
public static final Logger s_logger = Logger.getLogger(ConfigurationManagerImpl.class.getName());
@Inject
@ -319,6 +320,9 @@ public class ConfigurationManagerImpl extends ManagerBase implements Configurati
@Inject
DedicatedResourceDao _dedicatedDao;
@Inject
List<Configurable> _configurables;
// FIXME - why don't we have interface for DataCenterLinkLocalIpAddressDao?
@Inject protected DataCenterLinkLocalIpAddressDao _LinkLocalIpAllocDao;

View File

@ -424,9 +424,9 @@ import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
@ -443,6 +443,7 @@ import com.cloud.alert.AlertManager;
import com.cloud.alert.AlertVO;
import com.cloud.alert.dao.AlertDao;
import com.cloud.api.ApiDBUtils;
import com.cloud.api.ApiSerializerHelper;
import com.cloud.capacity.Capacity;
import com.cloud.capacity.CapacityVO;
import com.cloud.capacity.dao.CapacityDao;
@ -3469,7 +3470,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
if (asyncExecutionContext != null) {
job = asyncExecutionContext.getJob();
_asyncMgr.updateAsyncJobAttachment(job.getId(), Upload.Type.VOLUME.toString(), volumeId);
_asyncMgr.updateAsyncJobStatus(job.getId(), JobInfo.Status.IN_PROGRESS, resultObj);
_asyncMgr.updateAsyncJobStatus(job.getId(), JobInfo.Status.IN_PROGRESS.ordinal(), ApiSerializerHelper.toSerializedString(resultObj));
}
String value = _configs.get(Config.CopyVolumeWait.toString());
int copyvolumewait = NumbersUtil.parseInt(value, Integer.parseInt(Config.CopyVolumeWait.getDefaultValue()));
@ -3490,7 +3491,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
resultObj.setResultString(errorString);
resultObj.setUploadStatus(UploadVO.Status.COPY_ERROR.toString());
if (asyncExecutionContext != null) {
_asyncMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, resultObj);
_asyncMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, ApiSerializerHelper.toSerializedString(resultObj));
}
// Update the DB that volume couldn't be copied

View File

@ -1797,8 +1797,7 @@ public class VolumeManagerImpl extends ManagerBase implements VolumeManager {
}
_asyncMgr.updateAsyncJobAttachment(job.getId(), "volume", volumeId);
_asyncMgr.updateAsyncJobStatus(job.getId(),
BaseCmd.PROGRESS_INSTANCE_CREATED, volumeId);
_asyncMgr.updateAsyncJobStatus(job.getId(), BaseCmd.PROGRESS_INSTANCE_CREATED, Long.toString(volumeId));
}
VolumeVO newVol = _volumeDao.findById(volumeOnPrimaryStorage.getId());
@ -1896,8 +1895,7 @@ public class VolumeManagerImpl extends ManagerBase implements VolumeManager {
}
_asyncMgr.updateAsyncJobAttachment(job.getId(), "volume", volumeId);
_asyncMgr.updateAsyncJobStatus(job.getId(),
BaseCmd.PROGRESS_INSTANCE_CREATED, volumeId);
_asyncMgr.updateAsyncJobStatus(job.getId(), BaseCmd.PROGRESS_INSTANCE_CREATED, volumeId.toString());
}
String errorMsg = "Failed to detach volume: " + volume.getName()

View File

@ -34,7 +34,6 @@ import org.springframework.stereotype.Component;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.command.user.snapshot.CreateSnapshotCmd;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
@ -143,14 +142,14 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu
Long asyncJobId = snapshotSchedule.getAsyncJobId();
AsyncJobVO asyncJob = _asyncJobDao.findById(asyncJobId);
switch (asyncJob.getStatus()) {
case JobInfo.Status.SUCCEEDED:
case SUCCEEDED:
// The snapshot has been successfully backed up.
// The snapshot state has also been cleaned up.
// We can schedule the next job for this snapshot.
// Remove the existing entry in the snapshot_schedule table.
scheduleNextSnapshotJob(snapshotSchedule);
break;
case JobInfo.Status.FAILED:
case FAILED:
// Check the snapshot status.
Long snapshotId = snapshotSchedule.getSnapshotId();
if (snapshotId == null) {
@ -188,7 +187,7 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu
}
break;
case JobInfo.Status.IN_PROGRESS:
case IN_PROGRESS:
// There is no way of knowing from here whether
// 1) Another management server is processing this snapshot job
// 2) The management server has crashed and this snapshot is lying

View File

@ -24,11 +24,16 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.cloudstack.api.command.user.iso.ExtractIsoCmd;
import org.apache.cloudstack.api.command.user.volume.ExtractVolumeCmd;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.cloudstack.api.command.user.iso.ExtractIsoCmd;
import org.apache.cloudstack.api.command.user.template.ExtractTemplateCmd;
import org.apache.cloudstack.api.command.user.volume.ExtractVolumeCmd;
import org.apache.cloudstack.api.response.ExtractResponse;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.agent.Listener;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
@ -40,13 +45,8 @@ import com.cloud.agent.api.storage.UploadAnswer;
import com.cloud.agent.api.storage.UploadCommand;
import com.cloud.agent.api.storage.UploadProgressCommand;
import com.cloud.agent.api.storage.UploadProgressCommand.RequestType;
import org.apache.cloudstack.api.command.user.template.ExtractTemplateCmd;
import org.apache.cloudstack.api.response.ExtractResponse;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import com.cloud.api.ApiDBUtils;
import com.cloud.async.AsyncJobResult;
import com.cloud.api.ApiSerializerHelper;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.HostVO;
import com.cloud.storage.Storage;
@ -65,7 +65,7 @@ public class UploadListener implements Listener {
private final RequestType reqType;
public StatusTask( UploadListener ul, RequestType req) {
this.reqType = req;
reqType = req;
this.ul = ul;
}
@ -165,19 +165,19 @@ public class UploadListener implements Listener {
public UploadListener(HostVO host, Timer _timer, UploadDao uploadDao,
UploadVO uploadObj, UploadMonitorImpl uploadMonitor, UploadCommand cmd,
Long accountId, String typeName, Type type, long eventId, long asyncJobId, AsyncJobManager asyncMgr) {
this.sserver = host;
sserver = host;
this.uploadDao = uploadDao;
this.uploadMonitor = uploadMonitor;
this.cmd = cmd;
this.uploadId = uploadObj.getId();
uploadId = uploadObj.getId();
this.accountId = accountId;
this.typeName = typeName;
this.type = type;
initStateMachine();
this.currState = getState(Status.NOT_UPLOADED.toString());
this.timer = _timer;
this.timeoutTask = new TimeoutTask(this);
this.timer.schedule(timeoutTask, 3*STATUS_POLL_INTERVAL);
currState = getState(Status.NOT_UPLOADED.toString());
timer = _timer;
timeoutTask = new TimeoutTask(this);
timer.schedule(timeoutTask, 3*STATUS_POLL_INTERVAL);
this.eventId = eventId;
this.asyncJobId = asyncJobId;
this.asyncMgr = asyncMgr;
@ -188,7 +188,7 @@ public class UploadListener implements Listener {
else{
extractId = ApiDBUtils.findTemplateById(uploadObj.getTypeId()).getUuid();
}
this.resultObj = new ExtractResponse(extractId, typeName, ApiDBUtils.findAccountById(accountId).getUuid(), Status.NOT_UPLOADED.toString(),
resultObj = new ExtractResponse(extractId, typeName, ApiDBUtils.findAccountById(accountId).getUuid(), Status.NOT_UPLOADED.toString(),
ApiDBUtils.findUploadById(uploadId).getUuid());
resultObj.setResponseName(responseNameMap.get(type.toString()));
updateDatabase(Status.NOT_UPLOADED, cmd.getUrl(),"");
@ -213,11 +213,11 @@ public class UploadListener implements Listener {
}
public void setCommand(UploadCommand _cmd) {
this.cmd = _cmd;
cmd = _cmd;
}
public void setJobId(String _jobId) {
this.jobId = _jobId;
jobId = _jobId;
}
public String getJobId() {
@ -365,7 +365,7 @@ public class UploadListener implements Listener {
resultObj.setResultString(uploadErrorString);
resultObj.setState(state.toString());
asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS.ordinal(), ApiSerializerHelper.toSerializedString(resultObj));
UploadVO vo = uploadDao.createForUpdate();
vo.setUploadState(state);
@ -378,7 +378,7 @@ public class UploadListener implements Listener {
resultObj.setResultString(uploadErrorString);
resultObj.setState(state.toString());
asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS.ordinal(), ApiSerializerHelper.toSerializedString(resultObj));
UploadVO vo = uploadDao.createForUpdate();
@ -407,12 +407,12 @@ public class UploadListener implements Listener {
if (answer.getUploadStatus() == Status.UPLOAD_IN_PROGRESS){
asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS.ordinal(), ApiSerializerHelper.toSerializedString(resultObj));
}else if(answer.getUploadStatus() == Status.UPLOADED){
resultObj.setResultString("Success");
asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.SUCCEEDED, 1, resultObj);
asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.SUCCEEDED, 1, ApiSerializerHelper.toSerializedString(resultObj));
}else{
asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.FAILED, 2, resultObj);
asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.FAILED, 2, ApiSerializerHelper.toSerializedString(resultObj));
}
UploadVO updateBuilder = uploadDao.createForUpdate();
updateBuilder.setUploadPercent(answer.getUploadPct());
@ -454,6 +454,6 @@ public class UploadListener implements Listener {
}
public void setCurrState(Status uploadState) {
this.currState = getState(currState.toString());
currState = getState(currState.toString());
}
}

View File

@ -35,7 +35,6 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDao;
@ -44,6 +43,7 @@ import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
@ -202,7 +202,7 @@ public class TestAsyncJobManager extends TestCase {
@Test
public void testPseudoJob() {
AsyncJob job = asyncMgr.getPseudoJob(Account.ACCOUNT_ID_SYSTEM, User.UID_SYSTEM);
Assert.assertTrue(job.getInstanceType().equals(AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE));
Assert.assertTrue(job.getInstanceType().equals(AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE));
Assert.assertTrue(job.getInstanceId().longValue() == Thread.currentThread().getId());
}

View File

@ -25,9 +25,9 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.agent.api.to.NicTO;
import com.cloud.agent.api.to.VirtualMachineTO;

View File

@ -3,9 +3,9 @@ package com.cloud.vm;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.component.AdapterBase;