Add job monitor to help manage system concurrent level

This commit is contained in:
Kelven Yang 2013-05-01 22:45:27 -07:00
parent 17930685e4
commit 9e4ebdd8b3
5 changed files with 193 additions and 2 deletions

View File

@ -71,7 +71,7 @@ import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.net.MacAddress;
public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener {
public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class.getName());
public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;

View File

@ -0,0 +1,177 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.async;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.messagebus.SubjectConstants;
import org.apache.log4j.Logger;
import com.cloud.utils.component.ManagerBase;
public class AsyncJobMonitor extends ManagerBase {
public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
@Inject private MessageBus _messageBus;
private Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
private Timer _timer = new Timer();
private volatile int _activePoolThreads = 0;
private volatile int _activeInplaceThreads = 0;
// configuration
private long _inactivityCheckIntervalMs = 60000;
private long _inactivityWarningThresholdMs = 90000;
public AsyncJobMonitor() {
}
public long getInactivityCheckIntervalMs() {
return _inactivityCheckIntervalMs;
}
public void setInactivityCheckIntervalMs(long intervalMs) {
_inactivityCheckIntervalMs = intervalMs;
}
public long getInactivityWarningThresholdMs() {
return _inactivityWarningThresholdMs;
}
public void setInactivityWarningThresholdMs(long thresholdMs) {
_inactivityWarningThresholdMs = thresholdMs;
}
@MessageHandler(topic=SubjectConstants.JOB_HEARTBEAT)
public void onJobHeartbeatNotify(String subject, String senderAddress, Object args) {
if(args != null && args instanceof Long) {
synchronized(this) {
ActiveTaskRecord record = _activeTasks.get((Long)args);
if(record != null) {
record.updateJobHeartbeatTick();
}
}
}
}
private void heartbeat() {
synchronized(this) {
for(Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
if(entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
+ entry.getValue().millisSinceLastJobHeartbeat()/1000 + " seconds");
}
}
}
}
@Override
public boolean configure(String name, Map<String, Object> params)
throws ConfigurationException {
_messageBus.subscribe(SubjectConstants.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
_timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
heartbeat();
}
}, _inactivityCheckIntervalMs, _inactivityCheckIntervalMs);
return true;
}
public void registerActiveTask(long jobId, long threadId, boolean fromPoolThread) {
synchronized(this) {
assert(_activeTasks.get(jobId) == null);
ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread);
_activeTasks.put(jobId, record);
if(fromPoolThread)
_activePoolThreads++;
else
_activeInplaceThreads++;
}
}
public void unregisterActiveTask(long jobId) {
synchronized(this) {
ActiveTaskRecord record = _activeTasks.get(jobId);
assert(record != null);
if(record != null) {
if(record.isPoolThread())
_activePoolThreads--;
else
_activeInplaceThreads--;
_activeTasks.remove(jobId);
}
}
}
public int getActivePoolThreads() {
return _activePoolThreads;
}
public int getActiveInplaceThread() {
return _activeInplaceThreads;
}
private static class ActiveTaskRecord {
long _jobId;
long _threadId;
boolean _fromPoolThread;
long _jobLastHeartbeatTick;
public ActiveTaskRecord(long jobId, long threadId, boolean fromPoolThread) {
_threadId = threadId;
_jobId = jobId;
_fromPoolThread = fromPoolThread;
_jobLastHeartbeatTick = System.currentTimeMillis();
}
public long getThreadId() {
return _threadId;
}
public long getJobId() {
return _jobId;
}
public boolean isPoolThread() {
return _fromPoolThread;
}
public void updateJobHeartbeatTick() {
_jobLastHeartbeatTick = System.currentTimeMillis();
}
public long millisSinceLastJobHeartbeat() {
return System.currentTimeMillis() - _jobLastHeartbeatTick;
}
}
}

View File

@ -17,5 +17,9 @@
package org.apache.cloudstack.messagebus;
public interface SubjectConstants {
// VM power state messages on message bus
public static final String VM_POWER_STATE = "vm.powerstate";
// job messages on message bus
public static final String JOB_HEARTBEAT = "job.heartbeat";
}

View File

@ -30,6 +30,7 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobMonitor;
import com.cloud.cluster.ClusterManager;
import com.cloud.utils.Predicate;
import com.cloud.utils.component.ComponentContext;
@ -42,6 +43,7 @@ public class TestAsyncJobManager extends TestCase {
@Inject AsyncJobManager asyncMgr;
@Inject ClusterManager clusterMgr;
@Inject MessageBus messageBus;
@Inject AsyncJobMonitor jobMonitor;
@Before
public void setUp() {
@ -72,7 +74,9 @@ public class TestAsyncJobManager extends TestCase {
}
});
thread.start();
jobMonitor.registerActiveTask(1, 1, false);
asyncMgr.waitAndCheck(new String[] {"VM"}, 5000L, 10000L, new Predicate() {
public boolean checkCondition() {
System.out.println("Check condition to exit");
@ -80,6 +84,8 @@ public class TestAsyncJobManager extends TestCase {
}
});
jobMonitor.unregisterActiveTask(1);
try {
thread.join();
} catch(InterruptedException e) {

View File

@ -43,6 +43,10 @@
<bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher">
<property name="name" value="ApiAsyncJobDispatcher" />
</bean>
<bean id="asyncJobMonitor" class="com.cloud.async.AsyncJobMonitor">
<property name="inactivityWarningThresholdMs" value="5" />
<property name="InactivityCheckIntervalMs" value="1" />
</bean>
<bean id="messageBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase" />