From a2d85c8cae5f603bbcfcd3659c1207f0bfe461a7 Mon Sep 17 00:00:00 2001 From: Min Chen Date: Tue, 16 Sep 2014 15:14:08 -0700 Subject: [PATCH] CLOUDSTACK-7566:Many jobs getting stuck in pending state and cloud is unusable. --- .../framework/messagebus/MessageBusBase.java | 37 ++++++++++++++++--- .../jobs/impl/AsyncJobManagerImpl.java | 13 +++++++ 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java index 9432da0a723..e8f9bce0972 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.log4j.Logger; + import org.apache.cloudstack.framework.serializer.MessageSerializer; public class MessageBusBase implements MessageBus { @@ -36,6 +38,8 @@ public class MessageBusBase implements MessageBus { private final SubscriptionNode _subscriberRoot; private MessageSerializer _messageSerializer; + private static final Logger s_logger = Logger.getLogger(MessageBusBase.class); + public MessageBusBase() { _gate = new Gate(); _pendingActions = new ArrayList(); @@ -58,6 +62,9 @@ public class MessageBusBase implements MessageBus { assert (subject != null); assert (subscriber != null); if (_gate.enter()) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus subscribe"); + } try { SubscriptionNode current = locate(subject, null, true); assert (current != null); @@ -75,6 +82,9 @@ public class MessageBusBase implements MessageBus { @Override public void unsubscribe(String subject, MessageSubscriber subscriber) { if (_gate.enter()) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus unsubscribe"); + } try { if (subject != null) { SubscriptionNode current = locate(subject, null, false); @@ -96,6 +106,9 @@ public class MessageBusBase implements MessageBus { @Override public void clearAll() { if (_gate.enter()) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus clearAll"); + } try { _subscriberRoot.clearAll(); doPrune(); @@ -112,6 +125,9 @@ public class MessageBusBase implements MessageBus { @Override public void prune() { if (_gate.enter()) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus prune"); + } try { doPrune(); } finally { @@ -144,6 +160,9 @@ public class MessageBusBase implements MessageBus { public void publish(String senderAddress, String subject, PublishScope scope, Object args) { if (_gate.enter(true)) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Enter gate in message bus publish"); + } try { List chainFromTop = new ArrayList(); SubscriptionNode current = locate(subject, chainFromTop, false); @@ -309,14 +328,20 @@ public class MessageBusBase implements MessageBus { public void leave() { synchronized (this) { if (_reentranceCount > 0) { - assert (_gateOwner == Thread.currentThread()); + try { + assert (_gateOwner == Thread.currentThread()); - onGateOpen(); - _reentranceCount--; - assert (_reentranceCount == 0); - _gateOwner = null; + onGateOpen(); + } finally { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Open gate of message bus"); + } + _reentranceCount--; + assert (_reentranceCount == 0); + _gateOwner = null; - notifyAll(); + notifyAll(); + } } } } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index c28e87b24d7..7d374da3142 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -236,10 +236,20 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setResult(resultObject); } + if (s_logger.isDebugEnabled()) { + s_logger.debug("Publish async job-" + jobId + " complete on message bus"); + } publishOnEventBus(job, "complete"); // publish before the instance type and ID are wiped out + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Wake up jobs related to job- " + jobId); + } List wakeupList = Transaction.execute(new TransactionCallback>() { @Override public List doInTransaction(TransactionStatus status) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Update db status for job- " + jobId); + } job.setCompleteMsid(getMsid()); job.setStatus(jobStatus); job.setResultCode(resultCode); @@ -253,6 +263,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setLastUpdated(DateUtil.currentGMTTime()); _jobDao.update(jobId, job); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Wake up jobs joined with job- " + jobId + " and disjoin all subjobs created from job- " + jobId); + } List wakeupList = wakeupByJoinedJobCompletion(jobId); _joinMapDao.disjoinAllJobs(jobId);