bug 10918: cap async-job execution thread pool to be 2/3 of maximum DB connection limit

This commit is contained in:
Kelven Yang 2011-07-29 18:33:21 -07:00
parent d120b9cca4
commit 1c4e1ba5a5
3 changed files with 63 additions and 8 deletions

View File

@ -18,13 +18,17 @@
package com.cloud.async;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Type;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -58,6 +62,7 @@ import com.cloud.user.UserContext;
import com.cloud.user.dao.AccountDao;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
@ -93,7 +98,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
private final ScheduledExecutorService _heartbeatScheduler =
Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
private final ExecutorService _executor = Executors.newCachedThreadPool(new NamedThreadFactory("Job-Executor"));
private ExecutorService _executor;
@Override
public AsyncJobExecutorContext getExecutorContext() {
@ -370,7 +375,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
if (executeInContext) {
runnable.run();
} else {
_executor.submit(runnable);
_executor.submit(runnable);
}
}
@ -511,7 +516,14 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
job.setCompleteMsid(getMsid());
_jobDao.update(job.getId(), job);
scheduleExecution(job);
try {
scheduleExecution(job);
} catch(RejectedExecutionException e) {
s_logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn");
_queueMgr.returnItem(item.getId());
}
} else {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Unable to find related job for queue item: " + item.toString());
@ -559,14 +571,14 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
return new Runnable() {
@Override
public void run() {
try {
try {
List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item: l) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Execute sync-queue item: " + item.toString());
}
executeQueueItem(item, false);
}
executeQueueItem(item, false);
}
}
} catch(Throwable e) {
@ -709,8 +721,24 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
_accountMgr = locator.getManager(AccountManager.class);
_dispatcher = ApiDispatcher.getInstance();
return true;
try {
final File dbPropsFile = PropertiesUtil.findConfigFile("db.properties");
final Properties dbProps = new Properties();
dbProps.load(new FileInputStream(dbPropsFile));
final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive"));
int poolSize = (cloudMaxActive * 2) / 3;
s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
_executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory("Job-Executor"));
} catch (final Exception e) {
throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
}
return true;
}
@Override

View File

@ -27,6 +27,7 @@ public interface SyncQueueManager extends Manager {
public SyncQueueItemVO dequeueFromOne(long queueId, Long msid);
public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems);
public void purgeItem(long queueItemId);
public void returnItem(long queueItemId);
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);

View File

@ -195,6 +195,32 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
}
@Override
@DB
public void returnItem(long queueItemId) {
Transaction txt = Transaction.currentTxn();
try {
txt.start();
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
if(itemVO != null) {
SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
itemVO.setLastProcessMsid(null);
itemVO.setLastProcessNumber(null);
_syncQueueItemDao.update(queueItemId, itemVO);
queueVO.setLastProcessTime(null);
queueVO.setLastUpdated(DateUtil.currentGMTTime());
_syncQueueDao.update(queueVO.getId(), queueVO);
}
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
}
@Override