diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index e2a65b71d06..f091d5d32b6 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -71,7 +71,7 @@ import com.cloud.utils.mgmt.JmxUtil;
 import com.cloud.utils.net.MacAddress;
 
 public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener {
-    public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class.getName());
+    public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
     private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
 
     private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
diff --git a/server/src/com/cloud/async/AsyncJobMonitor.java b/server/src/com/cloud/async/AsyncJobMonitor.java
new file mode 100644
index 00000000000..4208444cab9
--- /dev/null
+++ b/server/src/com/cloud/async/AsyncJobMonitor.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.async;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
+import org.apache.cloudstack.messagebus.SubjectConstants;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.component.ManagerBase;
+
+/**
+ * Tracks the async jobs that are currently executing and periodically warns
+ * about jobs that have not reported a heartbeat within the configured threshold.
+ * The task map and the thread counters are modified only while synchronized on this instance.
+ */
+public class AsyncJobMonitor extends ManagerBase {
+    public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
+
+    @Inject private MessageBus _messageBus;
+
+    // active task records keyed by job id
+    private Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
+    private Timer _timer = new Timer();
+
+    // written only inside synchronized(this) blocks; volatile so the getters can read without locking
+    private volatile int _activePoolThreads = 0;
+    private volatile int _activeInplaceThreads = 0;
+
+    // configuration
+    private long _inactivityCheckIntervalMs = 60000;
+    private long _inactivityWarningThresholdMs = 90000;
+
+    public AsyncJobMonitor() {
+    }
+
+    public long getInactivityCheckIntervalMs() {
+        return _inactivityCheckIntervalMs;
+    }
+
+    /** The interval is read when configure() schedules the timer, so set it before configure() runs. */
+    public void setInactivityCheckIntervalMs(long intervalMs) {
+        _inactivityCheckIntervalMs = intervalMs;
+    }
+
+    public long getInactivityWarningThresholdMs() {
+        return _inactivityWarningThresholdMs;
+    }
+
+    public void setInactivityWarningThresholdMs(long thresholdMs) {
+        _inactivityWarningThresholdMs = thresholdMs;
+    }
+
+    /**
+     * Message bus callback for JOB_HEARTBEAT; refreshes the heartbeat tick of the job
+     * whose id is carried in args. Non-Long payloads and unknown job ids are ignored.
+     */
+    @MessageHandler(topic=SubjectConstants.JOB_HEARTBEAT)
+    public void onJobHeartbeatNotify(String subject, String senderAddress, Object args) {
+        // instanceof is false for null, so no separate null check is needed
+        if(args instanceof Long) {
+            synchronized(this) {
+                ActiveTaskRecord record = _activeTasks.get((Long)args);
+                if(record != null) {
+                    record.updateJobHeartbeatTick();
+                }
+            }
+        }
+    }
+
+    /** Logs a warning for every active task whose last heartbeat is older than the warning threshold. */
+    private void heartbeat() {
+        synchronized(this) {
+            for(ActiveTaskRecord record : _activeTasks.values()) {
+                long idleMs = record.millisSinceLastJobHeartbeat();
+                if(idleMs > _inactivityWarningThresholdMs) {
+                    s_logger.warn("Task (job-" + record.getJobId() + ") has been pending for "
+                        + idleMs/1000 + " seconds");
+                }
+            }
+        }
+    }
+
+    /** Subscribes to job heartbeat messages and starts the periodic inactivity check. */
+    @Override
+    public boolean configure(String name, Map params)
+        throws ConfigurationException {
+
+        _messageBus.subscribe(SubjectConstants.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
+        _timer.scheduleAtFixedRate(new TimerTask() {
+
+            @Override
+            public void run() {
+                heartbeat();
+            }
+
+        }, _inactivityCheckIntervalMs, _inactivityCheckIntervalMs);
+        return true;
+    }
+
+    /**
+     * Starts tracking a job; its heartbeat tick is initialized to the current time.
+     *
+     * @param jobId id of the job, also the key used for heartbeat lookup
+     * @param threadId id of the thread executing the job
+     * @param fromPoolThread true to count the job under pool threads, false to count it under in-place threads
+     */
+    public void registerActiveTask(long jobId, long threadId, boolean fromPoolThread) {
+        synchronized(this) {
+            assert(_activeTasks.get(jobId) == null);
+
+            // argument order must match ActiveTaskRecord(jobId, threadId, fromPoolThread)
+            ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread);
+            _activeTasks.put(jobId, record);
+            if(fromPoolThread)
+                _activePoolThreads++;
+            else
+                _activeInplaceThreads++;
+        }
+    }
+
+    /** Stops tracking a job and decrements its thread counter; an unknown id trips the assert, or is a no-op when assertions are off. */
+    public void unregisterActiveTask(long jobId) {
+        synchronized(this) {
+            ActiveTaskRecord record = _activeTasks.get(jobId);
+            assert(record != null);
+            if(record != null) {
+                if(record.isPoolThread())
+                    _activePoolThreads--;
+                else
+                    _activeInplaceThreads--;
+
+                _activeTasks.remove(jobId);
+            }
+        }
+    }
+
+    public int getActivePoolThreads() {
+        return _activePoolThreads;
+    }
+
+    public int getActiveInplaceThread() {
+        return _activeInplaceThreads;
+    }
+
+    /** Bookkeeping for one running job: its ids, its thread kind, and when it last reported a heartbeat. */
+    private static class ActiveTaskRecord {
+        long _jobId;
+        long _threadId;
+        boolean _fromPoolThread;
+        long _jobLastHeartbeatTick;
+
+        public ActiveTaskRecord(long jobId, long threadId, boolean fromPoolThread) {
+            _threadId = threadId;
+            _jobId = jobId;
+            _fromPoolThread = fromPoolThread;
+            _jobLastHeartbeatTick = System.currentTimeMillis();
+        }
+
+        public long getThreadId() {
+            return _threadId;
+        }
+
+        public long getJobId() {
+            return _jobId;
+        }
+
+        public boolean isPoolThread() {
+            return _fromPoolThread;
+        }
+
+        public void updateJobHeartbeatTick() {
+            _jobLastHeartbeatTick = System.currentTimeMillis();
+        }
+
+        public long millisSinceLastJobHeartbeat() {
+            return System.currentTimeMillis() - _jobLastHeartbeatTick;
+        }
+    }
+}
diff --git a/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java b/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java
index 16a9bee4cf4..d9bb21618b5 100644
--- a/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java
+++ b/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java
@@ -17,5 +17,9 @@
 package org.apache.cloudstack.messagebus;
 
 public interface SubjectConstants {
+    // VM power state messages on message bus
     public static final String VM_POWER_STATE = "vm.powerstate";
+
+    // job messages on message bus
+    public static final String JOB_HEARTBEAT = "job.heartbeat";
 }
diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java
index e6987adeee8..2d59161fd27 100644
--- a/server/test/com/cloud/async/TestAsyncJobManager.java
+++ b/server/test/com/cloud/async/TestAsyncJobManager.java
@@ -30,6 +30,7 @@ import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
 import com.cloud.async.AsyncJobManager;
+import com.cloud.async.AsyncJobMonitor;
 import com.cloud.cluster.ClusterManager;
 import com.cloud.utils.Predicate;
 import com.cloud.utils.component.ComponentContext;
@@ -42,6 +43,7 @@ public class TestAsyncJobManager extends TestCase {
     @Inject AsyncJobManager asyncMgr;
     @Inject ClusterManager clusterMgr;
     @Inject MessageBus messageBus;
+    @Inject AsyncJobMonitor jobMonitor;
 
     @Before
     public void setUp() {
@@ -72,7 +74,9 @@ public class TestAsyncJobManager extends TestCase {
             }
         });
         thread.start();
-
+
+        jobMonitor.registerActiveTask(1, 1, false);
+
         asyncMgr.waitAndCheck(new String[] {"VM"}, 5000L, 10000L, new Predicate() {
             public boolean checkCondition() {
                 System.out.println("Check condition to exit");
@@ -80,6 +84,8 @@ public class TestAsyncJobManager extends TestCase {
             }
         });
 
+        jobMonitor.unregisterActiveTask(1);
+
         try {
             thread.join();
         } catch(InterruptedException e) {
diff --git a/server/test/resources/AsyncJobTestContext.xml b/server/test/resources/AsyncJobTestContext.xml
index 54ce0cd2cf0..3674f158d97 100644
--- a/server/test/resources/AsyncJobTestContext.xml
+++ b/server/test/resources/AsyncJobTestContext.xml
@@ -43,6 +43,10 @@
 
 
 
+
+
+
+
 
 
 