From ea6ca5ff5c928a61c024c242f07cc1f813e2f616 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Fri, 14 Jun 2013 19:21:25 -0700 Subject: [PATCH] Removed AsyncJobConstant --- .../apache/cloudstack/api/ResponseObject.java | 1 + .../org/apache/cloudstack/jobs/JobInfo.java | 8 +- .../cloud/vm/VirtualMachineManagerImpl.java | 4 +- .../src/com/cloud/vm/VmWorkJobDispatcher.java | 4 +- .../cloudstack/vm/jobs/VmWorkJobDaoImpl.java | 2 +- .../cloudstack/framework/jobs/AsyncJob.java | 2 +- .../framework/jobs/AsyncJobConstants.java | 34 ------ .../jobs/AsyncJobExecutionContext.java | 25 ++--- .../framework/jobs/AsyncJobManager.java | 10 +- .../framework/jobs/dao/AsyncJobDaoImpl.java | 12 +- .../jobs/dao/AsyncJobJoinMapDao.java | 5 +- .../jobs/dao/AsyncJobJoinMapDaoImpl.java | 50 +++++---- .../jobs/impl/AsyncJobJoinMapVO.java | 11 +- .../jobs/impl/AsyncJobMBeanImpl.java | 66 +++++++---- .../framework/jobs/impl/AsyncJobVO.java | 16 ++- .../com/cloud/api/ApiAsyncJobDispatcher.java | 6 +- server/src/com/cloud/api/ApiServer.java | 2 +- .../com/cloud/async/AsyncJobManagerImpl.java | 106 ++++++++---------- .../src/com/cloud/async/AsyncJobResult.java | 14 +-- .../consoleproxy/ConsoleProxyManagerImpl.java | 8 +- .../cloud/server/ManagementServerImpl.java | 4 +- .../SecondaryStorageManagerImpl.java | 8 +- .../snapshot/SnapshotSchedulerImpl.java | 6 +- .../cloud/storage/upload/UploadListener.java | 10 +- .../src/com/cloud/vm/SystemVmLoadScanner.java | 4 +- .../com/cloud/async/TestAsyncJobManager.java | 13 ++- .../VmWorkMockVirtualMachineManagerImpl.java | 2 +- .../cloud/vm/VmWorkTestWorkJobDispatcher.java | 2 +- 28 files changed, 214 insertions(+), 221 deletions(-) delete mode 100644 framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java diff --git a/api/src/org/apache/cloudstack/api/ResponseObject.java b/api/src/org/apache/cloudstack/api/ResponseObject.java index c8bd45727c9..b69615984b5 100644 --- a/api/src/org/apache/cloudstack/api/ResponseObject.java +++ b/api/src/org/apache/cloudstack/api/ResponseObject.java @@ -16,6 +16,7 @@ // under the License. package org.apache.cloudstack.api; + public interface ResponseObject { /** * Get the name of the API response diff --git a/api/src/org/apache/cloudstack/jobs/JobInfo.java b/api/src/org/apache/cloudstack/jobs/JobInfo.java index bce96271770..6a531238598 100644 --- a/api/src/org/apache/cloudstack/jobs/JobInfo.java +++ b/api/src/org/apache/cloudstack/jobs/JobInfo.java @@ -22,6 +22,12 @@ import org.apache.cloudstack.api.Identity; import org.apache.cloudstack.api.InternalIdentity; public interface JobInfo extends Identity, InternalIdentity { + public enum Status { + IN_PROGRESS, + SUCCEEDED, + FAILED, + CANCELLED; + } String getType(); @@ -39,7 +45,7 @@ public interface JobInfo extends Identity, InternalIdentity { String getCmdInfo(); - int getStatus(); + Status getStatus(); int getProcessStatus(); diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 4fee49ebe38..0f710d10d67 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -725,7 +725,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return true; VmWorkJobVO workJob = _workJobDao.findById(jobId); - if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS) + if(workJob.getStatus() != JobInfo.Status.IN_PROGRESS) return true; return false; @@ -1152,7 +1152,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return true; VmWorkJobVO workJob = _workJobDao.findById(jobId); - if(workJob.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS) + if(workJob.getStatus() != JobInfo.Status.IN_PROGRESS) return true; return false; diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java index 8c7fd9cbf73..7819c1ab0b6 100644 --- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java +++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -99,10 +99,10 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch VmWorkStop stop = (VmWorkStop)work; _vmMgr.orchestrateStop(vm.getUuid(), stop.isForceStop()); } - _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_SUCCEEDED, 0, null); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.SUCCEEDED, 0, null); } catch(Throwable e) { s_logger.error("Unable to complete " + job, e); - _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, 0, e.getMessage()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, e.getMessage()); } finally { CallContext.unregister(); } diff --git a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java index 4ece4e82bc8..0135d813801 100644 --- a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java +++ b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java @@ -111,7 +111,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase implemen public void expungeCompletedWorkJobs(Date cutDate) { SearchCriteria sc = ExpungeWorkJobSearch.create(); sc.setParameters("lastUpdated",cutDate); - sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS); + sc.setParameters("status", JobInfo.Status.IN_PROGRESS); expunge(sc); } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java index dfb67f8c053..be92846612d 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java @@ -57,7 +57,7 @@ public interface AsyncJob extends JobInfo { String getCmdInfo(); @Override - int getStatus(); + Status getStatus(); @Override int getProcessStatus(); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java deleted file mode 100644 index 9568eb47a14..00000000000 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobConstants.java +++ /dev/null @@ -1,34 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package org.apache.cloudstack.framework.jobs; - -public interface AsyncJobConstants { - public static final int STATUS_IN_PROGRESS = 0; - public static final int STATUS_SUCCEEDED = 1; - public static final int STATUS_FAILED = 2; - - public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher"; - public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread"; - - public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor"; - - // Although we may have detailed masks for each individual wakeup event, i.e. - // periodical timer, matched topic from message bus, it seems that we don't - // need to distinguish them to such level. Therefore, only one wakeup signal - // is defined - public static final int SIGNAL_MASK_WAKEUP = 1; -} diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java index 3d5c3268778..ef0a4a69bab 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java @@ -22,11 +22,11 @@ import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao; import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.exception.ConcurrentOperationException; import com.cloud.exception.InsufficientCapacityException; import com.cloud.exception.ResourceUnavailableException; -import com.cloud.utils.component.ComponentContext; public class AsyncJobExecutionContext { private AsyncJob _job; @@ -57,10 +57,6 @@ public class AsyncJobExecutionContext { } public AsyncJob getJob() { - if(_job == null) { - _job = _jobMgr.getPseudoJob(); - } - return _job; } @@ -68,7 +64,7 @@ public class AsyncJobExecutionContext { _job = job; } - public void completeAsyncJob(int jobStatus, int resultCode, Object resultObject) { + public void completeAsyncJob(JobInfo.Status jobStatus, int resultCode, Object resultObject) { assert(_job != null); _jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject); } @@ -114,7 +110,7 @@ public class AsyncJobExecutionContext { assert(_job != null); AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId); - if(record.getJoinStatus() == AsyncJobConstants.STATUS_FAILED && record.getJoinResult() != null) { + if(record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) { Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult()); if(exception != null && exception instanceof Exception) { if(exception instanceof InsufficientCapacityException) @@ -131,12 +127,12 @@ public class AsyncJobExecutionContext { _jobMgr.disjoinJob(_job.getId(), joinedJobId); } - public void completeJoin(int joinStatus, String joinResult) { + public void completeJoin(JobInfo.Status joinStatus, String joinResult) { assert(_job != null); _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); } - public void completeJobAndJoin(int joinStatus, String joinResult) { + public void completeJobAndJoin(JobInfo.Status joinStatus, String joinResult) { assert(_job != null); _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); _jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null); @@ -144,21 +140,14 @@ public class AsyncJobExecutionContext { public static AsyncJobExecutionContext getCurrentExecutionContext() { AsyncJobExecutionContext context = s_currentExectionContext.get(); - if(context == null) { - context = new AsyncJobExecutionContext(); - context = ComponentContext.inject(context); - context.getJob(); - setCurrentExecutionContext(context); - } - return context; } - public static AsyncJobExecutionContext registerPseudoExecutionContext() { + public static AsyncJobExecutionContext registerPseudoExecutionContext(long accountId, long userId) { AsyncJobExecutionContext context = s_currentExectionContext.get(); if (context == null) { context = new AsyncJobExecutionContext(); - context.getJob(); + context.setJob(_jobMgr.getPseudoJob(accountId, userId)); setCurrentExecutionContext(context); } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java index e577fb048b4..31782682a35 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.utils.Predicate; import com.cloud.utils.component.Manager; @@ -31,12 +32,9 @@ public interface AsyncJobManager extends Manager { List findInstancePendingAsyncJobs(String instanceType, Long accountId); long submitAsyncJob(AsyncJob job); - long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext); long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId); -// AsyncJobResult queryAsyncJobResult(long jobId); - - void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject); + void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, Object resultObject); void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject); void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId); void logJobJournal(long jobId, AsyncJob.JournalType journalType, String @@ -50,7 +48,7 @@ public interface AsyncJobManager extends Manager { * * @return pseudo job for the thread */ - AsyncJob getPseudoJob(); + AsyncJob getPseudoJob(long accountId, long userId); /** * Used by upper level job to wait for completion of a down-level job (usually VmWork jobs) @@ -101,7 +99,7 @@ public interface AsyncJobManager extends Manager { * for legacy code to work. To help pass exception object easier, we use * object-stream based serialization instead of GSON */ - void completeJoin(long joinJobId, int joinStatus, String joinResult); + void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult); void releaseSyncSource(); void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java index c30dbde0ff0..96775f715e2 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java @@ -23,8 +23,8 @@ import java.util.List; import org.apache.log4j.Logger; -import org.apache.cloudstack.framework.jobs.AsyncJobConstants; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.utils.db.DB; import com.cloud.utils.db.Filter; @@ -100,7 +100,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements SearchCriteria sc = pendingAsyncJobSearch.create(); sc.setParameters("instanceType", instanceType); sc.setParameters("instanceId", instanceId); - sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS); + sc.setParameters("status", JobInfo.Status.IN_PROGRESS); List l = listIncludingRemovedBy(sc); if(l != null && l.size() > 0) { @@ -121,7 +121,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements if (accountId != null) { sc.setParameters("accountId", accountId); } - sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS); + sc.setParameters("status", JobInfo.Status.IN_PROGRESS); return listBy(sc); } @@ -129,8 +129,8 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements @Override public AsyncJobVO findPseudoJob(long threadId, long msid) { SearchCriteria sc = pseudoJobSearch.create(); - sc.setParameters("jobDispatcher", AsyncJobConstants.JOB_DISPATCHER_PSEUDO); - sc.setParameters("instanceType", AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE); + sc.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO); + sc.setParameters("instanceType", AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE); sc.setParameters("instanceId", threadId); List result = listBy(sc); @@ -178,7 +178,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements @Override @DB public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) { - String sql = "UPDATE async_job SET job_status=" + AsyncJobConstants.STATUS_FAILED + ", job_result_code=" + jobResultCode + String sql = "UPDATE async_job SET job_status=" + JobInfo.Status.FAILED + ", job_result_code=" + jobResultCode + ", job_result='" + jobResultMessage + "' where job_status=0 AND (job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?))"; Transaction txn = Transaction.currentTxn(); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java index a9e82a74715..9c993f13eb9 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java @@ -19,12 +19,13 @@ package org.apache.cloudstack.framework.jobs.dao; import java.util.List; import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.utils.db.GenericDao; public interface AsyncJobJoinMapDao extends GenericDao { - Long joinJob(long jobId, long joinJobId, long joinMsid, + Long joinJob(long jobId, long joinJobId, long joinMsid, long wakeupIntervalMs, long expirationMs, Long syncSourceId, String wakeupHandler, String wakeupDispatcher); void disjoinJob(long jobId, long joinedJobId); @@ -33,7 +34,7 @@ public interface AsyncJobJoinMapDao extends GenericDao AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId); List listJoinRecords(long jobId); - void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid); + void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid); List wakeupScan(); List wakeupByJoinedJobCompletion(long joinedJobId); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java index 4cc2218896a..60dea03a29d 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java @@ -26,23 +26,23 @@ import java.util.TimeZone; import org.apache.log4j.Logger; -import org.apache.cloudstack.framework.jobs.AsyncJobConstants; import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.utils.DateUtil; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.Transaction; import com.cloud.utils.db.UpdateBuilder; -import com.cloud.utils.db.SearchCriteria.Op; public class AsyncJobJoinMapDaoImpl extends GenericDaoBase implements AsyncJobJoinMapDao { public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class); - private final SearchBuilder RecordSearch; - private final SearchBuilder RecordSearchByOwner; - private final SearchBuilder CompleteJoinSearch; + private final SearchBuilder RecordSearch; + private final SearchBuilder RecordSearchByOwner; + private final SearchBuilder CompleteJoinSearch; private final SearchBuilder WakeupSearch; public AsyncJobJoinMapDaoImpl() { @@ -66,7 +66,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase sc = RecordSearch.create(); sc.setParameters("jobId", jobId); sc.setParameters("joinJobId", joinedJobId); @@ -96,14 +98,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase sc = RecordSearchByOwner.create(); sc.setParameters("jobId", jobId); this.expunge(sc); } - public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) { + @Override + public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) { SearchCriteria sc = RecordSearch.create(); sc.setParameters("jobId", jobId); sc.setParameters("joinJobId", joinJobId); @@ -117,14 +121,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase listJoinRecords(long jobId) { + @Override + public List listJoinRecords(long jobId) { SearchCriteria sc = RecordSearchByOwner.create(); sc.setParameters("jobId", jobId); return this.listBy(sc); } - public void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid) { + @Override + public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid) { AsyncJobJoinMapVO record = createForUpdate(); record.setJoinStatus(joinStatus); record.setJoinResult(joinResult); @@ -138,7 +144,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase wakeupScan() { + @Override + public List wakeupScan() { List standaloneList = new ArrayList(); Date cutDate = DateUtil.currentGMTTime(); @@ -149,9 +156,9 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase ?)"; pstmt = txn.prepareStatement(sql); pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); @@ -159,7 +166,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase ?)"; pstmt = txn.prepareStatement(sql); pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); @@ -194,7 +201,8 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase wakeupByJoinedJobCompletion(long joinedJobId) { + @Override + public List wakeupByJoinedJobCompletion(long joinedJobId) { List standaloneList = new ArrayList(); Transaction txn = Transaction.currentTxn(); @@ -203,16 +211,16 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase _jobDispatchers; @Inject private MessageBus _messageBus; @Inject private AsyncJobMonitor _jobMonitor; + @Inject + private ConfigRepo _configRepo; private long _jobExpireSeconds = 86400; // 1 day private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs) @@ -128,15 +137,15 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } @Override @DB - public AsyncJob getPseudoJob() { + public AsyncJob getPseudoJob(long accountId, long userId) { AsyncJobVO job = _jobDao.findPseudoJob(Thread.currentThread().getId(), getMsid()); if(job == null) { job = new AsyncJobVO(); - job.setAccountId(_accountMgr.getSystemAccount().getId()); - job.setUserId(_accountMgr.getSystemUser().getId()); + job.setAccountId(accountId); + job.setUserId(userId); job.setInitMsid(getMsid()); - job.setDispatcher(AsyncJobConstants.JOB_DISPATCHER_PSEUDO); - job.setInstanceType(AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE); + job.setDispatcher(AsyncJobVO.JOB_DISPATCHER_PSEUDO); + job.setInstanceType(AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE); job.setInstanceId(Thread.currentThread().getId()); _jobDao.persist(job); } @@ -149,31 +158,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } @SuppressWarnings("unchecked") - @Override @DB + @DB public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext) { - Transaction txt = Transaction.currentTxn(); - try { - @SuppressWarnings("rawtypes") - GenericDao dao = GenericDaoBase.getDao(job.getClass()); - - txt.start(); - job.setInitMsid(getMsid()); - dao.persist(job); - txt.commit(); + @SuppressWarnings("rawtypes") + GenericDao dao = GenericDaoBase.getDao(job.getClass()); + job.setInitMsid(getMsid()); + job.setSyncSource(null); // no sync source originally + dao.persist(job); - // no sync source originally - job.setSyncSource(null); - scheduleExecution(job, scheduleJobExecutionInContext); - if(s_logger.isDebugEnabled()) { - s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString()); - } - return job.getId(); - } catch(Exception e) { - txt.rollback(); - String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception."; - s_logger.warn(errMsg, e); - throw new CloudRuntimeException(errMsg); + scheduleExecution(job, scheduleJobExecutionInContext); + if (s_logger.isDebugEnabled()) { + s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString()); } + return job.getId(); } @SuppressWarnings("unchecked") @@ -199,7 +196,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } @Override @DB - public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject) { + public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, Object resultObject) { if(s_logger.isDebugEnabled()) { s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus + ", resultCode: " + resultCode + ", result: " + resultObject); @@ -219,7 +216,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, return; } - if(job.getStatus() != AsyncJobConstants.STATUS_IN_PROGRESS) { + if(job.getStatus() != JobInfo.Status.IN_PROGRESS) { if(s_logger.isDebugEnabled()) { s_logger.debug("job-" + jobId + " is already completed."); } @@ -251,7 +248,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, for(Long id : wakeupList) { // TODO, we assume that all jobs in this category is API job only AsyncJobVO jobToWakeup = _jobDao.findById(id); - if(jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) + if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0) scheduleExecution(jobToWakeup, false); } @@ -358,7 +355,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } @Override @DB - public void completeJoin(long joinJobId, int joinStatus, String joinResult) { + public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult) { _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid()); } @@ -436,8 +433,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, jobResult.setResultCode(job.getResultCode()); jobResult.setUuid(job.getUuid()); - if(job.getStatus() == AsyncJobConstants.STATUS_SUCCEEDED || - job.getStatus() == AsyncJobConstants.STATUS_FAILED) { + if(job.getStatus() == JobInfo.Status.SUCCEEDED || + job.getStatus() == JobInfo.Status.FAILED) { if(s_logger.isDebugEnabled()) { s_logger.debug("Async job-" + jobId + " completed"); @@ -451,14 +448,14 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?"); } - jobResult.setJobStatus(AsyncJobConstants.STATUS_FAILED); + jobResult.setJobStatus(JobInfo.Status.FAILED); jobResult.setResult("job-" + jobId + " does not exist"); } txt.commit(); } catch(Exception e) { s_logger.error("Unexpected exception while querying async job-" + jobId + " status: ", e); - jobResult.setJobStatus(AsyncJobConstants.STATUS_FAILED); + jobResult.setJobStatus(JobInfo.Status.FAILED); jobResult.setResult("Exception: " + e.toString()); txt.rollback(); } @@ -475,7 +472,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } private void scheduleExecution(final AsyncJob job, boolean executeInContext) { - Runnable runnable = getExecutorRunnable(this, job); + Runnable runnable = getExecutorRunnable(job); if (executeInContext) { runnable.run(); } else { @@ -516,7 +513,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } } - private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) { + private Runnable getExecutorRunnable(final AsyncJob job) { return new Runnable() { @Override public void run() { @@ -539,20 +536,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } _jobMonitor.registerActiveTask(runNumber, job.getId()); - AsyncJobExecutionContext.setCurrentExecutionContext((AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job)) - ); + AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job)); // execute the job if(s_logger.isDebugEnabled()) { s_logger.debug("Executing " + job); } - if((getAndResetPendingSignals(job) & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) { + if ((getAndResetPendingSignals(job) & SIGNAL_MASK_WAKEUP) != 0) { AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job); if(jobDispatcher != null) { jobDispatcher.runJob(job); } else { - s_logger.error("Unable to find a wakeup dispatcher from the joined job. job-" + job.getId()); + s_logger.error("Unable to find a wakeup dispatcher from the joined job: " + job); } } else { AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher()); @@ -560,7 +556,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, jobDispatcher.runJob(job); } else { s_logger.error("Unable to find job dispatcher, job will be cancelled"); - completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null); + completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null); } } @@ -570,7 +566,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } catch (Throwable e) { s_logger.error("Unexpected exception", e); - completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null); + completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null); } finally { // guard final clause as well try { @@ -598,7 +594,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, // // clean execution environment // - AsyncJobExecutionContext.setCurrentExecutionContext(null); + AsyncJobExecutionContext.unregister(); _jobMonitor.unregisterActiveTask(runNumber); } catch(Throwable e) { @@ -727,7 +723,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, for(Long jobId : standaloneWakeupJobs) { // TODO, we assume that all jobs in this category is API job only AsyncJobVO job = _jobDao.findById(jobId); - if(job != null && (job.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP) != 0) + if (job != null && (job.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0) scheduleExecution(job, false); } } catch(Throwable e) { @@ -789,7 +785,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, if(blockItems != null && blockItems.size() > 0) { for(SyncQueueItemVO item : blockItems) { if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) { - completeAsyncJob(item.getContentId(), AsyncJobConstants.STATUS_FAILED, 0, + completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, getResetResultResponse("Job is cancelled as it has been blocking others for too long")); } @@ -819,11 +815,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } private long getMsid() { - if(_clusterMgr != null) { - return _clusterMgr.getManagementNodeId(); - } - - return MacAddress.getMacAddress().toLong(); + return ManagementServerNode.getManagementServerId(); } private void cleanupPendingJobs(List l) { @@ -838,7 +830,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, Long jobId = item.getContentId(); if(jobId != null) { s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId); - completeAsyncJob(jobId, AsyncJobConstants.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown")); + completeAsyncJob(jobId, JobInfo.Status.FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown")); } } _queueMgr.purgeItem(item.getId()); @@ -864,7 +856,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, int poolSize = (cloudMaxActive * 2) / 3; s_logger.info("Start AsyncJobManager thread pool in size " + poolSize); - _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX)); + _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(JOB_POOL_THREAD_PREFIX)); } catch (final Exception e) { throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl"); } diff --git a/server/src/com/cloud/async/AsyncJobResult.java b/server/src/com/cloud/async/AsyncJobResult.java index 783655ec461..d71e64b0019 100644 --- a/server/src/com/cloud/async/AsyncJobResult.java +++ b/server/src/com/cloud/async/AsyncJobResult.java @@ -16,14 +16,14 @@ // under the License. package com.cloud.async; -import org.apache.cloudstack.framework.jobs.AsyncJobConstants; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.api.ApiSerializerHelper; public class AsyncJobResult { private long jobId; - private int jobStatus; + private JobInfo.Status jobStatus; private int processStatus; private int resultCode; private String result; @@ -31,7 +31,7 @@ public class AsyncJobResult { public AsyncJobResult(long jobId) { this.jobId = jobId; - jobStatus = AsyncJobConstants.STATUS_IN_PROGRESS; + jobStatus = JobInfo.Status.IN_PROGRESS; processStatus = 0; resultCode = 0; result = ""; @@ -46,18 +46,18 @@ public class AsyncJobResult { } public String getUuid() { - return this.uuid; + return uuid; } public void setUuid(String uuid) { this.uuid = uuid; } - public int getJobStatus() { + public JobInfo.Status getJobStatus() { return jobStatus; } - public void setJobStatus(int jobStatus) { + public void setJobStatus(JobInfo.Status jobStatus) { this.jobStatus = jobStatus; } @@ -97,7 +97,7 @@ public class AsyncJobResult { public String toString() { StringBuffer sb = new StringBuffer(); sb.append("AsyncJobResult {jobId:").append(getJobId()); - sb.append(", jobStatus: ").append(getJobStatus()); + sb.append(", jobStatus: ").append(getJobStatus().ordinal()); sb.append(", processStatus: ").append(getProcessStatus()); sb.append(", resultCode: ").append(getResultCode()); sb.append(", result: ").append(result); diff --git a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java index f6bddf09639..513a71345ed 100755 --- a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java +++ b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java @@ -1728,11 +1728,11 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy // _itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(), // user, account, ((VmWorkStart)work).getPlan()); // -// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null); +// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null); // } catch(Exception e) { // s_logger.error("Exception in process VM-start work", e); // String result = SerializerHelper.toObjectSerializedString(e); -// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result); +// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result); // } // } // @@ -1748,11 +1748,11 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy // try { // _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account); // -// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null); +// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null); // } catch(Exception e) { // s_logger.error("Exception in process VM-stop work", e); // String result = SerializerHelper.toObjectSerializedString(e); -// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result); +// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result); // } // } } diff --git a/server/src/com/cloud/server/ManagementServerImpl.java b/server/src/com/cloud/server/ManagementServerImpl.java index 6de88408c4c..9fc46458145 100755 --- a/server/src/com/cloud/server/ManagementServerImpl.java +++ b/server/src/com/cloud/server/ManagementServerImpl.java @@ -3469,7 +3469,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe if (asyncExecutionContext != null) { job = asyncExecutionContext.getJob(); _asyncMgr.updateAsyncJobAttachment(job.getId(), Upload.Type.VOLUME.toString(), volumeId); - _asyncMgr.updateAsyncJobStatus(job.getId(), AsyncJobConstants.STATUS_IN_PROGRESS, resultObj); + _asyncMgr.updateAsyncJobStatus(job.getId(), JobInfo.Status.IN_PROGRESS, resultObj); } String value = _configs.get(Config.CopyVolumeWait.toString()); int copyvolumewait = NumbersUtil.parseInt(value, Integer.parseInt(Config.CopyVolumeWait.getDefaultValue())); @@ -3490,7 +3490,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe resultObj.setResultString(errorString); resultObj.setUploadStatus(UploadVO.Status.COPY_ERROR.toString()); if (asyncExecutionContext != null) { - _asyncMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, 0, resultObj); + _asyncMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, resultObj); } // Update the DB that volume couldn't be copied diff --git a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java index 562db2868ca..1324ac796d6 100755 --- a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java +++ b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java @@ -1462,11 +1462,11 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar // _itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(), // user, account, ((VmWorkStart)work).getPlan()); // -// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null); +// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null); // } catch(Exception e) { // s_logger.error("Exception in process VM-start work", e); // String result = SerializerHelper.toObjectSerializedString(e); -// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result); +// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result); // } // } // @@ -1482,11 +1482,11 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar // try { // _itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account); // -// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null); +// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null); // } catch(Exception e) { // s_logger.error("Exception in process VM-stop work", e); // String result = SerializerHelper.toObjectSerializedString(e); -// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_FAILED, result); +// AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result); // } // } } diff --git a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java index c6dd772731e..607e39bc233 100644 --- a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java +++ b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java @@ -143,14 +143,14 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu Long asyncJobId = snapshotSchedule.getAsyncJobId(); AsyncJobVO asyncJob = _asyncJobDao.findById(asyncJobId); switch (asyncJob.getStatus()) { - case AsyncJobConstants.STATUS_SUCCEEDED: + case JobInfo.Status.SUCCEEDED: // The snapshot has been successfully backed up. // The snapshot state has also been cleaned up. // We can schedule the next job for this snapshot. // Remove the existing entry in the snapshot_schedule table. scheduleNextSnapshotJob(snapshotSchedule); break; - case AsyncJobConstants.STATUS_FAILED: + case JobInfo.Status.FAILED: // Check the snapshot status. Long snapshotId = snapshotSchedule.getSnapshotId(); if (snapshotId == null) { @@ -188,7 +188,7 @@ public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotSchedu } break; - case AsyncJobConstants.STATUS_IN_PROGRESS: + case JobInfo.Status.IN_PROGRESS: // There is no way of knowing from here whether // 1) Another management server is processing this snapshot job // 2) The management server has crashed and this snapshot is lying diff --git a/server/src/com/cloud/storage/upload/UploadListener.java b/server/src/com/cloud/storage/upload/UploadListener.java index b66624d8d7b..b6a92884aef 100755 --- a/server/src/com/cloud/storage/upload/UploadListener.java +++ b/server/src/com/cloud/storage/upload/UploadListener.java @@ -365,7 +365,7 @@ public class UploadListener implements Listener { resultObj.setResultString(uploadErrorString); resultObj.setState(state.toString()); asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L); - asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj); + asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj); UploadVO vo = uploadDao.createForUpdate(); vo.setUploadState(state); @@ -378,7 +378,7 @@ public class UploadListener implements Listener { resultObj.setResultString(uploadErrorString); resultObj.setState(state.toString()); asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L); - asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj); + asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj); UploadVO vo = uploadDao.createForUpdate(); @@ -407,12 +407,12 @@ public class UploadListener implements Listener { if (answer.getUploadStatus() == Status.UPLOAD_IN_PROGRESS){ asyncMgr.updateAsyncJobAttachment(asyncJobId, type.toString(), 1L); - asyncMgr.updateAsyncJobStatus(asyncJobId, AsyncJobConstants.STATUS_IN_PROGRESS, resultObj); + asyncMgr.updateAsyncJobStatus(asyncJobId, JobInfo.Status.IN_PROGRESS, resultObj); }else if(answer.getUploadStatus() == Status.UPLOADED){ resultObj.setResultString("Success"); - asyncMgr.completeAsyncJob(asyncJobId, AsyncJobConstants.STATUS_SUCCEEDED, 1, resultObj); + asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.SUCCEEDED, 1, resultObj); }else{ - asyncMgr.completeAsyncJob(asyncJobId, AsyncJobConstants.STATUS_FAILED, 2, resultObj); + asyncMgr.completeAsyncJob(asyncJobId, JobInfo.Status.FAILED, 2, resultObj); } UploadVO updateBuilder = uploadDao.createForUpdate(); updateBuilder.setUploadPercent(answer.getUploadPct()); diff --git a/server/src/com/cloud/vm/SystemVmLoadScanner.java b/server/src/com/cloud/vm/SystemVmLoadScanner.java index 704129dd924..4d378cece6c 100644 --- a/server/src/com/cloud/vm/SystemVmLoadScanner.java +++ b/server/src/com/cloud/vm/SystemVmLoadScanner.java @@ -71,8 +71,8 @@ public class SystemVmLoadScanner { @Override public void run() { try { - CallContext.registerSystemCallContextOnceOnly(); - AsyncJobExecutionContext.registerPseudoExecutionContext(); + CallContext cc = CallContext.registerSystemCallContextOnceOnly(); + AsyncJobExecutionContext.registerPseudoExecutionContext(cc.getCallingAccountId(), cc.getCallingUserId()); } catch (Exception e) { s_logger.fatal("Unable to start the capacity scan task", e); System.exit(1); diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java index 320c68f51d6..419f5fa02af 100644 --- a/server/test/com/cloud/async/TestAsyncJobManager.java +++ b/server/test/com/cloud/async/TestAsyncJobManager.java @@ -48,10 +48,13 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO; import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO; import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.PublishScope; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.cluster.ClusterManager; +import com.cloud.user.Account; import com.cloud.user.AccountManager; import com.cloud.user.AccountVO; +import com.cloud.user.User; import com.cloud.user.UserVO; import com.cloud.utils.Predicate; import com.cloud.utils.component.ComponentContext; @@ -142,21 +145,21 @@ public class TestAsyncJobManager extends TestCase { AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1); Assert.assertTrue(record != null); Assert.assertTrue(record.getJoinMsid() == 100); - Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_IN_PROGRESS); + Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.IN_PROGRESS); - joinMapDao.completeJoin(1, AsyncJobConstants.STATUS_SUCCEEDED, "Done", 101); + joinMapDao.completeJoin(1, JobInfo.Status.SUCCEEDED, "Done", 101); record = joinMapDao.getJoinRecord(2, 1); Assert.assertTrue(record != null); Assert.assertTrue(record.getJoinMsid() == 100); - Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_SUCCEEDED); + Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED); Assert.assertTrue(record.getJoinResult().equals("Done")); Assert.assertTrue(record.getCompleteMsid() == 101); record = joinMapDao.getJoinRecord(3, 1); Assert.assertTrue(record != null); Assert.assertTrue(record.getJoinMsid() == 100); - Assert.assertTrue(record.getJoinStatus() == AsyncJobConstants.STATUS_SUCCEEDED); + Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED); Assert.assertTrue(record.getJoinResult().equals("Done")); Assert.assertTrue(record.getCompleteMsid() == 101); @@ -198,7 +201,7 @@ public class TestAsyncJobManager extends TestCase { @Test public void testPseudoJob() { - AsyncJob job = asyncMgr.getPseudoJob(); + AsyncJob job = asyncMgr.getPseudoJob(Account.ACCOUNT_ID_SYSTEM, User.UID_SYSTEM); Assert.assertTrue(job.getInstanceType().equals(AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE)); Assert.assertTrue(job.getInstanceId().longValue() == Thread.currentThread().getId()); } diff --git a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java index 2c1249ed14f..49b0e31e99f 100644 --- a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java +++ b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java @@ -316,7 +316,7 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage if(wakeupCount++ < 3) { AsyncJobExecutionContext.getCurrentExecutionContext().resetSyncSource(); } else { - AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobConstants.STATUS_SUCCEEDED, 0, null); + AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(JobInfo.Status.SUCCEEDED, 0, null); } } diff --git a/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java index 72210b402ea..eb5e81fd77c 100644 --- a/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java +++ b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java @@ -20,7 +20,7 @@ public class VmWorkTestWorkJobDispatcher extends AdapterBase implements AsyncJob } catch (InterruptedException e) { } - AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED, null); + AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null); s_logger.info("End work job execution. job-" + job.getId()); } }