From 41e6aeae961613ae9ac6ac97b99683eb09720a54 Mon Sep 17 00:00:00 2001 From: Chiradeep Vittal Date: Fri, 29 Jul 2011 16:27:35 -0700 Subject: [PATCH] bug 10884: 1. cleanup was scheduled wrong (seconds vs ms) 2. when finding a work item to do, lock one random row to reduce contentions by thundering herds of workers 3. cleanup thread also finds scheduled work items and gets threads to work on them. this way other mgmt servers can take jobs 4. add lots of trace logs 5. commit transactions when returning early --- .../security/SecurityGroupManagerImpl.java | 40 ++++++++++++++++-- .../security/dao/SecurityGroupWorkDao.java | 2 + .../dao/SecurityGroupWorkDaoImpl.java | 41 +++++++++++++------ 3 files changed, 68 insertions(+), 15 deletions(-) diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java index 7fa76f9aa42..097babce619 100755 --- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -189,6 +190,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG try { cleanupFinishedWork(); cleanupUnfinishedWork(); + processScheduledWork(); } finally { txn.close("SG Cleanup"); } @@ -362,9 +364,15 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG if (delayMs == null) { delayMs = new Long(100l); } + + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + affectedVms.size() + " vms"); + } for (Long vmId : affectedVms) { - + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + vmId); + } VmRulesetLogVO log = null; SecurityGroupWorkVO work = null; UserVm vm = null; @@ -392,6 +400,9 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG if (work == null) { work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null); work = _workDao.persist(work); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: created new work item for " + vmId); + } } work.setLogsequenceNumber(log.getLogsequence()); @@ -1102,7 +1113,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } public void cleanupFinishedWork() { - Date before = new Date(System.currentTimeMillis() - 24 * 3600 * 1000l); + Date before = new Date(System.currentTimeMillis() - 6 * 3600 * 1000l); int numDeleted = _workDao.deleteFinishedWork(before); if (numDeleted > 0) { s_logger.info("Network Group Work cleanup deleted " + numDeleted + " finished work items older than " + before.toString()); @@ -1111,7 +1122,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } private void cleanupUnfinishedWork() { - Date before = new Date(System.currentTimeMillis() - _timeBetweenCleanups* 1000l); + Date before = new Date(System.currentTimeMillis() - _timeBetweenCleanups); List unfinished = _workDao.findUnfinishedWork(before); if (unfinished.size() > 0) { s_logger.info("Network Group Work cleanup found " + unfinished.size() + " unfinished work items older than " + before.toString()); @@ -1124,6 +1135,20 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG s_logger.debug("Network Group Work cleanup found no unfinished work items older than " + before.toString()); } } + + private void processScheduledWork() { + List scheduled = _workDao.findScheduledWork(); + int numJobs = scheduled.size(); + if (numJobs > 0) { + s_logger.debug("Security group work: found scheduled jobs " + numJobs); + Random rand = new Random(); + for (int i=0; i < numJobs; i++) { + long delayMs = 100 + 10*rand.nextInt(numJobs); + _executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS); + } + } + + } @Override public String getSecurityGroupsNamesForVm(long vmId) { @@ -1181,10 +1206,19 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } if (VirtualMachine.State.isVmStarted(oldState, event, newState)) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: handling start of vm id" + vm.getId()); + } handleVmStarted((VMInstanceVO) vm); } else if (VirtualMachine.State.isVmStopped(oldState, event, newState)) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: handling stop of vm id" + vm.getId()); + } handleVmStopped((VMInstanceVO) vm); } else if (VirtualMachine.State.isVmMigrated(oldState, event, newState)) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: handling migration of vm id" + vm.getId()); + } handleVmMigrated((VMInstanceVO) vm); } diff --git a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java index 5e05c0f00f2..59e46618d56 100644 --- a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java +++ b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java @@ -40,6 +40,8 @@ public interface SecurityGroupWorkDao extends GenericDao findUnfinishedWork(Date timeBefore); + + List findScheduledWork(); } diff --git a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java index afaa1e33759..16cf33574a2 100644 --- a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java +++ b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java @@ -23,7 +23,10 @@ import java.util.List; import javax.ejb.Local; +import org.apache.log4j.Logger; + import com.cloud.ha.HaWorkVO; +import com.cloud.ha.dao.HighAvailabilityDaoImpl; import com.cloud.network.security.SecurityGroupWorkVO; import com.cloud.network.security.SecurityGroupWorkVO.Step; import com.cloud.utils.db.DB; @@ -36,7 +39,9 @@ import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.exception.CloudRuntimeException; @Local(value={SecurityGroupWorkDao.class}) -public class SecurityGroupWorkDaoImpl extends GenericDaoBase implements SecurityGroupWorkDao { +public class SecurityGroupWorkDaoImpl extends GenericDaoBase implements SecurityGroupWorkDao { + private static final Logger s_logger = Logger.getLogger(SecurityGroupWorkDaoImpl.class); + private SearchBuilder VmIdTakenSearch; private SearchBuilder VmIdSeqNumSearch; private SearchBuilder VmIdUnTakenSearch; @@ -101,22 +106,27 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase sc = UntakenWorkSearch.create(); sc.setParameters("step", Step.Scheduled); - final Filter filter = new Filter(SecurityGroupWorkVO.class, null, true, 0l, 1l);//FIXME: order desc by update time? - txn.start(); - final List vos = lockRows(sc, filter, true); - if (vos.size() == 0) { + final SecurityGroupWorkVO vo = this.lockOneRandomRow(sc, true); + if (vo == null) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("No security group work items found"); + } + txn.commit(); return null; } SecurityGroupWorkVO work = null; - for (SecurityGroupWorkVO w: vos) { - //ensure that there is no job in Processing state for the same VM - if ( findByVmIdStep(w.getInstanceId(), Step.Processing) == null) { - work = w; - break; - } + + //ensure that there is no job in Processing state for the same VM + if ( findByVmIdStep(vo.getInstanceId(), Step.Processing) == null) { + work = vo; } + if (work == null) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Found a security group work item in Scheduled and Processing, exiting vm="+vo.getInstanceId()); + } + txn.commit(); return null; } work.setServerId(serverId); @@ -203,7 +213,14 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase findScheduledWork() { + final SearchCriteria sc = UntakenWorkSearch.create(); + sc.setParameters("step", Step.Scheduled); + return listIncludingRemovedBy(sc); + } }