diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index ba0018ea704..7be125c7734 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -18,13 +18,17 @@ package com.cloud.async; +import java.io.File; +import java.io.FileInputStream; import java.lang.reflect.Type; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -58,6 +62,7 @@ import com.cloud.user.UserContext; import com.cloud.user.dao.AccountDao; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; +import com.cloud.utils.PropertiesUtil; import com.cloud.utils.component.ComponentLocator; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; @@ -93,7 +98,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat")); - private final ExecutorService _executor = Executors.newCachedThreadPool(new NamedThreadFactory("Job-Executor")); + private ExecutorService _executor; @Override public AsyncJobExecutorContext getExecutorContext() { @@ -370,7 +375,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe if (executeInContext) { runnable.run(); } else { - _executor.submit(runnable); + _executor.submit(runnable); } } @@ -511,7 +516,14 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe job.setCompleteMsid(getMsid()); _jobDao.update(job.getId(), job); - scheduleExecution(job); + + try { + scheduleExecution(job); + } catch(RejectedExecutionException e) { + s_logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn"); + _queueMgr.returnItem(item.getId()); + } + } else { if(s_logger.isDebugEnabled()) { s_logger.debug("Unable to find related job for queue item: " + item.toString()); @@ -559,14 +571,14 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe return new Runnable() { @Override public void run() { - try { + try { List l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE); if(l != null && l.size() > 0) { for(SyncQueueItemVO item: l) { if(s_logger.isDebugEnabled()) { s_logger.debug("Execute sync-queue item: " + item.toString()); - } - executeQueueItem(item, false); + } + executeQueueItem(item, false); } } } catch(Throwable e) { @@ -709,8 +721,24 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe _accountMgr = locator.getManager(AccountManager.class); _dispatcher = ApiDispatcher.getInstance(); - - return true; + + + try { + final File dbPropsFile = PropertiesUtil.findConfigFile("db.properties"); + final Properties dbProps = new Properties(); + dbProps.load(new FileInputStream(dbPropsFile)); + + final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive")); + + int poolSize = (cloudMaxActive * 2) / 3; + + s_logger.info("Start AsyncJobManager thread pool in size " + poolSize); + _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory("Job-Executor")); + } catch (final Exception e) { + throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl"); + } + + return true; } @Override diff --git a/server/src/com/cloud/async/SyncQueueManager.java b/server/src/com/cloud/async/SyncQueueManager.java index 7811c64515b..9edd1f697cb 100644 --- a/server/src/com/cloud/async/SyncQueueManager.java +++ b/server/src/com/cloud/async/SyncQueueManager.java @@ -27,6 +27,7 @@ public interface SyncQueueManager extends Manager { public SyncQueueItemVO dequeueFromOne(long queueId, Long msid); public List dequeueFromAny(Long msid, int maxItems); public void purgeItem(long queueItemId); + public void returnItem(long queueItemId); public List getActiveQueueItems(Long msid, boolean exclusive); public List getBlockedQueueItems(long thresholdMs, boolean exclusive); diff --git a/server/src/com/cloud/async/SyncQueueManagerImpl.java b/server/src/com/cloud/async/SyncQueueManagerImpl.java index d0bedcb6ed5..e2c5e8e8750 100644 --- a/server/src/com/cloud/async/SyncQueueManagerImpl.java +++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java @@ -195,6 +195,32 @@ public class SyncQueueManagerImpl implements SyncQueueManager { s_logger.error("Unexpected exception: ", e); txt.rollback(); } + } + + @Override + @DB + public void returnItem(long queueItemId) { + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); + + SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId); + if(itemVO != null) { + SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); + + itemVO.setLastProcessMsid(null); + itemVO.setLastProcessNumber(null); + _syncQueueItemDao.update(queueItemId, itemVO); + + queueVO.setLastProcessTime(null); + queueVO.setLastUpdated(DateUtil.currentGMTTime()); + _syncQueueDao.update(queueVO.getId(), queueVO); + } + txt.commit(); + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txt.rollback(); + } } @Override