diff --git a/utils/src/com/cloud/utils/db/Transaction.java b/utils/src/com/cloud/utils/db/Transaction.java index 7fbb2f8d954..0acb8b38382 100755 --- a/utils/src/com/cloud/utils/db/Transaction.java +++ b/utils/src/com/cloud/utils/db/Transaction.java @@ -78,7 +78,7 @@ public class Transaction { public static final short CLOUD_DB = 0; public static final short USAGE_DB = 1; public static final short CONNECTED_DB = -1; - + private static AtomicLong s_id = new AtomicLong(); private static final TransactionMBeanImpl s_mbean = new TransactionMBeanImpl(); static { @@ -91,7 +91,7 @@ public class Transaction { private final LinkedList _stack; private long _id; - + private final LinkedList> _lockTimes = new LinkedList>(); private String _name; @@ -101,17 +101,17 @@ public class Transaction { private long _txnTime; private Statement _stmt; private String _creator; - + private Transaction _prev = null; - + public static Transaction currentTxn() { Transaction txn = tls.get(); assert txn != null : "No Transaction on stack. Did you mark the method with @DB?"; - + assert checkAnnotation(3, txn) : "Did you even read the guide to use Transaction...IOW...other people's code? Try method can't be private. What about @DB? hmmm... could that be it? " + txn; return txn; } - + public static Transaction open(final short databaseId) { String name = buildName(); if (name == null) { @@ -130,17 +130,17 @@ public class Transaction { _conn = conn; _dbId = CONNECTED_DB; } - + public void transitToAutoManagedConnection(short dbId) { // assert(_stack.size() <= 1) : "Can't change to auto managed connection unless your stack is empty"; _dbId = dbId; _conn = null; } - + public static Transaction open(final String name) { return open(name, CLOUD_DB, false); } - + public static Transaction open(final String name, final short databaseId, final boolean forceDbChange) { Transaction txn = tls.get(); boolean isNew = false; @@ -169,46 +169,46 @@ public class Transaction { } return txn; } - + protected StackElement peekInStack(Object obj) { final Iterator it = _stack.iterator(); while (it.hasNext()) { - StackElement next = it.next(); + StackElement next = it.next(); if (next.type == obj) { return next; } } return null; } - + public void registerLock(String sql) { - if (_txn && s_lockLogger.isDebugEnabled()) { - Pair time = new Pair(sql, System.currentTimeMillis()); - _lockTimes.add(time); - } + if (_txn && s_lockLogger.isDebugEnabled()) { + Pair time = new Pair(sql, System.currentTimeMillis()); + _lockTimes.add(time); + } } - + public boolean dbTxnStarted() { return _txn; } - + public static Connection getStandaloneConnectionWithException() throws SQLException { Connection conn = s_ds.getConnection(); if (s_connLogger.isTraceEnabled()) { s_connLogger.trace("Retrieving a standalone connection: dbconn" + System.identityHashCode(conn)); } - return conn; + return conn; } - + public static Connection getStandaloneConnection() { - try { - return getStandaloneConnectionWithException(); - } catch (SQLException e) { - s_logger.error("Unexpected exception: ", e); - return null; - } + try { + return getStandaloneConnectionWithException(); + } catch (SQLException e) { + s_logger.error("Unexpected exception: ", e); + return null; + } } - + public static Connection getStandaloneUsageConnection() { try { Connection conn = s_usageDS.getConnection(); @@ -221,11 +221,11 @@ public class Transaction { return null; } } - + protected void attach(TransactionAttachment value) { _stack.push(new StackElement(ATTACHMENT, value)); } - + protected TransactionAttachment detach(String name) { Iterator it = _stack.descendingIterator(); while (it.hasNext()) { @@ -241,20 +241,20 @@ public class Transaction { assert false : "Are you sure you attached this: " + name; return null; } - + public static void attachToTxn(TransactionAttachment value) { Transaction txn = tls.get(); assert txn != null && txn.peekInStack(CURRENT_TXN) != null: "Come on....how can we attach something to the transaction if you haven't started it?"; - + txn.attach(value); } - + public static TransactionAttachment detachFromTxn(String name) { Transaction txn = tls.get(); assert txn != null : "No Transaction in TLS"; return txn.detach(name); } - + protected static boolean checkAnnotation(int stack, Transaction txn) { final StackTraceElement[] stacks = Thread.currentThread().getStackTrace(); StackElement se = txn.peekInStack(CURRENT_TXN); @@ -283,7 +283,7 @@ public class Transaction { i++; continue; } - + str.append("-").append(stacks[i].getClassName().substring(stacks[i].getClassName().lastIndexOf(".") + 1)).append(".").append(stacks[i].getMethodName()).append(":").append(stacks[i].getLineNumber()); j++; i++; @@ -303,11 +303,11 @@ public class Transaction { _id = s_id.incrementAndGet(); _creator = Thread.currentThread().getName(); } - + public String getCreator() { return _creator; } - + public long getId() { return _id; } @@ -342,7 +342,7 @@ public class Transaction { if (lockMaster == null) { throw new CloudRuntimeException("There's no support for locking yet"); } - return lockMaster.acquire(name, timeoutSeconds); + return lockMaster.acquire(name, timeoutSeconds); } public boolean release(final String name) { @@ -350,7 +350,7 @@ public class Transaction { if (lockMaster == null) { throw new CloudRuntimeException("There's no support for locking yet"); } - return lockMaster.release(name); + return lockMaster.release(name); } public void start() { @@ -364,7 +364,7 @@ public class Transaction { s_logger.trace("txn: has already been started."); return; } - + _txn = true; _txnTime = System.currentTimeMillis(); @@ -378,28 +378,28 @@ public class Transaction { } } } - + protected void closePreviousStatement() { - if (_stmt != null) { - try { - if (s_stmtLogger.isTraceEnabled()) { - s_stmtLogger.trace("Closing: " + _stmt); - } - try { - ResultSet rs = _stmt.getResultSet(); - if (rs != null && _stmt.getResultSetHoldability() != ResultSet.HOLD_CURSORS_OVER_COMMIT) { - rs.close(); - } - } catch(SQLException e) { - s_stmtLogger.trace("Unable to close resultset"); - } - _stmt.close(); - } catch (final SQLException e) { - s_stmtLogger.trace("Unable to close statement: " + _stmt); - } finally { - _stmt = null; - } - } + if (_stmt != null) { + try { + if (s_stmtLogger.isTraceEnabled()) { + s_stmtLogger.trace("Closing: " + _stmt); + } + try { + ResultSet rs = _stmt.getResultSet(); + if (rs != null && _stmt.getResultSetHoldability() != ResultSet.HOLD_CURSORS_OVER_COMMIT) { + rs.close(); + } + } catch(SQLException e) { + s_stmtLogger.trace("Unable to close resultset"); + } + _stmt.close(); + } catch (final SQLException e) { + s_stmtLogger.trace("Unable to close statement: " + _stmt); + } finally { + _stmt = null; + } + } } /** @@ -413,17 +413,17 @@ public class Transaction { * @see java.sql.Connection */ public PreparedStatement prepareAutoCloseStatement(final String sql) throws SQLException { - PreparedStatement stmt = prepareStatement(sql); - closePreviousStatement(); - _stmt = stmt; - return stmt; + PreparedStatement stmt = prepareStatement(sql); + closePreviousStatement(); + _stmt = stmt; + return stmt; } - + public PreparedStatement prepareStatement(final String sql) throws SQLException { final Connection conn = getConnection(); final PreparedStatement pstmt = conn.prepareStatement(sql); if (s_stmtLogger.isTraceEnabled()) { - s_stmtLogger.trace("Preparing: " + sql); + s_stmtLogger.trace("Preparing: " + sql); } return pstmt; } @@ -443,7 +443,7 @@ public class Transaction { final Connection conn = getConnection(); final PreparedStatement pstmt = conn.prepareStatement(sql, autoGeneratedKeys); if (s_stmtLogger.isTraceEnabled()) { - s_stmtLogger.trace("Preparing: " + sql); + s_stmtLogger.trace("Preparing: " + sql); } closePreviousStatement(); _stmt = pstmt; @@ -465,13 +465,13 @@ public class Transaction { final Connection conn = getConnection(); final PreparedStatement pstmt = conn.prepareStatement(sql, columnNames); if (s_stmtLogger.isTraceEnabled()) { - s_stmtLogger.trace("Preparing: " + sql); + s_stmtLogger.trace("Preparing: " + sql); } closePreviousStatement(); _stmt = pstmt; return pstmt; } - + /** * Prepares an auto close statement. The statement is closed automatically if it is * retrieved with this method. @@ -486,7 +486,7 @@ public class Transaction { final Connection conn = getConnection(); final PreparedStatement pstmt = conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); if (s_stmtLogger.isTraceEnabled()) { - s_stmtLogger.trace("Preparing: " + sql); + s_stmtLogger.trace("Preparing: " + sql); } closePreviousStatement(); _stmt = pstmt; @@ -508,32 +508,31 @@ public class Transaction { if (_conn == null) { switch (_dbId) { case CLOUD_DB: - if(s_ds != null) { - _conn = s_ds.getConnection(); - } else { - s_logger.warn("A static-initialized variable becomes null, process is dying?"); + if(s_ds != null) { + _conn = s_ds.getConnection(); + } else { + s_logger.warn("A static-initialized variable becomes null, process is dying?"); throw new CloudRuntimeException("Database is not initialized, process is dying?"); - } + } break; case USAGE_DB: - if(s_usageDS != null) { - _conn = s_usageDS.getConnection(); - } else { - s_logger.warn("A static-initialized variable becomes null, process is dying?"); + if(s_usageDS != null) { + _conn = s_usageDS.getConnection(); + } else { + s_logger.warn("A static-initialized variable becomes null, process is dying?"); throw new CloudRuntimeException("Database is not initialized, process is dying?"); - } + } break; default: throw new CloudRuntimeException("No database selected for the transaction"); } _conn.setAutoCommit(!_txn); - + // // MySQL default transaction isolation level is REPEATABLE READ, // to reduce chances of DB deadlock, we will use READ COMMITED isolation level instead // see http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html // - _conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); _stack.push(new StackElement(CREATE_CONN, null)); if (s_connLogger.isTraceEnabled()) { s_connLogger.trace("Creating a DB connection with " + (_txn ? " txn: " : " no txn: ") + " for " + _dbId + ": dbconn" + System.identityHashCode(_conn) + ". Stack: " + buildName()); @@ -580,8 +579,8 @@ public class Transaction { } public void cleanup() { - closePreviousStatement(); - + closePreviousStatement(); + removeUpTo(null, null); if (_txn) { rollbackTransaction(); @@ -590,7 +589,7 @@ public class Transaction { _name = null; closeConnection(); - + _stack.clear(); Merovingian2 lockMaster = Merovingian2.getLockMaster(); if (lockMaster != null) { @@ -600,19 +599,19 @@ public class Transaction { public void close() { removeUpTo(CURRENT_TXN, null); - + if (_stack.size() == 0) { s_logger.trace("Transaction is done"); cleanup(); } - + if(this._dbId == CONNECTED_DB) { - tls.set(_prev); - _prev = null; - s_mbean.removeTransaction(this); + tls.set(_prev); + _prev = null; + s_mbean.removeTransaction(this); } } - + /** * close() is used by endTxn to close the connection. This method only * closes the connection if the name is the same as what's stored. @@ -644,16 +643,16 @@ public class Transaction { } protected boolean hasTxnInStack() { - return peekInStack(START_TXN) != null; + return peekInStack(START_TXN) != null; } - + protected void clearLockTimes() { - if (s_lockLogger.isDebugEnabled()) { - for (Pair time : _lockTimes) { - s_lockLogger.trace("SQL " + time.first() + " took " + (System.currentTimeMillis() - time.second())); - } - _lockTimes.clear(); - } + if (s_lockLogger.isDebugEnabled()) { + for (Pair time : _lockTimes) { + s_lockLogger.trace("SQL " + time.first() + " took " + (System.currentTimeMillis() - time.second())); + } + _lockTimes.clear(); + } } public boolean commit() { @@ -694,8 +693,8 @@ public class Transaction { } protected void closeConnection() { - closePreviousStatement(); - + closePreviousStatement(); + if (_conn == null) { return; } @@ -712,7 +711,7 @@ public class Transaction { if(this._dbId != CONNECTED_DB) { _conn.close(); } - + _conn = null; } catch (final SQLException e) { s_logger.warn("Unable to close connection", e); @@ -726,12 +725,12 @@ public class Transaction { StackElement item = it.next(); it.remove(); - + try { if (item.type == type && (ref == null || item.ref == ref)) { break; } - + if (item.type == CURRENT_TXN) { if (s_logger.isTraceEnabled()) { s_logger.trace("Releasing the current txn: " + (item.ref != null ? item.ref : "")); @@ -755,37 +754,37 @@ public class Transaction { s_stmtLogger.trace("Closing: " + ref); } Statement stmt = (Statement)ref; - try { - ResultSet rs = stmt.getResultSet(); - if (rs != null) { - rs.close(); - } - } catch(SQLException e) { - s_stmtLogger.trace("Unable to close resultset"); - } + try { + ResultSet rs = stmt.getResultSet(); + if (rs != null) { + rs.close(); + } + } catch(SQLException e) { + s_stmtLogger.trace("Unable to close resultset"); + } stmt.close(); } catch (final SQLException e) { s_stmtLogger.trace("Unable to close statement: " + item); } } else if (item.type == ATTACHMENT) { - TransactionAttachment att = (TransactionAttachment)item.ref; - if (s_logger.isTraceEnabled()) { - s_logger.trace("Cleaning up " + att.getName()); - } - att.cleanup(); + TransactionAttachment att = (TransactionAttachment)item.ref; + if (s_logger.isTraceEnabled()) { + s_logger.trace("Cleaning up " + att.getName()); + } + att.cleanup(); } } catch(Exception e) { s_logger.error("Unable to clean up " + item, e); } } - + if (rollback) { rollback(); } } protected void rollbackTransaction() { - closePreviousStatement(); + closePreviousStatement(); if (!_txn) { if (s_logger.isTraceEnabled()) { s_logger.trace("Rollback called for " + _name + " when there's no transaction: " + buildName()); @@ -807,7 +806,7 @@ public class Transaction { s_logger.warn("Unable to rollback", e); } } - + protected void rollbackSavepoint(Savepoint sp) { try { if (_conn != null) { @@ -816,7 +815,7 @@ public class Transaction { } catch (SQLException e) { s_logger.warn("Unable to rollback to savepoint " + sp); } - + if (!hasTxnInStack()) { _txn = false; closeConnection(); @@ -836,7 +835,7 @@ public class Transaction { } } } - + rollbackTransaction(); } @@ -867,13 +866,13 @@ public class Transaction { if (_conn != null) { _conn.releaseSavepoint(sp); } - + if (!hasTxnInStack()) { _txn = false; closeConnection(); } } - + protected boolean hasSavepointInStack(Savepoint sp) { Iterator it = _stack.iterator(); while (it.hasNext()) { @@ -884,14 +883,14 @@ public class Transaction { } return false; } - + protected void removeTxn(Savepoint sp) { assert hasSavepointInStack(sp) : "Removing a save point that's not in the stack"; - + if (!hasSavepointInStack(sp)) { return; } - + Iterator it = _stack.iterator(); while (it.hasNext()) { StackElement se = it.next(); @@ -906,26 +905,26 @@ public class Transaction { public void rollback(final Savepoint sp) { removeTxn(sp); - + rollbackSavepoint(sp); } - + public Connection getCurrentConnection() { return _conn; } - + public List getStack() { return _stack; } - + protected Transaction() { - _name = null; - _conn = null; - _stack = null; - _txn = false; - _dbId = -1; + _name = null; + _conn = null; + _stack = null; + _txn = false; + _dbId = -1; } - + @Override protected void finalize() throws Throwable { if (!(_conn == null && (_stack == null || _stack.size() == 0))) { @@ -934,22 +933,22 @@ public class Transaction { cleanup(); } } - + protected class StackElement { public String type; public Object ref; - + public StackElement (String type, Object ref) { this.type = type; this.ref = ref; } - + @Override public String toString() { return type + "-" + ref; } } - + private static DataSource s_ds; private static DataSource s_usageDS; static { @@ -969,6 +968,21 @@ public class Transaction { final String cloudDbName = dbProps.getProperty("db.cloud.name"); final boolean cloudAutoReconnect = Boolean.parseBoolean(dbProps.getProperty("db.cloud.autoReconnect")); final String cloudValidationQuery = dbProps.getProperty("db.cloud.validationQuery"); + final String cloudIsolationLevel = dbProps.getProperty("db.cloud.isolation.level"); + int isolationLevel = Connection.TRANSACTION_READ_COMMITTED; + if (cloudIsolationLevel == null) { + isolationLevel = Connection.TRANSACTION_READ_COMMITTED; + } else if (cloudIsolationLevel.equalsIgnoreCase("readcommitted")) { + isolationLevel = Connection.TRANSACTION_READ_COMMITTED; + } else if (cloudIsolationLevel.equalsIgnoreCase("repeatableread")) { + isolationLevel = Connection.TRANSACTION_REPEATABLE_READ; + } else if (cloudIsolationLevel.equalsIgnoreCase("serializable")) { + isolationLevel = Connection.TRANSACTION_SERIALIZABLE; + } else if (cloudIsolationLevel.equalsIgnoreCase("readuncommitted")) { + isolationLevel = Connection.TRANSACTION_READ_UNCOMMITTED; + } else { + s_logger.warn("Unknown isolation level " + cloudIsolationLevel + ". Using read uncommitted"); + } final boolean cloudTestOnBorrow = Boolean.parseBoolean(dbProps.getProperty("db.cloud.testOnBorrow")); final boolean cloudTestWhileIdle = Boolean.parseBoolean(dbProps.getProperty("db.cloud.testWhileIdle")); final long cloudTimeBtwEvictionRunsMillis = Long.parseLong(dbProps.getProperty("db.cloud.timeBetweenEvictionRunsMillis")); @@ -985,7 +999,7 @@ public class Transaction { "?autoReconnect="+cloudAutoReconnect + (url != null ? "&" + url : ""), cloudUsername, cloudPassword); final KeyedObjectPoolFactory poolableObjFactory = (cloudPoolPreparedStatements ? new StackKeyedObjectPoolFactory() : null); final PoolableConnectionFactory cloudPoolableConnectionFactory = new PoolableConnectionFactory(cloudConnectionFactory, cloudConnectionPool, poolableObjFactory, - cloudValidationQuery, false, false); + cloudValidationQuery, false, false, isolationLevel); s_ds = new PoolingDataSource(cloudPoolableConnectionFactory.getPool()); // configure the usage db