try using multi inserts to overcome deadlock during batch update

Chiradeep Vittal 2011-08-28 00:57:39 -07:00
parent 3c45a7a195
commit 46200d122e
2 changed files with 69 additions and 8 deletions
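
The fix replaces per-row upserts with multi-row INSERT statements, so a whole batch of VM ids hits op_vm_ruleset_log in a handful of round trips instead of one statement (and one lock acquisition) per row. As a sketch, the statement the DAO builds for a batch of three rows (reconstructed from createPrepStatementString in the diff below) is:

    INSERT INTO op_vm_ruleset_log (instance_id, created, logsequence)
    VALUES (?, now(), 1), (?, now(), 1), (?, now(), 1)
    ON DUPLICATE KEY UPDATE logsequence=logsequence+1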

SecurityGroupManagerImpl2.java

@@ -74,22 +74,20 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl {
     }

     @Override
-    @DB
+    //@DB
     public void scheduleRulesetUpdateToHosts(List<Long> affectedVms, boolean updateSeqno, Long delayMs) {
         if (affectedVms.size() == 0) {
             return;
         }
-        if (s_logger.isTraceEnabled()) {
-            s_logger.trace("Security Group Mgr v2: scheduling ruleset updates for " + affectedVms.size() + " vms, current queue size=" + _workQueue.size());
-        }
         Set<Long> workItems = new TreeSet<Long>();
         workItems.addAll(affectedVms);
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("Security Group Mgr v2: scheduling ruleset updates for " + affectedVms.size() + " vms " + " (unique=" + workItems.size() + "), current queue size=" + _workQueue.size());
+        }

         Profiler p = new Profiler();
         p.start();
-        Transaction txn = Transaction.currentTxn();
-        txn.start();
         int updated = _rulesetLogDao.createOrUpdate(workItems);
-        txn.commit();
         int newJobs = _workQueue.submitWorkForVms(workItems);
         p.stop();
         if (s_logger.isTraceEnabled()){

VmRulesetLogDaoImpl.java

@@ -20,6 +20,8 @@ package com.cloud.network.security.dao
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Set;

 import javax.ejb.Local;
@@ -38,7 +40,26 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase<VmRulesetLogVO, Long> im
     private SearchBuilder<VmRulesetLogVO> VmIdSearch;
     private String INSERT_OR_UPDATE = "INSERT INTO op_vm_ruleset_log (instance_id, created, logsequence) " +
             " VALUES(?, now(), 1) ON DUPLICATE KEY UPDATE logsequence=logsequence+1";
+
+    private static HashMap<Integer, String> cachedPrepStmtStrings = new HashMap<Integer, String>();
+
+    final static private int cacheStringSizes [] = {512, 256, 128, 64, 32, 16, 8, 4, 2, 1};
+
+    static {
+        //prepare the cache.
+        for (int size : cacheStringSizes) {
+            cachedPrepStmtStrings.put(size, createPrepStatementString(size));
+        }
+    }
+
+    private static String createPrepStatementString(int numItems) {
+        StringBuilder builder = new StringBuilder("INSERT INTO op_vm_ruleset_log (instance_id, created, logsequence) VALUES ");
+        for (int i = 0; i < numItems - 1; i++) {
+            builder.append("(?, now(), 1), ");
+        }
+        builder.append("(?, now(), 1) ");
+        builder.append(" ON DUPLICATE KEY UPDATE logsequence=logsequence+1");
+        return builder.toString();
+    }

     protected VmRulesetLogDaoImpl() {
         VmIdSearch = createSearchBuilder();
@@ -57,6 +78,48 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase<VmRulesetLogVO, Long> im

     @Override
     public int createOrUpdate(Set<Long> workItems) {
+        //return createOrUpdateUsingBatch(workItems);
+        return createOrUpdateUsingMultiInsert(workItems);
+    }
+
+    protected int createOrUpdateUsingMultiInsert(Set<Long> workItems) {
+        Transaction txn = Transaction.currentTxn();
+        PreparedStatement stmtInsert = null;
+        int size = workItems.size();
+        int count = 0;
+        Iterator<Long> workIter = workItems.iterator();
+        int remaining = size;
+        try {
+            for (int stmtSize : cacheStringSizes) {
+                int numStmts = remaining / stmtSize;
+                if (numStmts > 0) {
+                    String pstmt = cachedPrepStmtStrings.get(stmtSize);
+                    stmtInsert = txn.prepareAutoCloseStatement(pstmt);
+                    for (int i = 0; i < numStmts; i++) {
+                        for (int argIndex = 1; argIndex <= stmtSize; argIndex++) {
+                            Long vmId = workIter.next();
+                            stmtInsert.setLong(argIndex, vmId);
+                        }
+                        int numUpdated = stmtInsert.executeUpdate();
+                        if (s_logger.isTraceEnabled()) {
+                            s_logger.trace("Inserted or updated " + numUpdated + " rows");
+                        }
+                        Thread.yield();
+                        count += stmtSize;
+                    }
+                    remaining = remaining - numStmts * stmtSize;
+                }
+            }
+        } catch (SQLException sqe) {
+            s_logger.warn("Failed to execute multi insert ", sqe);
+        }
+        return count;
+    }
+
+    protected int createOrUpdateUsingBatch(Set<Long> workItems) {
         Transaction txn = Transaction.currentTxn();
         PreparedStatement stmtInsert = null;
         int [] queryResult = null;
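
For illustration, the greedy loop in createOrUpdateUsingMultiInsert walks the cached statement sizes from largest to smallest, so any batch decomposes into a few multi-row statements. A minimal standalone sketch of that decomposition (MultiInsertPlanDemo is a hypothetical name; the size table is copied from cacheStringSizes above):

    public class MultiInsertPlanDemo {
        // Same descending size table as cacheStringSizes in VmRulesetLogDaoImpl.
        static final int[] SIZES = {512, 256, 128, 64, 32, 16, 8, 4, 2, 1};

        public static void main(String[] args) {
            int remaining = 300; // e.g. 300 unique VM ids in the work set
            for (int stmtSize : SIZES) {
                int numStmts = remaining / stmtSize;
                if (numStmts > 0) {
                    // each statement is one multi-row INSERT ... ON DUPLICATE KEY UPDATE
                    System.out.println(numStmts + " statement(s) of " + stmtSize + " row(s)");
                    remaining -= numStmts * stmtSize;
                }
            }
        }
    }

Because the size table ends at 1, the remainder always reaches zero; here 300 unique VM ids take just four statements (256 + 32 + 8 + 4) instead of 300 single-row upserts.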