CLOUDSTACK-7566:Many jobs getting stuck in pending state and cloud is

unusable.
This commit is contained in:
Min Chen 2014-09-16 15:14:08 -07:00
parent cb45133799
commit a2d85c8cae
2 changed files with 44 additions and 6 deletions

View File

@ -26,6 +26,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.serializer.MessageSerializer;
public class MessageBusBase implements MessageBus {
@ -36,6 +38,8 @@ public class MessageBusBase implements MessageBus {
private final SubscriptionNode _subscriberRoot;
private MessageSerializer _messageSerializer;
private static final Logger s_logger = Logger.getLogger(MessageBusBase.class);
public MessageBusBase() {
_gate = new Gate();
_pendingActions = new ArrayList<ActionRecord>();
@ -58,6 +62,9 @@ public class MessageBusBase implements MessageBus {
assert (subject != null);
assert (subscriber != null);
if (_gate.enter()) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Enter gate in message bus subscribe");
}
try {
SubscriptionNode current = locate(subject, null, true);
assert (current != null);
@ -75,6 +82,9 @@ public class MessageBusBase implements MessageBus {
@Override
public void unsubscribe(String subject, MessageSubscriber subscriber) {
if (_gate.enter()) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Enter gate in message bus unsubscribe");
}
try {
if (subject != null) {
SubscriptionNode current = locate(subject, null, false);
@ -96,6 +106,9 @@ public class MessageBusBase implements MessageBus {
@Override
public void clearAll() {
if (_gate.enter()) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Enter gate in message bus clearAll");
}
try {
_subscriberRoot.clearAll();
doPrune();
@ -112,6 +125,9 @@ public class MessageBusBase implements MessageBus {
@Override
public void prune() {
if (_gate.enter()) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Enter gate in message bus prune");
}
try {
doPrune();
} finally {
@ -144,6 +160,9 @@ public class MessageBusBase implements MessageBus {
public void publish(String senderAddress, String subject, PublishScope scope, Object args) {
if (_gate.enter(true)) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Enter gate in message bus publish");
}
try {
List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
SubscriptionNode current = locate(subject, chainFromTop, false);
@ -309,14 +328,20 @@ public class MessageBusBase implements MessageBus {
public void leave() {
synchronized (this) {
if (_reentranceCount > 0) {
assert (_gateOwner == Thread.currentThread());
try {
assert (_gateOwner == Thread.currentThread());
onGateOpen();
_reentranceCount--;
assert (_reentranceCount == 0);
_gateOwner = null;
onGateOpen();
} finally {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Open gate of message bus");
}
_reentranceCount--;
assert (_reentranceCount == 0);
_gateOwner = null;
notifyAll();
notifyAll();
}
}
}
}

View File

@ -236,10 +236,20 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setResult(resultObject);
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Publish async job-" + jobId + " complete on message bus");
}
publishOnEventBus(job, "complete"); // publish before the instance type and ID are wiped out
if (s_logger.isDebugEnabled()) {
s_logger.debug("Wake up jobs related to job- " + jobId);
}
List<Long> wakeupList = Transaction.execute(new TransactionCallback<List<Long>>() {
@Override
public List<Long> doInTransaction(TransactionStatus status) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Update db status for job- " + jobId);
}
job.setCompleteMsid(getMsid());
job.setStatus(jobStatus);
job.setResultCode(resultCode);
@ -253,6 +263,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Wake up jobs joined with job- " + jobId + " and disjoin all subjobs created from job- " + jobId);
}
List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
_joinMapDao.disjoinAllJobs(jobId);