bug 10548: This problem is actually still a running out of db connection problem. However, it points out a weakness in the Merovingian2 design. It should keep a database connection for itself. I originally intended to have that after writing the first edition but got distracted and didn't finish. Here in the new implementation, Merovingian2 allocates a database connection for itself and keeps it alive at 10 second intervals with a SELECT 1 to avoid the connection being considered as abandoned by the dbcp pool.

This commit is contained in:
Alex Huang 2011-07-03 19:15:58 -07:00
parent d548e8fc38
commit b45bc9449e
1 changed files with 58 additions and 103 deletions

View File

@ -27,16 +27,21 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.management.StandardMBean;
import org.apache.log4j.Logger;
import com.cloud.utils.DateUtil;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.time.InaccurateClock;
public class Merovingian2 extends StandardMBean implements MerovingianMBean {
private static final Logger s_logger = Logger.getLogger(Merovingian2.class);
@ -59,10 +64,19 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
private long _msId;
private static Merovingian2 s_instance = null;
ScheduledExecutorService _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("LockMasterConnectionKeepAlive"));
Connection _conn;
private Merovingian2(long msId) {
super(MerovingianMBean.class, false);
_msId = msId;
try {
_conn = Transaction.getStandaloneConnectionWithException();
_conn.setAutoCommit(true);
} catch (SQLException e) {
throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes", e);
}
_executor.schedule(new KeepAliveTask(), 10, TimeUnit.SECONDS);
}
public static synchronized Merovingian2 createLockMaster(long msId) {
@ -91,34 +105,19 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
}
long startTime = InaccurateClock.getTime();
Connection conn = null;
try {
conn = Transaction.getStandaloneConnectionWithException();
conn.setAutoCommit(true);
while ((InaccurateClock.getTime() - startTime) < (timeInSeconds * 1000)) {
int count = owns(conn, key);
if (count == -1) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else if (count >= 1) {
return increment(conn, key, threadName, threadId);
} else {
if (doAcquire(conn, key, threadName, threadId)) {
return true;
}
while ((InaccurateClock.getTime() - startTime) < (timeInSeconds * 1000)) {
int count = owns(key);
if (count == -1) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
} catch (SQLException e) {
throw new CloudRuntimeException("Unable to acquire lock " + key, e);
} finally {
try {
if (conn != null) {
conn.close();
} else if (count >= 1) {
return increment(key, threadName, threadId);
} else {
if (doAcquire(key, threadName, threadId)) {
return true;
}
} catch (SQLException e) {
s_logger.warn("Unable to close conn", e);
}
}
if (s_logger.isTraceEnabled()) {
@ -127,10 +126,10 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
return false;
}
protected boolean increment(Connection conn, String key, String threadName, int threadId) {
protected boolean increment(String key, String threadName, int threadId) {
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(INCREMENT_SQL);
pstmt = _conn.prepareStatement(INCREMENT_SQL);
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
pstmt.setString(3, threadName);
@ -153,12 +152,12 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
}
}
protected boolean doAcquire(Connection conn, String key, String threadName, int threadId) {
protected boolean doAcquire(String key, String threadName, int threadId) {
PreparedStatement pstmt = null;
long startTime = InaccurateClock.getTime();
try {
pstmt = conn.prepareStatement(ACQUIRE_SQL);
pstmt = _conn.prepareStatement(ACQUIRE_SQL);
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
pstmt.setString(3, threadName);
@ -192,11 +191,11 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
return false;
}
protected Map<String, String> isLocked(Connection conn, String key) {
protected Map<String, String> isLocked(String key) {
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
pstmt = conn.prepareStatement(INQUIRE_SQL);
pstmt = _conn.prepareStatement(INQUIRE_SQL);
pstmt.setString(1, key);
rs = pstmt.executeQuery();
if (!rs.next()) {
@ -225,29 +224,9 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
}
public void cleanupForServer(long msId) {
Connection conn = null;
try {
conn = Transaction.getStandaloneConnectionWithException();
conn.setAutoCommit(true);
cleanup(conn, msId);
} catch (SQLException e) {
throw new CloudRuntimeException("Unable to clear the locks", e);
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
}
}
}
protected void cleanup(Connection conn, long msId) {
PreparedStatement pstmt = null;
try {
conn = Transaction.getStandaloneConnectionWithException();
conn.setAutoCommit(true);
pstmt = conn.prepareStatement(CLEANUP_MGMT_LOCKS_SQL);
pstmt = _conn.prepareStatement(CLEANUP_MGMT_LOCKS_SQL);
pstmt.setLong(1, _msId);
pstmt.executeUpdate();
} catch (SQLException e) {
@ -263,15 +242,12 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
}
public boolean release(String key) {
Connection conn = null;
PreparedStatement pstmt = null;
Thread th = Thread.currentThread();
String threadName = th.getName();
int threadId = System.identityHashCode(th);
try {
conn = Transaction.getStandaloneConnectionWithException();
conn.setAutoCommit(true);
pstmt = conn.prepareStatement(DECREMENT_SQL);
pstmt = _conn.prepareStatement(DECREMENT_SQL);
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
pstmt.setString(3, threadName);
@ -283,7 +259,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
}
if (rows == 1) {
pstmt.close();
pstmt = conn.prepareStatement(RELEASE_SQL);
pstmt = _conn.prepareStatement(RELEASE_SQL);
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
int result = pstmt.executeUpdate();
@ -299,11 +275,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
if (pstmt != null) {
pstmt.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
}
}
}
@ -329,13 +301,10 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
}
protected List<Map<String, String>> getLocks(String sql, Long msId) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = Transaction.getStandaloneConnectionWithException();
conn.setAutoCommit(true);
pstmt = conn.prepareStatement(sql);
pstmt = _conn.prepareStatement(sql);
if (msId != null) {
pstmt.setLong(1, msId);
}
@ -351,9 +320,6 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
if (pstmt != null) {
pstmt.close();
}
if (conn != null) {
conn.close();
}
} catch(SQLException e) {
}
}
@ -369,10 +335,10 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
return getLocks(SELECT_MGMT_LOCKS_SQL, _msId);
}
public int owns(Connection conn, String key) {
public int owns(String key) {
Thread th = Thread.currentThread();
int threadId = System.identityHashCode(th);
Map<String, String> owner = isLocked(conn, key);
Map<String, String> owner = isLocked(key);
if (owner == null) {
return 0;
}
@ -382,32 +348,11 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
return -1;
}
public int owns(String key) {
Connection conn = null;
try {
conn = Transaction.getStandaloneConnectionWithException();
conn.setAutoCommit(true);
return owns(conn, key);
} catch (SQLException e) {
throw new CloudRuntimeException("Unable to retrieve locks ", e);
} finally {
try {
if (conn != null) {
conn.close();
}
} catch(SQLException e) {
}
}
}
public List<Map<String, String>> getLocksAcquiredBy(long msId, String threadName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = Transaction.getStandaloneConnectionWithException();
conn.setAutoCommit(true);
pstmt = conn.prepareStatement(SELECT_THREAD_LOCKS_SQL);
pstmt = _conn.prepareStatement(SELECT_THREAD_LOCKS_SQL);
pstmt.setLong(1, msId);
pstmt.setString(2, threadName);
rs = pstmt.executeQuery();
@ -422,9 +367,6 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
if (pstmt != null) {
pstmt.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
}
}
@ -435,12 +377,9 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
String threadName = th.getName();
int threadId = System.identityHashCode(th);
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = Transaction.getStandaloneConnectionWithException();
conn.setAutoCommit(true);
pstmt = conn.prepareStatement(CLEANUP_THREAD_LOCKS_SQL);
pstmt = _conn.prepareStatement(CLEANUP_THREAD_LOCKS_SQL);
pstmt.setLong(1, _msId);
pstmt.setString(2, threadName);
pstmt.setInt(3, threadId);
@ -453,12 +392,28 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
if (pstmt != null) {
pstmt.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
}
}
}
protected class KeepAliveTask implements Runnable {
@Override
public void run() {
PreparedStatement pstmt = null; // Should this even be prepared everytime?
try {
pstmt = _conn.prepareStatement("SELECT 1");
pstmt.executeQuery();
} catch (Throwable th) {
s_logger.error("Unable to keep the db connection alive for locking purposes!", th);
} finally {
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
}
}
}
}
}
}