one big transaction to see if it's any faster

This commit is contained in:
Alex Huang 2011-08-03 14:39:32 -07:00
parent 9ace67310e
commit 11650bfc9d
1 changed files with 65 additions and 75 deletions

View File

@ -106,6 +106,8 @@ import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.dao.UserVmDao;
import com.cloud.vm.dao.VMInstanceDao;
import edu.emory.mathcs.backport.java.util.Collections;
@Local(value = { SecurityGroupManager.class, SecurityGroupService.class })
public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityGroupService, Manager, StateListener<State, VirtualMachine.Event, VirtualMachine> {
public static final Logger s_logger = Logger.getLogger(SecurityGroupManagerImpl.class);
@ -361,90 +363,78 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
protected void handleVmStarted(VMInstanceVO vm) {
if (vm.getType() != VirtualMachine.Type.User || !isVmSecurityGroupEnabled(vm.getId()))
return;
Set<Long> affectedVms = getAffectedVmsForVmStart(vm);
List<Long> affectedVms = getAffectedVmsForVmStart(vm);
scheduleRulesetUpdateToHosts(affectedVms, true, null);
}
@DB
public void scheduleRulesetUpdateToHosts(Set<Long> affectedVms, boolean updateSeqno, Long delayMs) {
public void scheduleRulesetUpdateToHosts(List<Long> affectedVms, boolean updateSeqno, Long delayMs) {
if (affectedVms.size() == 0) {
return;
}
if (delayMs == null) {
delayMs = new Long(100l);
}
Collections.sort(affectedVms);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + affectedVms.size() + " vms");
}
if (affectedVms.size() == 0) {
boolean locked = _workLock.lock(_globalWorkLockTimeout);
if (!locked) {
s_logger.warn("Security Group Mgr: failed to acquire global work lock");
return;
}
boolean locked = _workLock.lock(_globalWorkLockTimeout);
if (locked) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: acquired global work lock");
}
try {
for (Long vmId : affectedVms) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: scheduling ruleset update for " + vmId);
}
VmRulesetLogVO log = null;
SecurityGroupWorkVO work = null;
//UserVm vm = null;
Transaction txn = null;
try {
txn = Transaction.currentTxn();
txn.start();
//vm = _userVMDao.acquireInLockTable(vmId);
//if (vm == null) {
//s_logger.warn("Failed to acquire lock on vm id " + vmId);
//continue;
//}
log = _rulesetLogDao.findByVmId(vmId);
if (log == null) {
log = new VmRulesetLogVO(vmId);
log = _rulesetLogDao.persist(log);
}
if (log != null && updateSeqno) {
log.incrLogsequence();
_rulesetLogDao.update(log.getId(), log);
}
work = _workDao.findByVmIdStep(vmId, Step.Scheduled);
if (work == null) {
work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null);
work = _workDao.persist(work);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: created new work item for " + vmId);
}
}
work.setLogsequenceNumber(log.getLogsequence());
_workDao.update(work.getId(), work);
} finally {
// if (vm != null) {
// _userVMDao.releaseFromLockTable(vmId);
// }
if (txn != null)
txn.commit();
}
_executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS);
}
} finally {
_workLock.unlock();
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: acquired global work lock");
}
Transaction txn = Transaction.currentTxn();
try {
txn.start();
for (Long vmId : affectedVms) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: released global work lock");
s_logger.trace("Security Group Mgr: scheduling ruleset update for " + vmId);
}
VmRulesetLogVO log = null;
SecurityGroupWorkVO work = null;
log = _rulesetLogDao.findByVmId(vmId);
if (log == null) {
log = new VmRulesetLogVO(vmId);
log = _rulesetLogDao.persist(log);
}
if (log != null && updateSeqno) {
log.incrLogsequence();
_rulesetLogDao.update(log.getId(), log);
}
work = _workDao.findByVmIdStep(vmId, Step.Scheduled);
if (work == null) {
work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null);
work = _workDao.persist(work);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: created new work item for " + vmId);
}
}
work.setLogsequenceNumber(log.getLogsequence());
_workDao.update(work.getId(), work);
_executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS);
}
txn.commit();
} finally {
_workLock.unlock();
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: released global work lock");
}
} else {
s_logger.warn("Security Group Mgr: failed to acquire global work lock");
}
}
protected Set<Long> getAffectedVmsForVmStart(VMInstanceVO vm) {
Set<Long> affectedVms = new HashSet<Long>();
protected List<Long> getAffectedVmsForVmStart(VMInstanceVO vm) {
List<Long> affectedVms = new ArrayList<Long>();
affectedVms.add(vm.getId());
List<SecurityGroupVMMapVO> groupsForVm = _securityGroupVMMapDao.listByInstanceId(vm.getId());
// For each group, find the ingress rules that allow the group
@ -456,8 +446,8 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
return affectedVms;
}
protected Set<Long> getAffectedVmsForVmStop(VMInstanceVO vm) {
Set<Long> affectedVms = new HashSet<Long>();
protected List<Long> getAffectedVmsForVmStop(VMInstanceVO vm) {
List<Long> affectedVms = new ArrayList<Long>();
List<SecurityGroupVMMapVO> groupsForVm = _securityGroupVMMapDao.listByInstanceId(vm.getId());
// For each group, find the ingress rules that allow the group
for (SecurityGroupVMMapVO mapVO : groupsForVm) {// FIXME: use custom sql in the dao
@ -468,9 +458,9 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
return affectedVms;
}
protected Set<Long> getAffectedVmsForIngressRules(List<IngressRuleVO> allowingRules) {
protected List<Long> getAffectedVmsForIngressRules(List<IngressRuleVO> allowingRules) {
Set<Long> distinctGroups = new HashSet<Long>();
Set<Long> affectedVms = new HashSet<Long>();
List<Long> affectedVms = new ArrayList<Long>();
for (IngressRuleVO allowingRule : allowingRules) {
distinctGroups.add(allowingRule.getSecurityGroupId());
@ -497,7 +487,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
protected void handleVmStopped(VMInstanceVO vm) {
if (vm.getType() != VirtualMachine.Type.User || !isVmSecurityGroupEnabled(vm.getId()))
return;
Set<Long> affectedVms = getAffectedVmsForVmStop(vm);
List<Long> affectedVms = getAffectedVmsForVmStop(vm);
scheduleRulesetUpdateToHosts(affectedVms, true, null);
}
@ -517,7 +507,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
} else {
Set<Long> affectedVms = new HashSet<Long>();
List<Long> affectedVms = new ArrayList<Long>();
affectedVms.add(vm.getId());
scheduleRulesetUpdateToHosts(affectedVms, true, null);
}
@ -687,7 +677,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
s_logger.debug("Added " + newRules.size() + " rules to security group " + securityGroup.getName());
}
txn.commit();
final Set<Long> affectedVms = new HashSet<Long>();
final ArrayList<Long> affectedVms = new ArrayList<Long>();
affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(securityGroup.getId()));
scheduleRulesetUpdateToHosts(affectedVms, true, null);
return newRules;
@ -733,7 +723,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
_ingressRuleDao.remove(id);
s_logger.debug("revokeSecurityGroupIngress succeeded for ingress rule id: " + id);
final Set<Long> affectedVms = new HashSet<Long>();
final ArrayList<Long> affectedVms = new ArrayList<Long>();
affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(groupHandle.getId()));
scheduleRulesetUpdateToHosts(affectedVms, true, null);
@ -860,7 +850,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
if (s_logger.isDebugEnabled()) {
s_logger.debug("Security Group work: found a job in done state, rescheduling for vm: " + userVmId);
}
Set<Long> affectedVms = new HashSet<Long>();
ArrayList<Long> affectedVms = new ArrayList<Long>();
affectedVms.add(userVmId);
scheduleRulesetUpdateToHosts(affectedVms, true, _timeBetweenCleanups*1000l);
}
@ -1135,7 +1125,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
@Override
public void fullSync(long agentId, HashMap<String, Pair<Long, Long>> newGroupStates) {
Set<Long> affectedVms = new HashSet<Long>();
ArrayList<Long> affectedVms = new ArrayList<Long>();
for (String vmName : newGroupStates.keySet()) {
Long vmId = newGroupStates.get(vmName).first();
Long seqno = newGroupStates.get(vmName).second();
@ -1166,7 +1156,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
List<SecurityGroupWorkVO> unfinished = _workDao.findUnfinishedWork(before);
if (unfinished.size() > 0) {
s_logger.info("Network Group Work cleanup found " + unfinished.size() + " unfinished work items older than " + before.toString());
Set<Long> affectedVms = new HashSet<Long>();
ArrayList<Long> affectedVms = new ArrayList<Long>();
for (SecurityGroupWorkVO work : unfinished) {
affectedVms.add(work.getInstanceId());
work.setStep(Step.Error);