diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java index 8b40666833a..586b7240237 100755 --- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java @@ -106,6 +106,8 @@ import com.cloud.vm.VirtualMachineManager; import com.cloud.vm.dao.UserVmDao; import com.cloud.vm.dao.VMInstanceDao; +import edu.emory.mathcs.backport.java.util.Collections; + @Local(value = { SecurityGroupManager.class, SecurityGroupService.class }) public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityGroupService, Manager, StateListener { public static final Logger s_logger = Logger.getLogger(SecurityGroupManagerImpl.class); @@ -361,90 +363,78 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG protected void handleVmStarted(VMInstanceVO vm) { if (vm.getType() != VirtualMachine.Type.User || !isVmSecurityGroupEnabled(vm.getId())) return; - Set affectedVms = getAffectedVmsForVmStart(vm); + List affectedVms = getAffectedVmsForVmStart(vm); scheduleRulesetUpdateToHosts(affectedVms, true, null); } @DB - public void scheduleRulesetUpdateToHosts(Set affectedVms, boolean updateSeqno, Long delayMs) { + public void scheduleRulesetUpdateToHosts(List affectedVms, boolean updateSeqno, Long delayMs) { + if (affectedVms.size() == 0) { + return; + } + if (delayMs == null) { delayMs = new Long(100l); } + Collections.sort(affectedVms); if (s_logger.isTraceEnabled()) { s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + affectedVms.size() + " vms"); } - if (affectedVms.size() == 0) { + boolean locked = _workLock.lock(_globalWorkLockTimeout); + if (!locked) { + s_logger.warn("Security Group Mgr: failed to acquire global work lock"); return; } - boolean locked = _workLock.lock(_globalWorkLockTimeout); - if (locked) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: acquired global work lock"); - } - try { - for (Long vmId : affectedVms) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: scheduling ruleset update for " + vmId); - } - VmRulesetLogVO log = null; - SecurityGroupWorkVO work = null; - //UserVm vm = null; - Transaction txn = null; - try { - txn = Transaction.currentTxn(); - txn.start(); - //vm = _userVMDao.acquireInLockTable(vmId); - //if (vm == null) { - //s_logger.warn("Failed to acquire lock on vm id " + vmId); - //continue; - //} - log = _rulesetLogDao.findByVmId(vmId); - if (log == null) { - log = new VmRulesetLogVO(vmId); - log = _rulesetLogDao.persist(log); - } - - if (log != null && updateSeqno) { - log.incrLogsequence(); - _rulesetLogDao.update(log.getId(), log); - } - work = _workDao.findByVmIdStep(vmId, Step.Scheduled); - if (work == null) { - work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null); - work = _workDao.persist(work); - if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: created new work item for " + vmId); - } - } - - work.setLogsequenceNumber(log.getLogsequence()); - _workDao.update(work.getId(), work); - - } finally { - // if (vm != null) { - // _userVMDao.releaseFromLockTable(vmId); - // } - if (txn != null) - txn.commit(); - } - - _executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS); - } - } finally { - _workLock.unlock(); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: acquired global work lock"); + } + Transaction txn = Transaction.currentTxn(); + try { + txn.start(); + for (Long vmId : affectedVms) { if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: released global work lock"); + s_logger.trace("Security Group Mgr: scheduling ruleset update for " + vmId); } + VmRulesetLogVO log = null; + SecurityGroupWorkVO work = null; + + log = _rulesetLogDao.findByVmId(vmId); + if (log == null) { + log = new VmRulesetLogVO(vmId); + log = _rulesetLogDao.persist(log); + } + + if (log != null && updateSeqno) { + log.incrLogsequence(); + _rulesetLogDao.update(log.getId(), log); + } + work = _workDao.findByVmIdStep(vmId, Step.Scheduled); + if (work == null) { + work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null); + work = _workDao.persist(work); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: created new work item for " + vmId); + } + } + + work.setLogsequenceNumber(log.getLogsequence()); + _workDao.update(work.getId(), work); + + _executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS); + } + txn.commit(); + } finally { + _workLock.unlock(); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: released global work lock"); } - } else { - s_logger.warn("Security Group Mgr: failed to acquire global work lock"); } } - protected Set getAffectedVmsForVmStart(VMInstanceVO vm) { - Set affectedVms = new HashSet(); + protected List getAffectedVmsForVmStart(VMInstanceVO vm) { + List affectedVms = new ArrayList(); affectedVms.add(vm.getId()); List groupsForVm = _securityGroupVMMapDao.listByInstanceId(vm.getId()); // For each group, find the ingress rules that allow the group @@ -456,8 +446,8 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG return affectedVms; } - protected Set getAffectedVmsForVmStop(VMInstanceVO vm) { - Set affectedVms = new HashSet(); + protected List getAffectedVmsForVmStop(VMInstanceVO vm) { + List affectedVms = new ArrayList(); List groupsForVm = _securityGroupVMMapDao.listByInstanceId(vm.getId()); // For each group, find the ingress rules that allow the group for (SecurityGroupVMMapVO mapVO : groupsForVm) {// FIXME: use custom sql in the dao @@ -468,9 +458,9 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG return affectedVms; } - protected Set getAffectedVmsForIngressRules(List allowingRules) { + protected List getAffectedVmsForIngressRules(List allowingRules) { Set distinctGroups = new HashSet(); - Set affectedVms = new HashSet(); + List affectedVms = new ArrayList(); for (IngressRuleVO allowingRule : allowingRules) { distinctGroups.add(allowingRule.getSecurityGroupId()); @@ -497,7 +487,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG protected void handleVmStopped(VMInstanceVO vm) { if (vm.getType() != VirtualMachine.Type.User || !isVmSecurityGroupEnabled(vm.getId())) return; - Set affectedVms = getAffectedVmsForVmStop(vm); + List affectedVms = getAffectedVmsForVmStop(vm); scheduleRulesetUpdateToHosts(affectedVms, true, null); } @@ -517,7 +507,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } } else { - Set affectedVms = new HashSet(); + List affectedVms = new ArrayList(); affectedVms.add(vm.getId()); scheduleRulesetUpdateToHosts(affectedVms, true, null); } @@ -687,7 +677,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG s_logger.debug("Added " + newRules.size() + " rules to security group " + securityGroup.getName()); } txn.commit(); - final Set affectedVms = new HashSet(); + final ArrayList affectedVms = new ArrayList(); affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(securityGroup.getId())); scheduleRulesetUpdateToHosts(affectedVms, true, null); return newRules; @@ -733,7 +723,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG _ingressRuleDao.remove(id); s_logger.debug("revokeSecurityGroupIngress succeeded for ingress rule id: " + id); - final Set affectedVms = new HashSet(); + final ArrayList affectedVms = new ArrayList(); affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(groupHandle.getId())); scheduleRulesetUpdateToHosts(affectedVms, true, null); @@ -860,7 +850,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG if (s_logger.isDebugEnabled()) { s_logger.debug("Security Group work: found a job in done state, rescheduling for vm: " + userVmId); } - Set affectedVms = new HashSet(); + ArrayList affectedVms = new ArrayList(); affectedVms.add(userVmId); scheduleRulesetUpdateToHosts(affectedVms, true, _timeBetweenCleanups*1000l); } @@ -1135,7 +1125,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG @Override public void fullSync(long agentId, HashMap> newGroupStates) { - Set affectedVms = new HashSet(); + ArrayList affectedVms = new ArrayList(); for (String vmName : newGroupStates.keySet()) { Long vmId = newGroupStates.get(vmName).first(); Long seqno = newGroupStates.get(vmName).second(); @@ -1166,7 +1156,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG List unfinished = _workDao.findUnfinishedWork(before); if (unfinished.size() > 0) { s_logger.info("Network Group Work cleanup found " + unfinished.size() + " unfinished work items older than " + before.toString()); - Set affectedVms = new HashSet(); + ArrayList affectedVms = new ArrayList(); for (SecurityGroupWorkVO work : unfinished) { affectedVms.add(work.getInstanceId()); work.setStep(Step.Error);