From 46200d122e6fc42806ced905b1f9fd57b3f33e97 Mon Sep 17 00:00:00 2001 From: Chiradeep Vittal Date: Sun, 28 Aug 2011 00:57:39 -0700 Subject: [PATCH] try using multi inserts to overcome deadlock during batch update --- .../security/SecurityGroupManagerImpl2.java | 12 ++-- .../security/dao/VmRulesetLogDaoImpl.java | 65 ++++++++++++++++++- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java index 6c7a34ebda6..67087518d77 100644 --- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java @@ -74,22 +74,20 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl { } @Override - @DB + //@DB public void scheduleRulesetUpdateToHosts(List affectedVms, boolean updateSeqno, Long delayMs) { if (affectedVms.size() == 0) { return; } - if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr v2: scheduling ruleset updates for " + affectedVms.size() + " vms, current queue size=" + _workQueue.size()); - } Set workItems = new TreeSet(); workItems.addAll(affectedVms); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr v2: scheduling ruleset updates for " + affectedVms.size() + " vms " + " (unique=" + workItems.size() + "), current queue size=" + _workQueue.size()); + } + Profiler p = new Profiler(); p.start(); - Transaction txn = Transaction.currentTxn(); - txn.start(); int updated = _rulesetLogDao.createOrUpdate(workItems); - txn.commit(); int newJobs = _workQueue.submitWorkForVms(workItems); p.stop(); if (s_logger.isTraceEnabled()){ diff --git a/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java b/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java index f2d82acab2e..810d259292d 100644 --- a/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java +++ b/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java @@ -20,6 +20,8 @@ package com.cloud.network.security.dao; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.HashMap; +import java.util.Iterator; import java.util.Set; import javax.ejb.Local; @@ -38,7 +40,26 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase im private SearchBuilder VmIdSearch; private String INSERT_OR_UPDATE = "INSERT INTO op_vm_ruleset_log (instance_id, created, logsequence) " + " VALUES(?, now(), 1) ON DUPLICATE KEY UPDATE logsequence=logsequence+1"; - + private static HashMap cachedPrepStmtStrings = new HashMap(); + final static private int cacheStringSizes [] = {512, 256, 128, 64, 32, 16, 8, 4, 2, 1}; + + static { + //prepare the cache. + for (int size: cacheStringSizes) { + cachedPrepStmtStrings.put(size, createPrepStatementString(size)); + } + } + + + private static String createPrepStatementString(int numItems) { + StringBuilder builder = new StringBuilder("INSERT INTO op_vm_ruleset_log (instance_id, created, logsequence) VALUES "); + for (int i=0; i < numItems-1; i++) { + builder.append("(?, now(), 1), "); + } + builder.append("(?, now(), 1) "); + builder.append(" ON DUPLICATE KEY UPDATE logsequence=logsequence+1"); + return builder.toString(); + } protected VmRulesetLogDaoImpl() { VmIdSearch = createSearchBuilder(); @@ -57,6 +78,48 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase im @Override public int createOrUpdate(Set workItems) { + //return createOrUpdateUsingBatch(workItems); + return createOrUpdateUsingMultiInsert(workItems); + } + + protected int createOrUpdateUsingMultiInsert(Set workItems) { + Transaction txn = Transaction.currentTxn(); + PreparedStatement stmtInsert = null; + + int size = workItems.size(); + int count = 0; + Iterator workIter = workItems.iterator(); + int remaining = size; + try { + for (int stmtSize : cacheStringSizes) { + int numStmts = remaining / stmtSize; + if (numStmts > 0) { + String pstmt = cachedPrepStmtStrings.get(stmtSize); + stmtInsert = txn.prepareAutoCloseStatement(pstmt); + for (int i=0; i < numStmts; i++) { + for (int argIndex=1; argIndex <= stmtSize; argIndex++) { + Long vmId = workIter.next(); + stmtInsert.setLong(argIndex, vmId); + } + int numUpdated = stmtInsert.executeUpdate(); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Inserted or updated " + numUpdated + " rows"); + } + Thread.yield(); + count += stmtSize; + } + remaining = remaining - numStmts * stmtSize; + } + + } + } catch (SQLException sqe) { + s_logger.warn("Failed to execute multi insert ", sqe); + } + + return count; + } + + protected int createOrUpdateUsingBatch(Set workItems) { Transaction txn = Transaction.currentTxn(); PreparedStatement stmtInsert = null; int [] queryResult = null;