mirror of https://github.com/apache/cloudstack.git
connection conierge to deal with connections that are not returned to pools
This commit is contained in:
parent
c679fc078a
commit
ea48d40e5f
|
|
@ -68,6 +68,7 @@ import com.cloud.utils.component.Adapters;
|
|||
import com.cloud.utils.component.ComponentLocator;
|
||||
import com.cloud.utils.component.Inject;
|
||||
import com.cloud.utils.concurrency.NamedThreadFactory;
|
||||
import com.cloud.utils.db.ConnectionConcierge;
|
||||
import com.cloud.utils.db.DB;
|
||||
import com.cloud.utils.db.Transaction;
|
||||
import com.cloud.utils.events.SubscriptionMgr;
|
||||
|
|
@ -102,7 +103,7 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
|
||||
private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification"));
|
||||
private final List<ClusterManagerMessage> _notificationMsgs = new ArrayList<ClusterManagerMessage>();
|
||||
private Connection _heartbeatConnection = null;
|
||||
private ConnectionConcierge _heartbeatConnection = null;
|
||||
|
||||
private final ExecutorService _executor;
|
||||
|
||||
|
|
@ -624,23 +625,20 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
}
|
||||
|
||||
private Connection getHeartbeatConnection() throws SQLException {
|
||||
if(_heartbeatConnection != null) {
|
||||
return _heartbeatConnection;
|
||||
if(_heartbeatConnection == null) {
|
||||
Connection conn = Transaction.getStandaloneConnectionWithException();
|
||||
_heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false, false);
|
||||
}
|
||||
|
||||
_heartbeatConnection = Transaction.getStandaloneConnectionWithException();
|
||||
return _heartbeatConnection;
|
||||
return _heartbeatConnection.conn();
|
||||
}
|
||||
|
||||
private void invalidHeartbeatConnection() {
|
||||
if(_heartbeatConnection != null) {
|
||||
try {
|
||||
_heartbeatConnection.close();
|
||||
} catch (SQLException e) {
|
||||
s_logger.warn("Unable to close hearbeat DB connection. ", e);
|
||||
Connection conn = Transaction.getStandaloneConnection();
|
||||
if (conn != null) {
|
||||
_heartbeatConnection.reset(Transaction.getStandaloneConnection());
|
||||
}
|
||||
|
||||
_heartbeatConnection = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2626,7 +2626,7 @@ public class StorageManagerImpl implements StorageManager, StorageService, Manag
|
|||
}
|
||||
if (assignedPool != null) {
|
||||
Volume.State state = vol.getState();
|
||||
if (state == Volume.State.Allocated && state == Volume.State.Creating) {
|
||||
if (state == Volume.State.Allocated || state == Volume.State.Creating) {
|
||||
recreateVols.add(vol);
|
||||
} else {
|
||||
if (vol.isRecreatable()) {
|
||||
|
|
|
|||
|
|
@ -27,16 +27,12 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.management.StandardMBean;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.utils.DateUtil;
|
||||
import com.cloud.utils.concurrency.NamedThreadFactory;
|
||||
import com.cloud.utils.exception.CloudRuntimeException;
|
||||
import com.cloud.utils.mgmt.JmxUtil;
|
||||
import com.cloud.utils.time.InaccurateClock;
|
||||
|
|
@ -62,15 +58,18 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
private long _msId;
|
||||
|
||||
private static Merovingian2 s_instance = null;
|
||||
ScheduledExecutorService _executor = null;
|
||||
Connection _conn = null;
|
||||
ConnectionConcierge _concierge = null;
|
||||
|
||||
private Merovingian2(long msId) {
|
||||
super(MerovingianMBean.class, false);
|
||||
_msId = msId;
|
||||
String result = resetDbConnection();
|
||||
if (!result.equalsIgnoreCase("Success")) {
|
||||
throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes due to " + result);
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = Transaction.getStandaloneConnectionWithException();
|
||||
_concierge = new ConnectionConcierge("LockMaster", conn, true, true);
|
||||
} catch (SQLException e) {
|
||||
s_logger.error("Unable to get a new db connection", e);
|
||||
throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -90,42 +89,6 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
return s_instance;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String resetDbConnection() {
|
||||
if (_conn != null) {
|
||||
try {
|
||||
_conn.close();
|
||||
} catch (Throwable th) {
|
||||
s_logger.error("Unable to close connection", th);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
_conn = Transaction.getStandaloneConnectionWithException();
|
||||
_conn.setAutoCommit(true);
|
||||
} catch (SQLException e) {
|
||||
s_logger.error("Unable to get a new db connection", e);
|
||||
return "Unable to initialize a connection to the database for locking purposes: " + e;
|
||||
}
|
||||
|
||||
if (_conn == null) {
|
||||
return "Unable to initialize a connection to the database for locking purposes, shutdown this server!";
|
||||
}
|
||||
|
||||
if (_executor != null) {
|
||||
try {
|
||||
_executor.shutdown();
|
||||
} catch (Throwable th) {
|
||||
s_logger.error("Unable to shutdown the executor", th);
|
||||
}
|
||||
}
|
||||
|
||||
_executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("LockMasterConnectionKeepAlive"));
|
||||
_executor.schedule(new KeepAliveTask(), 10, TimeUnit.SECONDS);
|
||||
|
||||
return "Success";
|
||||
}
|
||||
|
||||
public boolean acquire(String key, int timeInSeconds) {
|
||||
Thread th = Thread.currentThread();
|
||||
String threadName = th.getName();
|
||||
|
|
@ -160,7 +123,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
protected boolean increment(String key, String threadName, int threadId) {
|
||||
PreparedStatement pstmt = null;
|
||||
try {
|
||||
pstmt = _conn.prepareStatement(INCREMENT_SQL);
|
||||
pstmt = _concierge.conn().prepareStatement(INCREMENT_SQL);
|
||||
pstmt.setString(1, key);
|
||||
pstmt.setLong(2, _msId);
|
||||
pstmt.setString(3, threadName);
|
||||
|
|
@ -188,7 +151,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
|
||||
long startTime = InaccurateClock.getTime();
|
||||
try {
|
||||
pstmt = _conn.prepareStatement(ACQUIRE_SQL);
|
||||
pstmt = _concierge.conn().prepareStatement(ACQUIRE_SQL);
|
||||
pstmt.setString(1, key);
|
||||
pstmt.setLong(2, _msId);
|
||||
pstmt.setString(3, threadName);
|
||||
|
|
@ -226,7 +189,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
PreparedStatement pstmt = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
pstmt = _conn.prepareStatement(INQUIRE_SQL);
|
||||
pstmt = _concierge.conn().prepareStatement(INQUIRE_SQL);
|
||||
pstmt.setString(1, key);
|
||||
rs = pstmt.executeQuery();
|
||||
if (!rs.next()) {
|
||||
|
|
@ -258,7 +221,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
public void cleanupForServer(long msId) {
|
||||
PreparedStatement pstmt = null;
|
||||
try {
|
||||
pstmt = _conn.prepareStatement(CLEANUP_MGMT_LOCKS_SQL);
|
||||
pstmt = _concierge.conn().prepareStatement(CLEANUP_MGMT_LOCKS_SQL);
|
||||
pstmt.setLong(1, _msId);
|
||||
pstmt.executeUpdate();
|
||||
} catch (SQLException e) {
|
||||
|
|
@ -279,7 +242,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
String threadName = th.getName();
|
||||
int threadId = System.identityHashCode(th);
|
||||
try {
|
||||
pstmt = _conn.prepareStatement(DECREMENT_SQL);
|
||||
pstmt = _concierge.conn().prepareStatement(DECREMENT_SQL);
|
||||
pstmt.setString(1, key);
|
||||
pstmt.setLong(2, _msId);
|
||||
pstmt.setString(3, threadName);
|
||||
|
|
@ -291,7 +254,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
}
|
||||
if (rows == 1) {
|
||||
pstmt.close();
|
||||
pstmt = _conn.prepareStatement(RELEASE_SQL);
|
||||
pstmt = _concierge.conn().prepareStatement(RELEASE_SQL);
|
||||
pstmt.setString(1, key);
|
||||
pstmt.setLong(2, _msId);
|
||||
int result = pstmt.executeUpdate();
|
||||
|
|
@ -336,7 +299,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
PreparedStatement pstmt = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
pstmt = _conn.prepareStatement(sql);
|
||||
pstmt = _concierge.conn().prepareStatement(sql);
|
||||
if (msId != null) {
|
||||
pstmt.setLong(1, msId);
|
||||
}
|
||||
|
|
@ -384,7 +347,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
PreparedStatement pstmt = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
pstmt = _conn.prepareStatement(SELECT_THREAD_LOCKS_SQL);
|
||||
pstmt = _concierge.conn().prepareStatement(SELECT_THREAD_LOCKS_SQL);
|
||||
pstmt.setLong(1, msId);
|
||||
pstmt.setString(2, threadName);
|
||||
rs = pstmt.executeQuery();
|
||||
|
|
@ -411,7 +374,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
|
||||
PreparedStatement pstmt = null;
|
||||
try {
|
||||
pstmt = _conn.prepareStatement(CLEANUP_THREAD_LOCKS_SQL);
|
||||
pstmt = _concierge.conn().prepareStatement(CLEANUP_THREAD_LOCKS_SQL);
|
||||
pstmt.setLong(1, _msId);
|
||||
pstmt.setString(2, threadName);
|
||||
pstmt.setInt(3, threadId);
|
||||
|
|
@ -433,7 +396,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
public boolean releaseLockAsLastResortAndIReallyKnowWhatIAmDoing(String key) {
|
||||
PreparedStatement pstmt = null;
|
||||
try {
|
||||
pstmt = _conn.prepareStatement(RELEASE_LOCK_SQL);
|
||||
pstmt = _concierge.conn().prepareStatement(RELEASE_LOCK_SQL);
|
||||
pstmt.setString(1, key);
|
||||
int rows = pstmt.executeUpdate();
|
||||
return rows > 0;
|
||||
|
|
@ -442,25 +405,4 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
protected class KeepAliveTask implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
PreparedStatement pstmt = null; // Should this even be prepared everytime?
|
||||
try {
|
||||
pstmt = _conn.prepareStatement("SELECT 1");
|
||||
pstmt.executeQuery();
|
||||
} catch (Throwable th) {
|
||||
s_logger.error("Unable to keep the db connection alive for locking purposes!", th);
|
||||
} finally {
|
||||
if (pstmt != null) {
|
||||
try {
|
||||
pstmt.close();
|
||||
} catch (SQLException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,5 @@ public interface MerovingianMBean {
|
|||
|
||||
boolean releaseLockAsLastResortAndIReallyKnowWhatIAmDoing(String key);
|
||||
|
||||
String resetDbConnection();
|
||||
|
||||
void cleanupForServer(long msId);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue