diff --git a/framework/db/src/com/cloud/utils/db/Transaction.java b/framework/db/src/com/cloud/utils/db/Transaction.java index 471e0cfe3b5..dd91a967a06 100755 --- a/framework/db/src/com/cloud/utils/db/Transaction.java +++ b/framework/db/src/com/cloud/utils/db/Transaction.java @@ -18,11 +18,15 @@ package com.cloud.utils.db; import java.util.concurrent.atomic.AtomicLong; +import org.apache.log4j.Logger; + public class Transaction { private final static AtomicLong counter = new AtomicLong(0); private final static TransactionStatus STATUS = new TransactionStatus() { }; + private static final Logger s_logger = Logger.getLogger(Transaction.class); + @SuppressWarnings("deprecation") public static T execute(TransactionCallbackWithException callback) throws E { String name = "tx-" + counter.incrementAndGet(); @@ -33,6 +37,10 @@ public class Transaction { } TransactionLegacy txn = TransactionLegacy.open(name, databaseId, false); try { +// if (txn.dbTxnStarted()){ +// String warnMsg = "Potential Wrong Usage: TRANSACTION.EXECUTE IS WRAPPED INSIDE ANOTHER DB TRANSACTION!"; +// s_logger.warn(warnMsg, new CloudRuntimeException(warnMsg)); +// } txn.start(); T result = callback.doInTransaction(STATUS); txn.commit(); diff --git a/framework/ipc/pom.xml b/framework/ipc/pom.xml index 12b4a3d295a..09b0c413ba0 100644 --- a/framework/ipc/pom.xml +++ b/framework/ipc/pom.xml @@ -39,4 +39,22 @@ ${project.version} + + + + maven-surefire-plugin + + true + + + + integration-test + + test + + + + + + diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java index e8f9bce0972..e3eeb7bc6c3 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java @@ -30,6 +30,9 @@ import org.apache.log4j.Logger; import org.apache.cloudstack.framework.serializer.MessageSerializer; +import com.cloud.utils.db.TransactionLegacy; +import com.cloud.utils.exception.CloudRuntimeException; + public class MessageBusBase implements MessageBus { private final Gate _gate; @@ -158,7 +161,11 @@ public class MessageBusBase implements MessageBus { @Override public void publish(String senderAddress, String subject, PublishScope scope, Object args) { - + // publish cannot be in DB transaction, which may hold DB lock too long, and we are guarding this here + if (!noDbTxn()){ + String errMsg = "NO EVENT PUBLISH CAN BE WRAPPED WITHIN DB TRANSACTION!"; + s_logger.error(errMsg, new CloudRuntimeException(errMsg)); + } if (_gate.enter(true)) { if (s_logger.isTraceEnabled()) { s_logger.trace("Enter gate in message bus publish"); @@ -256,6 +263,11 @@ public class MessageBusBase implements MessageBus { } } + private boolean noDbTxn() { + TransactionLegacy txn = TransactionLegacy.currentTxn(); + return !txn.dbTxnStarted(); + } + // // Support inner classes // diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 04fab24d7ae..aab16837896 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -258,6 +258,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } job.setLastUpdated(DateUtil.currentGMTTime()); + job.setExecutingMsid(null); _jobDao.update(jobId, job); if (s_logger.isDebugEnabled()) { @@ -266,6 +267,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, List wakeupList = wakeupByJoinedJobCompletion(jobId); _joinMapDao.disjoinAllJobs(jobId); + // purge the job sync item from queue + if (job.getSyncSource() != null) { + _queueMgr.purgeItem(job.getSyncSource().getId()); + } + return wakeupList; } }); @@ -527,12 +533,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } finally { // guard final clause as well try { - AsyncJobVO jobToUpdate = _jobDao.findById(job.getId()); - jobToUpdate.setExecutingMsid(null); - _jobDao.update(job.getId(), jobToUpdate); - if (job.getSyncSource() != null) { - _queueMgr.purgeItem(job.getSyncSource().getId()); checkQueue(job.getSyncSource().getQueueId()); }