diff --git a/utils/src/com/cloud/utils/db/Merovingian2.java b/utils/src/com/cloud/utils/db/Merovingian2.java index 1182b771251..3e3f5f48498 100644 --- a/utils/src/com/cloud/utils/db/Merovingian2.java +++ b/utils/src/com/cloud/utils/db/Merovingian2.java @@ -27,16 +27,21 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TimeZone; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.management.StandardMBean; import org.apache.log4j.Logger; import com.cloud.utils.DateUtil; +import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.mgmt.JmxUtil; import com.cloud.utils.time.InaccurateClock; + public class Merovingian2 extends StandardMBean implements MerovingianMBean { private static final Logger s_logger = Logger.getLogger(Merovingian2.class); @@ -59,10 +64,19 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { private long _msId; private static Merovingian2 s_instance = null; + ScheduledExecutorService _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("LockMasterConnectionKeepAlive")); + Connection _conn; private Merovingian2(long msId) { super(MerovingianMBean.class, false); _msId = msId; + try { + _conn = Transaction.getStandaloneConnectionWithException(); + _conn.setAutoCommit(true); + } catch (SQLException e) { + throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes", e); + } + _executor.schedule(new KeepAliveTask(), 10, TimeUnit.SECONDS); } public static synchronized Merovingian2 createLockMaster(long msId) { @@ -91,34 +105,19 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { } long startTime = InaccurateClock.getTime(); - Connection conn = null; - try { - conn = Transaction.getStandaloneConnectionWithException(); - conn.setAutoCommit(true); - while ((InaccurateClock.getTime() - startTime) < (timeInSeconds * 1000)) { - int count = owns(conn, key); - if (count == -1) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - } else if (count >= 1) { - return increment(conn, key, threadName, threadId); - } else { - if (doAcquire(conn, key, threadName, threadId)) { - return true; - } + while ((InaccurateClock.getTime() - startTime) < (timeInSeconds * 1000)) { + int count = owns(key); + if (count == -1) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { } - } - } catch (SQLException e) { - throw new CloudRuntimeException("Unable to acquire lock " + key, e); - } finally { - try { - if (conn != null) { - conn.close(); + } else if (count >= 1) { + return increment(key, threadName, threadId); + } else { + if (doAcquire(key, threadName, threadId)) { + return true; } - } catch (SQLException e) { - s_logger.warn("Unable to close conn", e); } } if (s_logger.isTraceEnabled()) { @@ -127,10 +126,10 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { return false; } - protected boolean increment(Connection conn, String key, String threadName, int threadId) { + protected boolean increment(String key, String threadName, int threadId) { PreparedStatement pstmt = null; try { - pstmt = conn.prepareStatement(INCREMENT_SQL); + pstmt = _conn.prepareStatement(INCREMENT_SQL); pstmt.setString(1, key); pstmt.setLong(2, _msId); pstmt.setString(3, threadName); @@ -153,12 +152,12 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { } } - protected boolean doAcquire(Connection conn, String key, String threadName, int threadId) { + protected boolean doAcquire(String key, String threadName, int threadId) { PreparedStatement pstmt = null; long startTime = InaccurateClock.getTime(); try { - pstmt = conn.prepareStatement(ACQUIRE_SQL); + pstmt = _conn.prepareStatement(ACQUIRE_SQL); pstmt.setString(1, key); pstmt.setLong(2, _msId); pstmt.setString(3, threadName); @@ -192,11 +191,11 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { return false; } - protected Map isLocked(Connection conn, String key) { + protected Map isLocked(String key) { PreparedStatement pstmt = null; ResultSet rs = null; try { - pstmt = conn.prepareStatement(INQUIRE_SQL); + pstmt = _conn.prepareStatement(INQUIRE_SQL); pstmt.setString(1, key); rs = pstmt.executeQuery(); if (!rs.next()) { @@ -225,29 +224,9 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { } public void cleanupForServer(long msId) { - Connection conn = null; - try { - conn = Transaction.getStandaloneConnectionWithException(); - conn.setAutoCommit(true); - cleanup(conn, msId); - } catch (SQLException e) { - throw new CloudRuntimeException("Unable to clear the locks", e); - } finally { - try { - if (conn != null) { - conn.close(); - } - } catch (SQLException e) { - } - } - } - - protected void cleanup(Connection conn, long msId) { PreparedStatement pstmt = null; try { - conn = Transaction.getStandaloneConnectionWithException(); - conn.setAutoCommit(true); - pstmt = conn.prepareStatement(CLEANUP_MGMT_LOCKS_SQL); + pstmt = _conn.prepareStatement(CLEANUP_MGMT_LOCKS_SQL); pstmt.setLong(1, _msId); pstmt.executeUpdate(); } catch (SQLException e) { @@ -263,15 +242,12 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { } public boolean release(String key) { - Connection conn = null; PreparedStatement pstmt = null; Thread th = Thread.currentThread(); String threadName = th.getName(); int threadId = System.identityHashCode(th); try { - conn = Transaction.getStandaloneConnectionWithException(); - conn.setAutoCommit(true); - pstmt = conn.prepareStatement(DECREMENT_SQL); + pstmt = _conn.prepareStatement(DECREMENT_SQL); pstmt.setString(1, key); pstmt.setLong(2, _msId); pstmt.setString(3, threadName); @@ -283,7 +259,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { } if (rows == 1) { pstmt.close(); - pstmt = conn.prepareStatement(RELEASE_SQL); + pstmt = _conn.prepareStatement(RELEASE_SQL); pstmt.setString(1, key); pstmt.setLong(2, _msId); int result = pstmt.executeUpdate(); @@ -299,11 +275,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { if (pstmt != null) { pstmt.close(); } - if (conn != null) { - conn.close(); - } } catch (SQLException e) { - } } } @@ -329,13 +301,10 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { } protected List> getLocks(String sql, Long msId) { - Connection conn = null; PreparedStatement pstmt = null; ResultSet rs = null; try { - conn = Transaction.getStandaloneConnectionWithException(); - conn.setAutoCommit(true); - pstmt = conn.prepareStatement(sql); + pstmt = _conn.prepareStatement(sql); if (msId != null) { pstmt.setLong(1, msId); } @@ -351,9 +320,6 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { if (pstmt != null) { pstmt.close(); } - if (conn != null) { - conn.close(); - } } catch(SQLException e) { } } @@ -369,10 +335,10 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { return getLocks(SELECT_MGMT_LOCKS_SQL, _msId); } - public int owns(Connection conn, String key) { + public int owns(String key) { Thread th = Thread.currentThread(); int threadId = System.identityHashCode(th); - Map owner = isLocked(conn, key); + Map owner = isLocked(key); if (owner == null) { return 0; } @@ -382,32 +348,11 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { return -1; } - public int owns(String key) { - Connection conn = null; - try { - conn = Transaction.getStandaloneConnectionWithException(); - conn.setAutoCommit(true); - return owns(conn, key); - } catch (SQLException e) { - throw new CloudRuntimeException("Unable to retrieve locks ", e); - } finally { - try { - if (conn != null) { - conn.close(); - } - } catch(SQLException e) { - } - } - } - public List> getLocksAcquiredBy(long msId, String threadName) { - Connection conn = null; PreparedStatement pstmt = null; ResultSet rs = null; try { - conn = Transaction.getStandaloneConnectionWithException(); - conn.setAutoCommit(true); - pstmt = conn.prepareStatement(SELECT_THREAD_LOCKS_SQL); + pstmt = _conn.prepareStatement(SELECT_THREAD_LOCKS_SQL); pstmt.setLong(1, msId); pstmt.setString(2, threadName); rs = pstmt.executeQuery(); @@ -422,9 +367,6 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { if (pstmt != null) { pstmt.close(); } - if (conn != null) { - conn.close(); - } } catch (SQLException e) { } } @@ -435,12 +377,9 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { String threadName = th.getName(); int threadId = System.identityHashCode(th); - Connection conn = null; PreparedStatement pstmt = null; try { - conn = Transaction.getStandaloneConnectionWithException(); - conn.setAutoCommit(true); - pstmt = conn.prepareStatement(CLEANUP_THREAD_LOCKS_SQL); + pstmt = _conn.prepareStatement(CLEANUP_THREAD_LOCKS_SQL); pstmt.setLong(1, _msId); pstmt.setString(2, threadName); pstmt.setInt(3, threadId); @@ -453,12 +392,28 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { if (pstmt != null) { pstmt.close(); } - if (conn != null) { - conn.close(); - } } catch (SQLException e) { } } } + protected class KeepAliveTask implements Runnable { + @Override + public void run() { + PreparedStatement pstmt = null; // Should this even be prepared everytime? + try { + pstmt = _conn.prepareStatement("SELECT 1"); + pstmt.executeQuery(); + } catch (Throwable th) { + s_logger.error("Unable to keep the db connection alive for locking purposes!", th); + } finally { + if (pstmt != null) { + try { + pstmt.close(); + } catch (SQLException e) { + } + } + } + } + } }