diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java index 7fa76f9aa42..097babce619 100755 --- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -189,6 +190,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG try { cleanupFinishedWork(); cleanupUnfinishedWork(); + processScheduledWork(); } finally { txn.close("SG Cleanup"); } @@ -362,9 +364,15 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG if (delayMs == null) { delayMs = new Long(100l); } + + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + affectedVms.size() + " vms"); + } for (Long vmId : affectedVms) { - + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + vmId); + } VmRulesetLogVO log = null; SecurityGroupWorkVO work = null; UserVm vm = null; @@ -392,6 +400,9 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG if (work == null) { work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null); work = _workDao.persist(work); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: created new work item for " + vmId); + } } work.setLogsequenceNumber(log.getLogsequence()); @@ -1102,7 +1113,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } public void cleanupFinishedWork() { - Date before = new Date(System.currentTimeMillis() - 24 * 3600 * 1000l); + Date before = new Date(System.currentTimeMillis() - 6 * 3600 * 1000l); int numDeleted = _workDao.deleteFinishedWork(before); if (numDeleted > 0) { s_logger.info("Network Group Work cleanup deleted " + numDeleted + " finished work items older than " + before.toString()); @@ -1111,7 +1122,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } private void cleanupUnfinishedWork() { - Date before = new Date(System.currentTimeMillis() - _timeBetweenCleanups* 1000l); + Date before = new Date(System.currentTimeMillis() - _timeBetweenCleanups); List unfinished = _workDao.findUnfinishedWork(before); if (unfinished.size() > 0) { s_logger.info("Network Group Work cleanup found " + unfinished.size() + " unfinished work items older than " + before.toString()); @@ -1124,6 +1135,20 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG s_logger.debug("Network Group Work cleanup found no unfinished work items older than " + before.toString()); } } + + private void processScheduledWork() { + List scheduled = _workDao.findScheduledWork(); + int numJobs = scheduled.size(); + if (numJobs > 0) { + s_logger.debug("Security group work: found scheduled jobs " + numJobs); + Random rand = new Random(); + for (int i=0; i < numJobs; i++) { + long delayMs = 100 + 10*rand.nextInt(numJobs); + _executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS); + } + } + + } @Override public String getSecurityGroupsNamesForVm(long vmId) { @@ -1181,10 +1206,19 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } if (VirtualMachine.State.isVmStarted(oldState, event, newState)) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: handling start of vm id" + vm.getId()); + } handleVmStarted((VMInstanceVO) vm); } else if (VirtualMachine.State.isVmStopped(oldState, event, newState)) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: handling stop of vm id" + vm.getId()); + } handleVmStopped((VMInstanceVO) vm); } else if (VirtualMachine.State.isVmMigrated(oldState, event, newState)) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: handling migration of vm id" + vm.getId()); + } handleVmMigrated((VMInstanceVO) vm); } diff --git a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java index 5e05c0f00f2..59e46618d56 100644 --- a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java +++ b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java @@ -40,6 +40,8 @@ public interface SecurityGroupWorkDao extends GenericDao findUnfinishedWork(Date timeBefore); + + List findScheduledWork(); } diff --git a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java index afaa1e33759..16cf33574a2 100644 --- a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java +++ b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java @@ -23,7 +23,10 @@ import java.util.List; import javax.ejb.Local; +import org.apache.log4j.Logger; + import com.cloud.ha.HaWorkVO; +import com.cloud.ha.dao.HighAvailabilityDaoImpl; import com.cloud.network.security.SecurityGroupWorkVO; import com.cloud.network.security.SecurityGroupWorkVO.Step; import com.cloud.utils.db.DB; @@ -36,7 +39,9 @@ import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.exception.CloudRuntimeException; @Local(value={SecurityGroupWorkDao.class}) -public class SecurityGroupWorkDaoImpl extends GenericDaoBase implements SecurityGroupWorkDao { +public class SecurityGroupWorkDaoImpl extends GenericDaoBase implements SecurityGroupWorkDao { + private static final Logger s_logger = Logger.getLogger(SecurityGroupWorkDaoImpl.class); + private SearchBuilder VmIdTakenSearch; private SearchBuilder VmIdSeqNumSearch; private SearchBuilder VmIdUnTakenSearch; @@ -101,22 +106,27 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase sc = UntakenWorkSearch.create(); sc.setParameters("step", Step.Scheduled); - final Filter filter = new Filter(SecurityGroupWorkVO.class, null, true, 0l, 1l);//FIXME: order desc by update time? - txn.start(); - final List vos = lockRows(sc, filter, true); - if (vos.size() == 0) { + final SecurityGroupWorkVO vo = this.lockOneRandomRow(sc, true); + if (vo == null) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("No security group work items found"); + } + txn.commit(); return null; } SecurityGroupWorkVO work = null; - for (SecurityGroupWorkVO w: vos) { - //ensure that there is no job in Processing state for the same VM - if ( findByVmIdStep(w.getInstanceId(), Step.Processing) == null) { - work = w; - break; - } + + //ensure that there is no job in Processing state for the same VM + if ( findByVmIdStep(vo.getInstanceId(), Step.Processing) == null) { + work = vo; } + if (work == null) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Found a security group work item in Scheduled and Processing, exiting vm="+vo.getInstanceId()); + } + txn.commit(); return null; } work.setServerId(serverId); @@ -203,7 +213,14 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase findScheduledWork() { + final SearchCriteria sc = UntakenWorkSearch.create(); + sc.setParameters("step", Step.Scheduled); + return listIncludingRemovedBy(sc); + } }