Removed AsyncJobConstant

This commit is contained in:
Alex Huang 2013-06-14 19:21:25 -07:00
parent 2b96665bf4
commit ea6ca5ff5c
28 changed files with 214 additions and 221 deletions

View File

@ -16,6 +16,7 @@
// under the License.
package org.apache.cloudstack.api;
public interface ResponseObject {
/**
* Get the name of the API response

View File

@ -22,6 +22,12 @@ import org.apache.cloudstack.api.Identity;
import org.apache.cloudstack.api.InternalIdentity;
public interface JobInfo extends Identity, InternalIdentity {
public enum Status {
IN_PROGRESS,
SUCCEEDED,
FAILED,
CANCELLED;
}
String getType();
@ -39,7 +45,7 @@ public interface JobInfo extends Identity, InternalIdentity {
String getCmdInfo();
int getStatus();
Status getStatus();
int getProcessStatus();

View File

@ -725,7 +725,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return true;
VmWorkJobVO workJob = _workJobDao.findById(jobId);
if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS)
if(workJob.getStatus() != JobInfo.Status.IN_PROGRESS)
return true;
return false;
@ -1152,7 +1152,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return true;
VmWorkJobVO workJob = _workJobDao.findById(jobId);
if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS)
if(workJob.getStatus() != JobInfo.Status.IN_PROGRESS)
return true;
return false;

View File

@ -99,10 +99,10 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
VmWorkStop stop = (VmWorkStop)work;
_vmMgr.orchestrateStop(vm.getUuid(), stop.isForceStop());
}
_asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_SUCCEEDED, 0, null);
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null);
} catch(Throwable e) {
s_logger.error("Unable to complete " + job, e);
_asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, 0, e.getMessage());
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, e.getMessage());
} finally {
CallContext.unregister();
}

View File

@ -111,7 +111,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
public void expungeCompletedWorkJobs(Date cutDate) {
SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
sc.setParameters("lastUpdated",cutDate);
sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
expunge(sc);
}

View File

@ -57,7 +57,7 @@ public interface AsyncJob extends JobInfo {
String getCmdInfo();
@Override
int getStatus();
Status getStatus();
@Override
int getProcessStatus();

View File

@ -1,34 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs;
public interface AsyncJobConstants {
public static final int STATUS_IN_PROGRESS = 0;
public static final int STATUS_SUCCEEDED = 1;
public static final int STATUS_FAILED = 2;
public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
// Although we may have detailed masks for each individual wakeup event, i.e.
// periodical timer, matched topic from message bus, it seems that we don't
// need to distinguish them to such level. Therefore, only one wakeup signal
// is defined
public static final int SIGNAL_MASK_WAKEUP = 1;
}

View File

@ -22,11 +22,11 @@ import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.ResourceUnavailableException;
import com.cloud.utils.component.ComponentContext;
public class AsyncJobExecutionContext {
private AsyncJob _job;
@ -57,10 +57,6 @@ public class AsyncJobExecutionContext {
}
public AsyncJob getJob() {
if(_job == null) {
_job = _jobMgr.getPseudoJob();
}
return _job;
}
@ -68,7 +64,7 @@ public class AsyncJobExecutionContext {
_job = job;
}
public void completeAsyncJob(int jobStatus, int resultCode, Object resultObject) {
public void completeAsyncJob(JobInfo.Status jobStatus, int resultCode, Object resultObject) {
assert(_job != null);
_jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject);
}
@ -114,7 +110,7 @@ public class AsyncJobExecutionContext {
assert(_job != null);
AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
if(record.getJoinStatus() == AsyncJobConstants.STATUS_FAILED && record.getJoinResult() != null) {
if(record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) {
Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
if(exception != null && exception instanceof Exception) {
if(exception instanceof InsufficientCapacityException)
@ -131,12 +127,12 @@ public class AsyncJobExecutionContext {
_jobMgr.disjoinJob(_job.getId(), joinedJobId);
}
public void completeJoin(int joinStatus, String joinResult) {
public void completeJoin(JobInfo.Status joinStatus, String joinResult) {
assert(_job != null);
_jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
}
public void completeJobAndJoin(int joinStatus, String joinResult) {
public void completeJobAndJoin(JobInfo.Status joinStatus, String joinResult) {
assert(_job != null);
_jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
_jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null);
@ -144,21 +140,14 @@ public class AsyncJobExecutionContext {
public static AsyncJobExecutionContext getCurrentExecutionContext() {
AsyncJobExecutionContext context = s_currentExectionContext.get();
if(context == null) {
context = new AsyncJobExecutionContext();
context = ComponentContext.inject(context);
context.getJob();
setCurrentExecutionContext(context);
}
return context;
}
public static AsyncJobExecutionContext registerPseudoExecutionContext() {
public static AsyncJobExecutionContext registerPseudoExecutionContext(long accountId, long userId) {
AsyncJobExecutionContext context = s_currentExectionContext.get();
if (context == null) {
context = new AsyncJobExecutionContext();
context.getJob();
context.setJob(_jobMgr.getPseudoJob(accountId, userId));
setCurrentExecutionContext(context);
}

View File

@ -20,6 +20,7 @@ import java.util.List;
import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.Predicate;
import com.cloud.utils.component.Manager;
@ -31,12 +32,9 @@ public interface AsyncJobManager extends Manager {
List<? extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
long submitAsyncJob(AsyncJob job);
long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext);
long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
// AsyncJobResult queryAsyncJobResult(long jobId);
void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject);
void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, Object resultObject);
void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject);
void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
@ -50,7 +48,7 @@ public interface AsyncJobManager extends Manager {
*
* @return pseudo job for the thread
*/
AsyncJob getPseudoJob();
AsyncJob getPseudoJob(long accountId, long userId);
/**
* Used by upper level job to wait for completion of a down-level job (usually VmWork jobs)
@ -101,7 +99,7 @@ public interface AsyncJobManager extends Manager {
* for legacy code to work. To help pass exception object easier, we use
* object-stream based serialization instead of GSON
*/
void completeJoin(long joinJobId, int joinStatus, String joinResult);
void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult);
void releaseSyncSource();
void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);

View File

@ -23,8 +23,8 @@ import java.util.List;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
@ -100,7 +100,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
sc.setParameters("instanceType", instanceType);
sc.setParameters("instanceId", instanceId);
sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
List<AsyncJobVO> l = listIncludingRemovedBy(sc);
if(l != null && l.size() > 0) {
@ -121,7 +121,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
if (accountId != null) {
sc.setParameters("accountId", accountId);
}
sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
return listBy(sc);
}
@ -129,8 +129,8 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
@Override
public AsyncJobVO findPseudoJob(long threadId, long msid) {
SearchCriteria<AsyncJobVO> sc = pseudoJobSearch.create();
sc.setParameters("jobDispatcher", AsyncJobConstants.JOB_DISPATCHER_PSEUDO);
sc.setParameters("instanceType", AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE);
sc.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO);
sc.setParameters("instanceType", AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
sc.setParameters("instanceId", threadId);
List<AsyncJobVO> result = listBy(sc);
@ -178,7 +178,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
@Override
@DB
public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) {
String sql = "UPDATE async_job SET job_status=" + AsyncJobConstants.STATUS_FAILED + ", job_result_code=" + jobResultCode
String sql = "UPDATE async_job SET job_status=" + JobInfo.Status.FAILED + ", job_result_code=" + jobResultCode
+ ", job_result='" + jobResultMessage + "' where job_status=0 AND (job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?))";
Transaction txn = Transaction.currentTxn();

View File

@ -19,12 +19,13 @@ package org.apache.cloudstack.framework.jobs.dao;
import java.util.List;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.db.GenericDao;
public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long> {
Long joinJob(long jobId, long joinJobId, long joinMsid,
Long joinJob(long jobId, long joinJobId, long joinMsid,
long wakeupIntervalMs, long expirationMs,
Long syncSourceId, String wakeupHandler, String wakeupDispatcher);
void disjoinJob(long jobId, long joinedJobId);
@ -33,7 +34,7 @@ public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long>
AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId);
List<AsyncJobJoinMapVO> listJoinRecords(long jobId);
void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid);
void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid);
List<Long> wakeupScan();
List<Long> wakeupByJoinedJobCompletion(long joinedJobId);

View File

@ -26,23 +26,23 @@ import java.util.TimeZone;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.UpdateBuilder;
import com.cloud.utils.db.SearchCriteria.Op;
public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Long> implements AsyncJobJoinMapDao {
public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class);
private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;
private final SearchBuilder<AsyncJobJoinMapVO> RecordSearchByOwner;
private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;
private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;
private final SearchBuilder<AsyncJobJoinMapVO> RecordSearchByOwner;
private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;
private final SearchBuilder<AsyncJobJoinMapVO> WakeupSearch;
public AsyncJobJoinMapDaoImpl() {
@ -66,7 +66,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
WakeupSearch.done();
}
public Long joinJob(long jobId, long joinJobId, long joinMsid,
@Override
public Long joinJob(long jobId, long joinJobId, long joinMsid,
long wakeupIntervalMs, long expirationMs,
Long syncSourceId, String wakeupHandler, String wakeupDispatcher) {
@ -74,7 +75,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
record.setJobId(jobId);
record.setJoinJobId(joinJobId);
record.setJoinMsid(joinMsid);
record.setJoinStatus(AsyncJobConstants.STATUS_IN_PROGRESS);
record.setJoinStatus(JobInfo.Status.IN_PROGRESS);
record.setSyncSourceId(syncSourceId);
record.setWakeupInterval(wakeupIntervalMs / 1000); // convert millisecond to second
record.setWakeupHandler(wakeupHandler);
@ -84,11 +85,12 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
record.setExpiration(new Date(DateUtil.currentGMTTime().getTime() + expirationMs));
}
this.persist(record);
persist(record);
return record.getId();
}
public void disjoinJob(long jobId, long joinedJobId) {
@Override
public void disjoinJob(long jobId, long joinedJobId) {
SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
sc.setParameters("jobId", jobId);
sc.setParameters("joinJobId", joinedJobId);
@ -96,14 +98,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
this.expunge(sc);
}
public void disjoinAllJobs(long jobId) {
@Override
public void disjoinAllJobs(long jobId) {
SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearchByOwner.create();
sc.setParameters("jobId", jobId);
this.expunge(sc);
}
public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
@Override
public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
sc.setParameters("jobId", jobId);
sc.setParameters("joinJobId", joinJobId);
@ -117,14 +121,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
return null;
}
public List<AsyncJobJoinMapVO> listJoinRecords(long jobId) {
@Override
public List<AsyncJobJoinMapVO> listJoinRecords(long jobId) {
SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearchByOwner.create();
sc.setParameters("jobId", jobId);
return this.listBy(sc);
}
public void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid) {
@Override
public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid) {
AsyncJobJoinMapVO record = createForUpdate();
record.setJoinStatus(joinStatus);
record.setJoinResult(joinResult);
@ -138,7 +144,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
update(ub, sc, null);
}
public List<Long> wakeupScan() {
@Override
public List<Long> wakeupScan() {
List<Long> standaloneList = new ArrayList<Long>();
Date cutDate = DateUtil.currentGMTTime();
@ -149,9 +156,9 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
txn.start();
//
// performance sensitive processing, do it in plain SQL
// performance sensitive processing, do it in plain SQL
//
String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
@ -159,7 +166,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
pstmt.executeUpdate();
pstmt.close();
sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
@ -194,7 +201,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
return standaloneList;
}
public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
@Override
public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
List<Long> standaloneList = new ArrayList<Long>();
Transaction txn = Transaction.currentTxn();
@ -203,16 +211,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
txn.start();
//
// performance sensitive processing, do it in plain SQL
// performance sensitive processing, do it in plain SQL
//
String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +
"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setLong(1, joinedJobId);
pstmt.executeUpdate();
pstmt.close();
sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
pstmt = txn.prepareStatement(sql);
pstmt.setLong(1, joinedJobId);

View File

@ -20,6 +20,8 @@ import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@ -27,6 +29,8 @@ import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDao;
@ -45,7 +49,8 @@ public class AsyncJobJoinMapVO {
private long joinJobId;
@Column(name="join_status")
private int joinStatus;
@Enumerated(EnumType.ORDINAL)
private JobInfo.Status joinStatus;
@Column(name="join_result", length=1024)
private String joinResult;
@ -112,11 +117,11 @@ public class AsyncJobJoinMapVO {
this.joinJobId = joinJobId;
}
public int getJoinStatus() {
public JobInfo.Status getJoinStatus() {
return joinStatus;
}
public void setJoinStatus(int joinStatus) {
public void setJoinStatus(JobInfo.Status joinStatus) {
this.joinStatus = joinStatus;
}

View File

@ -22,13 +22,12 @@ import java.util.TimeZone;
import javax.management.StandardMBean;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobMBean;
import com.cloud.utils.DateUtil;
public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
private AsyncJob _job;
private final AsyncJob _job;
public AsyncJobMBeanImpl(AsyncJob job) {
super(AsyncJobMBean.class, false);
@ -36,84 +35,100 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
_job = job;
}
public long getAccountId() {
@Override
public long getAccountId() {
return _job.getAccountId();
}
public long getUserId() {
@Override
public long getUserId() {
return _job.getUserId();
}
public String getCmd() {
@Override
public String getCmd() {
return _job.getCmd();
}
public String getCmdInfo() {
@Override
public String getCmdInfo() {
return _job.getCmdInfo();
}
public String getStatus() {
int jobStatus = _job.getStatus();
switch(jobStatus) {
case AsyncJobConstants.STATUS_SUCCEEDED :
@Override
public String getStatus() {
switch (_job.getStatus()) {
case SUCCEEDED:
return "Completed";
case AsyncJobConstants.STATUS_IN_PROGRESS:
case IN_PROGRESS:
return "In preogress";
case AsyncJobConstants.STATUS_FAILED:
case FAILED:
return "failed";
case CANCELLED:
return "cancelled";
}
return "Unknow";
}
public int getProcessStatus() {
@Override
public int getProcessStatus() {
return _job.getProcessStatus();
}
public int getResultCode() {
@Override
public int getResultCode() {
return _job.getResultCode();
}
public String getResult() {
@Override
public String getResult() {
return _job.getResult();
}
public String getInstanceType() {
@Override
public String getInstanceType() {
if(_job.getInstanceType() != null)
return _job.getInstanceType().toString();
return "N/A";
}
public String getInstanceId() {
@Override
public String getInstanceId() {
if(_job.getInstanceId() != null)
return String.valueOf(_job.getInstanceId());
return "N/A";
}
public String getInitMsid() {
@Override
public String getInitMsid() {
if(_job.getInitMsid() != null) {
return String.valueOf(_job.getInitMsid());
}
return "N/A";
}
public String getCreateTime() {
@Override
public String getCreateTime() {
Date time = _job.getCreated();
if(time != null)
return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
return "N/A";
}
public String getLastUpdateTime() {
@Override
public String getLastUpdateTime() {
Date time = _job.getLastUpdated();
if(time != null)
return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
return "N/A";
}
public String getLastPollTime() {
@Override
public String getLastPollTime() {
Date time = _job.getLastPolled();
if(time != null)
@ -121,7 +136,8 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
return "N/A";
}
public String getSyncQueueId() {
@Override
public String getSyncQueueId() {
SyncQueueItem item = _job.getSyncSource();
if(item != null && item.getQueueId() != null) {
return String.valueOf(item.getQueueId());
@ -129,7 +145,8 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
return "N/A";
}
public String getSyncQueueContentType() {
@Override
public String getSyncQueueContentType() {
SyncQueueItem item = _job.getSyncSource();
if(item != null) {
return item.getContentType();
@ -137,7 +154,8 @@ public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
return "N/A";
}
public String getSyncQueueContentId() {
@Override
public String getSyncQueueContentId() {
SyncQueueItem item = _job.getSyncSource();
if(item != null && item.getContentId() != null) {
return String.valueOf(item.getContentId());

View File

@ -23,6 +23,8 @@ import javax.persistence.Column;
import javax.persistence.DiscriminatorColumn;
import javax.persistence.DiscriminatorType;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@ -44,11 +46,14 @@ import com.cloud.utils.db.GenericDao;
@Inheritance(strategy=InheritanceType.JOINED)
@DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
public class AsyncJobVO implements AsyncJob, JobInfo {
public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@Column(name="id")
private Long id = null;
private long id;
@Column(name="job_type", length=32)
protected String type;
@ -78,7 +83,8 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
private String cmdInfo;
@Column(name="job_status")
private int status;
@Enumerated(value = EnumType.ORDINAL)
private Status status;
@Column(name="job_process_status")
private int processStatus;
@ -145,7 +151,7 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
return id;
}
public void setId(Long id) {
public void setId(long id) {
this.id = id;
}
@ -236,11 +242,11 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
}
@Override
public int getStatus() {
public Status getStatus() {
return status;
}
public void setStatus(int status) {
public void setStatus(Status status) {
this.status = status;
}

View File

@ -33,9 +33,9 @@ import org.apache.cloudstack.api.ServerApiException;
import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.user.Account;
import com.cloud.user.dao.AccountDao;
@ -93,7 +93,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
_dispatcher.dispatch(cmdObj, params, true);
// serialize this to the async job table
_asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject());
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, cmdObj.getResponseObject());
} finally {
CallContext.unregister();
}
@ -116,7 +116,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
// FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
// and we need to preserve that as much as possible here
_asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response);
}
}
}

View File

@ -605,7 +605,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
if (response.getObjectId() != null && objectJobMap.containsKey(response.getObjectId())) {
AsyncJob job = objectJobMap.get(response.getObjectId());
response.setJobId(job.getUuid());
response.setJobStatus(job.getStatus());
response.setJobStatus(job.getStatus().ordinal());
}
}
}

View File

@ -38,9 +38,9 @@ import org.apache.log4j.Logger;
import org.apache.cloudstack.api.ApiErrorCode;
import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.cloudstack.config.ConfigRepo;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
@ -60,11 +60,14 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDetector;
import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.jobs.JobInfo.Status;
import com.cloud.api.ApiSerializerHelper;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHost;
import com.cloud.cluster.ManagementServerNode;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.InvalidParameterValueException;
@ -76,7 +79,6 @@ import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Predicate;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
@ -87,11 +89,16 @@ import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.ExceptionUtil;
import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.net.MacAddress;
public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener {
public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
private static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
// Although we may have detailed masks for each individual wakeup event, i.e.
// periodical timer, matched topic from message bus, it seems that we don't
// need to distinguish them to such level. Therefore, only one wakeup signal
// is defined
public static final int SIGNAL_MASK_WAKEUP = 1;
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
private static final int HEARTBEAT_INTERVAL = 2000;
@ -107,6 +114,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Inject private List<AsyncJobDispatcher> _jobDispatchers;
@Inject private MessageBus _messageBus;
@Inject private AsyncJobMonitor _jobMonitor;
@Inject
private ConfigRepo _configRepo;
private long _jobExpireSeconds = 86400; // 1 day
private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs)
@ -128,15 +137,15 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override @DB
public AsyncJob getPseudoJob() {
public AsyncJob getPseudoJob(long accountId, long userId) {
AsyncJobVO job = _jobDao.findPseudoJob(Thread.currentThread().getId(), getMsid());
if(job == null) {
job = new AsyncJobVO();
job.setAccountId(_accountMgr.getSystemAccount().getId());
job.setUserId(_accountMgr.getSystemUser().getId());
job.setAccountId(accountId);
job.setUserId(userId);
job.setInitMsid(getMsid());
job.setDispatcher(AsyncJobConstants.JOB_DISPATCHER_PSEUDO);
job.setInstanceType(AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE);
job.setDispatcher(AsyncJobVO.JOB_DISPATCHER_PSEUDO);
job.setInstanceType(AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
job.setInstanceId(Thread.currentThread().getId());
_jobDao.persist(job);
}
@ -149,31 +158,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@SuppressWarnings("unchecked")
@Override @DB
@DB
public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext) {
Transaction txt = Transaction.currentTxn();
try {
@SuppressWarnings("rawtypes")
GenericDao dao = GenericDaoBase.getDao(job.getClass());
txt.start();
job.setInitMsid(getMsid());
dao.persist(job);
txt.commit();
@SuppressWarnings("rawtypes")
GenericDao dao = GenericDaoBase.getDao(job.getClass());
job.setInitMsid(getMsid());
job.setSyncSource(null); // no sync source originally
dao.persist(job);
// no sync source originally
job.setSyncSource(null);
scheduleExecution(job, scheduleJobExecutionInContext);
if(s_logger.isDebugEnabled()) {
s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
}
return job.getId();
} catch(Exception e) {
txt.rollback();
String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
s_logger.warn(errMsg, e);
throw new CloudRuntimeException(errMsg);
scheduleExecution(job, scheduleJobExecutionInContext);
if (s_logger.isDebugEnabled()) {
s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
}
return job.getId();
}
@SuppressWarnings("unchecked")
@ -199,7 +196,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override @DB
public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject) {
public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, Object resultObject) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus +
", resultCode: " + resultCode + ", result: " + resultObject);
@ -219,7 +216,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return;
}
if(job.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS) {
if(job.getStatus() != JobInfo.Status.IN_PROGRESS) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("job-" + jobId + " is already completed.");
}
@ -251,7 +248,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
for(Long id : wakeupList) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO jobToWakeup = _jobDao.findById(id);
if(jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0)
if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(jobToWakeup, false);
}
@ -358,7 +355,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override @DB
public void completeJoin(long joinJobId, int joinStatus, String joinResult) {
public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult) {
_joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
}
@ -436,8 +433,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
jobResult.setResultCode(job.getResultCode());
jobResult.setUuid(job.getUuid());
if(job.getStatus() == AsyncJobConstants.STATUS_SUCCEEDED ||
job.getStatus() == AsyncJobConstants.STATUS_FAILED) {
if(job.getStatus() == JobInfo.Status.SUCCEEDED ||
job.getStatus() == JobInfo.Status.FAILED) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async job-" + jobId + " completed");
@ -451,14 +448,14 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?");
}
jobResult.setJobStatus(AsyncJobConstants.STATUS_FAILED);
jobResult.setJobStatus(JobInfo.Status.FAILED);
jobResult.setResult("job-" + jobId + " does not exist");
}
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception while querying async job-" + jobId + " status: ", e);
jobResult.setJobStatus(AsyncJobConstants.STATUS_FAILED);
jobResult.setJobStatus(JobInfo.Status.FAILED);
jobResult.setResult("Exception: " + e.toString());
txt.rollback();
}
@ -475,7 +472,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
private void scheduleExecution(final AsyncJob job, boolean executeInContext) {
Runnable runnable = getExecutorRunnable(this, job);
Runnable runnable = getExecutorRunnable(job);
if (executeInContext) {
runnable.run();
} else {
@ -516,7 +513,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
}
private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) {
private Runnable getExecutorRunnable(final AsyncJob job) {
return new Runnable() {
@Override
public void run() {
@ -539,20 +536,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
_jobMonitor.registerActiveTask(runNumber, job.getId());
AsyncJobExecutionContext.setCurrentExecutionContext((AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job))
);
AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job));
// execute the job
if(s_logger.isDebugEnabled()) {
s_logger.debug("Executing " + job);
}
if((getAndResetPendingSignals(job) & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) {
if ((getAndResetPendingSignals(job) & SIGNAL_MASK_WAKEUP) != 0) {
AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
if(jobDispatcher != null) {
jobDispatcher.runJob(job);
} else {
s_logger.error("Unable to find a wakeup dispatcher from the joined job. job-" + job.getId());
s_logger.error("Unable to find a wakeup dispatcher from the joined job: " + job);
}
} else {
AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
@ -560,7 +556,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
jobDispatcher.runJob(job);
} else {
s_logger.error("Unable to find job dispatcher, job will be cancelled");
completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
}
}
@ -570,7 +566,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
} catch (Throwable e) {
s_logger.error("Unexpected exception", e);
completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
} finally {
// guard final clause as well
try {
@ -598,7 +594,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
//
// clean execution environment
//
AsyncJobExecutionContext.setCurrentExecutionContext(null);
AsyncJobExecutionContext.unregister();
_jobMonitor.unregisterActiveTask(runNumber);
} catch(Throwable e) {
@ -727,7 +723,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
for(Long jobId : standaloneWakeupJobs) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO job = _jobDao.findById(jobId);
if(job != null && (job.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0)
if (job != null && (job.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(job, false);
}
} catch(Throwable e) {
@ -789,7 +785,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
if(blockItems != null && blockItems.size() > 0) {
for(SyncQueueItemVO item : blockItems) {
if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
completeAsyncJob(item.getContentId(), AsyncJobConstants.STATUS_FAILED, 0,
completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0,
getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
}
@ -819,11 +815,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
private long getMsid() {
if(_clusterMgr != null) {
return _clusterMgr.getManagementNodeId();
}
return MacAddress.getMacAddress().toLong();
return ManagementServerNode.getManagementServerId();
}
private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
@ -838,7 +830,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
Long jobId = item.getContentId();
if(jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
completeAsyncJob(jobId, AsyncJobConstants.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
completeAsyncJob(jobId, JobInfo.Status.FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
}
}
_queueMgr.purgeItem(item.getId());
@ -864,7 +856,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
int poolSize = (cloudMaxActive * 2) / 3;
s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
_executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX));
_executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(JOB_POOL_THREAD_PREFIX));
} catch (final Exception e) {
throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
}

View File

@ -16,14 +16,14 @@
// under the License.
package com.cloud.async;
import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.api.ApiSerializerHelper;
public class AsyncJobResult {
private long jobId;
private int jobStatus;
private JobInfo.Status jobStatus;
private int processStatus;
private int resultCode;
private String result;
@ -31,7 +31,7 @@ public class AsyncJobResult {
public AsyncJobResult(long jobId) {
this.jobId = jobId;
jobStatus = AsyncJobConstants.STATUS_IN_PROGRESS;
jobStatus = JobInfo.Status.IN_PROGRESS;
processStatus = 0;
resultCode = 0;
result = "";
@ -46,18 +46,18 @@ public class AsyncJobResult {
}
public String getUuid() {
return this.uuid;
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public int getJobStatus() {
public JobInfo.Status getJobStatus() {
return jobStatus;
}
public void setJobStatus(int jobStatus) {
public void setJobStatus(JobInfo.Status jobStatus) {
this.jobStatus = jobStatus;
}
@ -97,7 +97,7 @@ public class AsyncJobResult {
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("AsyncJobResult {jobId:").append(getJobId());
sb.append(", jobStatus: ").append(getJobStatus());
sb.append(", jobStatus: ").append(getJobStatus().ordinal());
sb.append(", processStatus: ").append(getProcessStatus());
sb.append(", resultCode: ").append(getResultCode());
sb.append(", result: ").append(result);

View File

@ -1728,11 +1728,11 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy
// _itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
// user, account, ((VmWorkStart)work).getPlan());
//
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
// } catch(Exception e) {
// s_logger.error("Exception in process VM-start work", e);
// String result = SerializerHelper.toObjectSerializedString(e);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
// }
// }
//
@ -1748,11 +1748,11 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy
// try {
// _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
//
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
// } catch(Exception e) {
// s_logger.error("Exception in process VM-stop work", e);
// String result = SerializerHelper.toObjectSerializedString(e);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
// }
// }
}

View File

@ -3469,7 +3469,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
if (asyncExecutionContext != null) {
job = asyncExecutionContext.getJob();
_asyncMgr.updateAsyncJobAttachment(job.getId(), Upload.Type.VOLUME.toString(), volumeId);
_asyncMgr.updateAsyncJobStatus(job.getId(), AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
_asyncMgr.updateAsyncJobStatus(job.getId(), JobInfo.Status.IN_PROGRESS, resultObj);
}
String value = _configs.get(Config.CopyVolumeWait.toString());
int copyvolumewait = NumbersUtil.parseInt(value, Integer.parseInt(Config.CopyVolumeWait.getDefaultValue()));
@ -3490,7 +3490,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
resultObj.setResultString(errorString);
resultObj.setUploadStatus(UploadVO.Status.COPY_ERROR.toString());
if (asyncExecutionContext != null) {
_asyncMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, 0, resultObj);
_asyncMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, resultObj);
}
// Update the DB that volume couldn't be copied

View File

@ -1462,11 +1462,11 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
// _itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
// user, account, ((VmWorkStart)work).getPlan());
//
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
// } catch(Exception e) {
// s_logger.error("Exception in process VM-start work", e);
// String result = SerializerHelper.toObjectSerializedString(e);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
// }
// }
//
@ -1482,11 +1482,11 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
// try {
// _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
//
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
// } catch(Exception e) {
// s_logger.error("Exception in process VM-stop work", e);
// String result = SerializerHelper.toObjectSerializedString(e);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result);
// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
// }
// }
}

View File

@ -143,14 +143,14 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu
Long asyncJobId = snapshotSchedule.getAsyncJobId();
AsyncJobVO asyncJob = _asyncJobDao.findById(asyncJobId);
switch (asyncJob.getStatus()) {
case AsyncJobConstants.STATUS_SUCCEEDED:
case JobInfo.Status.SUCCEEDED:
// The snapshot has been successfully backed up.
// The snapshot state has also been cleaned up.
// We can schedule the next job for this snapshot.
// Remove the existing entry in the snapshot_schedule table.
scheduleNextSnapshotJob(snapshotSchedule);
break;
case AsyncJobConstants.STATUS_FAILED:
case JobInfo.Status.FAILED:
// Check the snapshot status.
Long snapshotId = snapshotSchedule.getSnapshotId();
if (snapshotId == null) {
@ -188,7 +188,7 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu
}
break;
case AsyncJobConstants.STATUS_IN_PROGRESS:
case JobInfo.Status.IN_PROGRESS:
// There is no way of knowing from here whether
// 1) Another management server is processing this snapshot job
// 2) The management server has crashed and this snapshot is lying

View File

@ -365,7 +365,7 @@ public class UploadListener implements Listener {
resultObj.setResultString(uploadErrorString);
resultObj.setState(state.toString());
asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
UploadVO vo = uploadDao.createForUpdate();
vo.setUploadState(state);
@ -378,7 +378,7 @@ public class UploadListener implements Listener {
resultObj.setResultString(uploadErrorString);
resultObj.setState(state.toString());
asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
UploadVO vo = uploadDao.createForUpdate();
@ -407,12 +407,12 @@ public class UploadListener implements Listener {
if (answer.getUploadStatus() == Status.UPLOAD_IN_PROGRESS){
asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L);
asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj);
asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj);
}else if(answer.getUploadStatus() == Status.UPLOADED){
resultObj.setResultString("Success");
asyncMgr.completeAsyncJob(asyncJobId, AsyncJobConstants.STATUS_SUCCEEDED, 1, resultObj);
asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.SUCCEEDED, 1, resultObj);
}else{
asyncMgr.completeAsyncJob(asyncJobId, AsyncJobConstants.STATUS_FAILED, 2, resultObj);
asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.FAILED, 2, resultObj);
}
UploadVO updateBuilder = uploadDao.createForUpdate();
updateBuilder.setUploadPercent(answer.getUploadPct());

View File

@ -71,8 +71,8 @@ public class SystemVmLoadScanner<T> {
@Override
public void run() {
try {
CallContext.registerSystemCallContextOnceOnly();
AsyncJobExecutionContext.registerPseudoExecutionContext();
CallContext cc = CallContext.registerSystemCallContextOnceOnly();
AsyncJobExecutionContext.registerPseudoExecutionContext(cc.getCallingAccountId(), cc.getCallingUserId());
} catch (Exception e) {
s_logger.fatal("Unable to start the capacity scan task", e);
System.exit(1);

View File

@ -48,10 +48,13 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.cluster.ClusterManager;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.AccountVO;
import com.cloud.user.User;
import com.cloud.user.UserVO;
import com.cloud.utils.Predicate;
import com.cloud.utils.component.ComponentContext;
@ -142,21 +145,21 @@ public class TestAsyncJobManager extends TestCase {
AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1);
Assert.assertTrue(record != null);
Assert.assertTrue(record.getJoinMsid() == 100);
Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_IN_PROGRESS);
Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.IN_PROGRESS);
joinMapDao.completeJoin(1, AsyncJobConstants.STATUS_SUCCEEDED, "Done", 101);
joinMapDao.completeJoin(1, JobInfo.Status.SUCCEEDED, "Done", 101);
record = joinMapDao.getJoinRecord(2, 1);
Assert.assertTrue(record != null);
Assert.assertTrue(record.getJoinMsid() == 100);
Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_SUCCEEDED);
Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED);
Assert.assertTrue(record.getJoinResult().equals("Done"));
Assert.assertTrue(record.getCompleteMsid() == 101);
record = joinMapDao.getJoinRecord(3, 1);
Assert.assertTrue(record != null);
Assert.assertTrue(record.getJoinMsid() == 100);
Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_SUCCEEDED);
Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED);
Assert.assertTrue(record.getJoinResult().equals("Done"));
Assert.assertTrue(record.getCompleteMsid() == 101);
@ -198,7 +201,7 @@ public class TestAsyncJobManager extends TestCase {
@Test
public void testPseudoJob() {
AsyncJob job = asyncMgr.getPseudoJob();
AsyncJob job = asyncMgr.getPseudoJob(Account.ACCOUNT_ID_SYSTEM, User.UID_SYSTEM);
Assert.assertTrue(job.getInstanceType().equals(AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE));
Assert.assertTrue(job.getInstanceId().longValue() == Thread.currentThread().getId());
}

View File

@ -316,7 +316,7 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage
if(wakeupCount++ < 3) {
AsyncJobExecutionContext.getCurrentExecutionContext().resetSyncSource();
} else {
AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobConstants.STATUS_SUCCEEDED, 0, null);
AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(JobInfo.Status.SUCCEEDED, 0, null);
}
}

View File

@ -20,7 +20,7 @@ public class VmWorkTestWorkJobDispatcher extends AdapterBase implements AsyncJob
} catch (InterruptedException e) {
}
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null);
AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
s_logger.info("End work job execution. job-" + job.getId());
}
}