bug 10884:

1. cleanup was scheduled wrong (seconds vs ms)
2. when finding a work item to do, lock one random row to reduce contention from thundering herds of workers
3. cleanup thread also finds scheduled work items and gets threads to work on them, so that other mgmt servers can take jobs
4. add lots of trace logs
5. commit transactions when returning early
This commit is contained in:
Chiradeep Vittal 2011-07-29 16:27:35 -07:00
parent d48e97a368
commit 41e6aeae96
3 changed files with 68 additions and 15 deletions

View File

@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@ -189,6 +190,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
try {
cleanupFinishedWork();
cleanupUnfinishedWork();
processScheduledWork();
} finally {
txn.close("SG Cleanup");
}
@ -362,9 +364,15 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
if (delayMs == null) {
delayMs = new Long(100l);
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + affectedVms.size() + " vms");
}
for (Long vmId : affectedVms) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + vmId);
}
VmRulesetLogVO log = null;
SecurityGroupWorkVO work = null;
UserVm vm = null;
@ -392,6 +400,9 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
if (work == null) {
work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null);
work = _workDao.persist(work);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: created new work item for " + vmId);
}
}
work.setLogsequenceNumber(log.getLogsequence());
@ -1102,7 +1113,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
public void cleanupFinishedWork() {
Date before = new Date(System.currentTimeMillis() - 24 * 3600 * 1000l);
Date before = new Date(System.currentTimeMillis() - 6 * 3600 * 1000l);
int numDeleted = _workDao.deleteFinishedWork(before);
if (numDeleted > 0) {
s_logger.info("Network Group Work cleanup deleted " + numDeleted + " finished work items older than " + before.toString());
@ -1111,7 +1122,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
private void cleanupUnfinishedWork() {
Date before = new Date(System.currentTimeMillis() - _timeBetweenCleanups* 1000l);
Date before = new Date(System.currentTimeMillis() - _timeBetweenCleanups);
List<SecurityGroupWorkVO> unfinished = _workDao.findUnfinishedWork(before);
if (unfinished.size() > 0) {
s_logger.info("Network Group Work cleanup found " + unfinished.size() + " unfinished work items older than " + before.toString());
@ -1124,6 +1135,20 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
s_logger.debug("Network Group Work cleanup found no unfinished work items older than " + before.toString());
}
}
/**
 * Counts the work items returned by {@code _workDao.findScheduledWork()} and, when
 * there is at least one, queues one {@code WorkerThread} on {@code _executorPool}
 * per item found.
 *
 * Each worker gets a start delay of 100 ms plus a random multiple of 10 ms, where
 * the multiplier is drawn from {@code [0, number of items)}.
 */
private void processScheduledWork() {
    final int pending = _workDao.findScheduledWork().size();
    if (pending <= 0) {
        // Nothing is waiting; do not log or schedule anything.
        return;
    }

    s_logger.debug("Security group work: found scheduled jobs " + pending);

    final Random jitter = new Random();
    int queued = 0;
    while (queued < pending) {
        // Randomized delay; presumably meant to stagger worker start-up (see commit notes).
        long startDelayMs = 100 + 10*jitter.nextInt(pending);
        _executorPool.schedule(new WorkerThread(), startDelayMs, TimeUnit.MILLISECONDS);
        queued++;
    }
}
@Override
public String getSecurityGroupsNamesForVm(long vmId) {
@ -1181,10 +1206,19 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
if (VirtualMachine.State.isVmStarted(oldState, event, newState)) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: handling start of vm id" + vm.getId());
}
handleVmStarted((VMInstanceVO) vm);
} else if (VirtualMachine.State.isVmStopped(oldState, event, newState)) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: handling stop of vm id" + vm.getId());
}
handleVmStopped((VMInstanceVO) vm);
} else if (VirtualMachine.State.isVmMigrated(oldState, event, newState)) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: handling migration of vm id" + vm.getId());
}
handleVmMigrated((VMInstanceVO) vm);
}

View File

@ -40,6 +40,8 @@ public interface SecurityGroupWorkDao extends GenericDao<SecurityGroupWorkVO, Lo
int deleteFinishedWork(Date timeBefore);
List<SecurityGroupWorkVO> findUnfinishedWork(Date timeBefore);
List<SecurityGroupWorkVO> findScheduledWork();
}

View File

@ -23,7 +23,10 @@ import java.util.List;
import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.ha.HaWorkVO;
import com.cloud.ha.dao.HighAvailabilityDaoImpl;
import com.cloud.network.security.SecurityGroupWorkVO;
import com.cloud.network.security.SecurityGroupWorkVO.Step;
import com.cloud.utils.db.DB;
@ -36,7 +39,9 @@ import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.exception.CloudRuntimeException;
@Local(value={SecurityGroupWorkDao.class})
public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO, Long> implements SecurityGroupWorkDao {
public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO, Long> implements SecurityGroupWorkDao {
private static final Logger s_logger = Logger.getLogger(SecurityGroupWorkDaoImpl.class);
private SearchBuilder<SecurityGroupWorkVO> VmIdTakenSearch;
private SearchBuilder<SecurityGroupWorkVO> VmIdSeqNumSearch;
private SearchBuilder<SecurityGroupWorkVO> VmIdUnTakenSearch;
@ -101,22 +106,27 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO
final SearchCriteria<SecurityGroupWorkVO> sc = UntakenWorkSearch.create();
sc.setParameters("step", Step.Scheduled);
final Filter filter = new Filter(SecurityGroupWorkVO.class, null, true, 0l, 1l);//FIXME: order desc by update time?
txn.start();
final List<SecurityGroupWorkVO> vos = lockRows(sc, filter, true);
if (vos.size() == 0) {
final SecurityGroupWorkVO vo = this.lockOneRandomRow(sc, true);
if (vo == null) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("No security group work items found");
}
txn.commit();
return null;
}
SecurityGroupWorkVO work = null;
for (SecurityGroupWorkVO w: vos) {
//ensure that there is no job in Processing state for the same VM
if ( findByVmIdStep(w.getInstanceId(), Step.Processing) == null) {
work = w;
break;
}
//ensure that there is no job in Processing state for the same VM
if ( findByVmIdStep(vo.getInstanceId(), Step.Processing) == null) {
work = vo;
}
if (work == null) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Found a security group work item in Scheduled and Processing, exiting vm="+vo.getInstanceId());
}
txn.commit();
return null;
}
work.setServerId(serverId);
@ -203,7 +213,14 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO
update(work, sc);
return result;
}
}
/**
 * Lists work items using criteria created from {@code UntakenWorkSearch} with the
 * "step" parameter set to {@link Step#Scheduled}.
 *
 * @return whatever {@code listIncludingRemovedBy} yields for that criteria
 */
@Override
public List<SecurityGroupWorkVO> findScheduledWork() {
    SearchCriteria<SecurityGroupWorkVO> scheduledOnly = UntakenWorkSearch.create();
    scheduledOnly.setParameters("step", Step.Scheduled);

    final List<SecurityGroupWorkVO> matches = listIncludingRemovedBy(scheduledOnly);
    return matches;
}
}