Use separate DB connection to handle cluster management

This commit is contained in:
Kelven Yang 2011-04-11 16:37:40 -07:00
parent ac223c197f
commit 149eb0fade
4 changed files with 254 additions and 84 deletions

View File

@ -5,6 +5,8 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.rmi.RemoteException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
@ -40,6 +42,7 @@ import com.cloud.host.dao.HostDao;
import com.cloud.serializer.GsonHelper;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Pair;
import com.cloud.utils.Profiler;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.component.Adapters;
@ -71,11 +74,12 @@ public class ClusterManagerImpl implements ClusterManager {
private AgentManager _agentMgr;
private final ScheduledExecutorService _heartbeatScheduler =
Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
private final ScheduledExecutorService _peerScanScheduler =
Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-PeerScan"));
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification"));
private List< Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> > _notificationMsgs = new ArrayList< Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> >();
private Connection _heartbeatConnection = null;
private final ExecutorService _executor;
private ClusterServiceAdapter _currentServiceAdapter;
@ -539,50 +543,140 @@ public class ClusterManagerImpl implements ClusterManager {
s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId);
}
_mshostDao.update(_mshostId, DateUtil.currentGMTTime());
Connection conn = getHeartbeatConnection();
_mshostDao.update(conn, _mshostId, DateUtil.currentGMTTime());
if(s_logger.isTraceEnabled()) {
s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
}
if(!_peerScanInited) {
_peerScanInited = true;
initPeerScan(conn);
}
peerScan(conn);
} catch(CloudRuntimeException e) {
// TODO: if the exception is caused by loss of the DB connection,
// start fencing and shut down the management server
s_logger.error("Runtime DB exception ", e.getCause());
invalidHeartbeatConnection();
} catch (Throwable e) {
s_logger.error("Problem with the cluster heartbeat!", e);
}
}
};
}
private Runnable getPeerScanTask() {
/**
 * Returns the DB connection reserved for the cluster heartbeat, creating it on first use.
 * The connection stays cached in {@code _heartbeatConnection} until
 * {@code invalidHeartbeatConnection()} clears the field.
 *
 * NOTE(review): the connection comes from Transaction.getStandaloneUsageConnection();
 * confirm that this factory targets the database holding the mshost table.
 *
 * @return the cached heartbeat connection, or whatever the factory returned if none was cached
 */
private Connection getHeartbeatConnection() {
    if (_heartbeatConnection == null) {
        _heartbeatConnection = Transaction.getStandaloneUsageConnection();
    }
    return _heartbeatConnection;
}
/**
 * Discards the cached heartbeat connection so that the next call to
 * {@code getHeartbeatConnection()} obtains a fresh one. A failure to close the old
 * connection is logged and otherwise ignored. Does nothing when no connection is cached.
 */
private void invalidHeartbeatConnection() {
    if (_heartbeatConnection == null) {
        return;
    }

    try {
        _heartbeatConnection.close();
    } catch (SQLException closeFailure) {
        s_logger.warn("Unable to close hearbeat DB connection. ", closeFailure);
    }
    // Drop the reference whether or not close() reported an SQLException.
    _heartbeatConnection = null;
}
/**
 * Builds the long-running task that is submitted to {@code _notificationExecutor}.
 * The task loops forever: it waits up to one second on {@code _notificationMsgs}, drains
 * every queued (joined, left) pair through {@code notifyNodeJoined} / {@code notifyNodeLeft},
 * then sleeps one more second before the next round.
 *
 * Notifications that take longer than 1000 ms are logged at WARN; faster ones are logged
 * at DEBUG. (Previously the two levels were swapped, so slow notifications were hidden at
 * DEBUG while every fast one produced a warning.)
 *
 * @return the notification dispatch task
 */
private Runnable getNotificationTask() {
    return new Runnable() {
        @Override
        public void run() {
            // NOTE(review): this leading block calls the no-argument initPeerScan()/peerScan().
            // It looks like the pre-change peer-scan body from this diff rather than part of
            // the notification task -- confirm whether it should remain here.
            try {
                if(s_logger.isTraceEnabled()) {
                    s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
                }

                if(!_peerScanInited) {
                    _peerScanInited = true;
                    initPeerScan();
                }

                peerScan();
            } catch (Throwable e) {
                s_logger.error("Problem with the cluster peer-scan!", e);
            }

            // NOTE(review): interrupts are ignored below, so this loop cannot be stopped by
            // interrupting the executor thread -- confirm that is intended.
            while(true) {
                synchronized(_notificationMsgs) {
                    try {
                        // Woken early by queueNotification()'s notifyAll(); otherwise times out.
                        _notificationMsgs.wait(1000);
                    } catch (InterruptedException ignored) {
                    }
                }

                Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> msgPair = null;
                while((msgPair = getNextNotificationMessage()) != null) {
                    try {
                        // first() carries the nodes that joined
                        if(msgPair.first() != null && msgPair.first().size() > 0) {
                            Profiler profiler = new Profiler();
                            profiler.start();

                            notifyNodeJoined(msgPair.first());

                            profiler.stop();
                            if(profiler.getDuration() > 1000) {
                                s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
                            } else if(s_logger.isDebugEnabled()) {
                                s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
                            }
                        }

                        // second() carries the nodes that left
                        if(msgPair.second() != null && msgPair.second().size() > 0) {
                            Profiler profiler = new Profiler();
                            profiler.start();

                            notifyNodeLeft(msgPair.second());

                            profiler.stop();
                            if(profiler.getDuration() > 1000) {
                                s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
                            } else if(s_logger.isDebugEnabled()) {
                                s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
                            }
                        }
                    } catch (Throwable e) {
                        // One failing listener must not kill the dispatch loop.
                        s_logger.warn("Unexpected exception during cluster notification. ", e);
                    }
                }

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                }
            }
        }
    };
}
private void initPeerScan() {
/**
 * Appends one membership-change message to {@code _notificationMsgs} and wakes any thread
 * waiting on that list.
 *
 * @param addedNodeList   nodes reported as joined (stored as the pair's first element)
 * @param removedNodeList nodes reported as left (stored as the pair's second element)
 */
private void queueNotification(List<ManagementServerHostVO> addedNodeList, List<ManagementServerHostVO> removedNodeList) {
    Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> membershipChange =
        new Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>>(addedNodeList, removedNodeList);

    synchronized (_notificationMsgs) {
        _notificationMsgs.add(membershipChange);
        _notificationMsgs.notifyAll();
    }
}
/**
 * Removes and returns the oldest queued membership-change message.
 *
 * @return the message at the head of {@code _notificationMsgs}, or {@code null} when the
 *         list is empty
 */
private Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> getNextNotificationMessage() {
    synchronized (_notificationMsgs) {
        return _notificationMsgs.isEmpty() ? null : _notificationMsgs.remove(0);
    }
}
// One-time scan performed before the first peer scan of the heartbeat task: every management
// server returned by getInactiveList() for the cut time (current GMT time minus
// heartbeatThreshold) is reported through notifyNodeLeft().
// NOTE(review): notifyNodeLeft() is invoked directly here rather than through
// queueNotification() -- confirm that running listeners on the calling thread is intended.
private void initPeerScan(Connection conn) {
// Upon startup, also send a node-left notification for every management server node that is
// already inactive, to help the upper layer perform cleanup it may have missed.
Date cutTime = DateUtil.currentGMTTime();
// NOTE(review): inactiveList is declared twice below. The first, connection-less call looks
// like the pre-change line of this diff that the conn-based call replaces -- confirm that
// only one of them is meant to remain.
List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - heartbeatThreshold));
List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold));
if(inactiveList.size() > 0) {
notifyNodeLeft(inactiveList);
}
}
private void peerScan() {
private void peerScan(Connection conn) {
Date cutTime = DateUtil.currentGMTTime();
List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold));
List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold));
List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
if(_mshostId != null) {
@ -602,7 +696,7 @@ public class ClusterManagerImpl implements ClusterManager {
Iterator<ManagementServerHostVO> it = removedNodeList.iterator();
while(it.hasNext()) {
ManagementServerHostVO mshost = it.next();
if(!pingManagementNode(mshost.getMsid())) {
if(!pingManagementNode(mshost)) {
s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable");
activePeers.remove(mshost.getId());
@ -635,37 +729,8 @@ public class ClusterManagerImpl implements ClusterManager {
}
}
if(newNodeList.size() > 0) {
Profiler profiler = new Profiler();
profiler.start();
notifyNodeJoined(newNodeList);
profiler.stop();
if(profiler.getDuration() > 1000) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
}
} else {
s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
}
}
if(removedNodeList.size() > 0) {
Profiler profiler = new Profiler();
profiler.start();
notifyNodeLeft(removedNodeList);
profiler.stop();
if(profiler.getDuration() > 1000) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
}
} else {
s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
}
}
if(newNodeList.size() > 0 || removedNodeList.size() > 0)
this.queueNotification(newNodeList, removedNodeList);
}
private static boolean isIdInList(Long id, List<ManagementServerHostVO> l) {
@ -731,9 +796,7 @@ public class ClusterManagerImpl implements ClusterManager {
// use a separate thread for heartbeat updates
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval,
heartbeatInterval, TimeUnit.MILLISECONDS);
_peerScanScheduler.scheduleAtFixedRate(getPeerScanTask(), heartbeatInterval,
heartbeatInterval, TimeUnit.MILLISECONDS);
_notificationExecutor.submit(getNotificationTask());
} catch (Throwable e) {
s_logger.error("Unexpected exception : ", e);
txn.rollback();
@ -882,6 +945,11 @@ public class ClusterManagerImpl implements ClusterManager {
if(mshost == null) {
return false;
}
return pingManagementNode(mshost);
}
private boolean pingManagementNode(ManagementServerHostVO mshost) {
String targetIp = mshost.getServiceIP();
if("127.0.0.1".equals(targetIp) || "0.0.0.0".equals(targetIp)) {
@ -889,7 +957,7 @@ public class ClusterManagerImpl implements ClusterManager {
return false;
}
String targetPeer = String.valueOf(msid);
String targetPeer = String.valueOf(mshost.getMsid());
ClusterService peerService = null;
for(int i = 0; i < 2; i++) {
try {
@ -906,13 +974,14 @@ public class ClusterManagerImpl implements ClusterManager {
invalidatePeerService(targetPeer);
}
} else {
s_logger.warn("Remote peer " + msid + " no longer exists");
s_logger.warn("Remote peer " + mshost.getMsid() + " no longer exists");
}
}
return false;
}
@Override
public int getHeartbeatThreshold() {
return this.heartbeatThreshold;

View File

@ -18,6 +18,7 @@
package com.cloud.cluster.dao;
import java.sql.Connection;
import java.util.Date;
import java.util.List;
@ -26,10 +27,15 @@ import com.cloud.utils.db.GenericDao;
// DAO contract for management server host records (ManagementServerHostVO).
// NOTE(review): increaseAlertCount(long) appears twice in this view; it looks like the old and
// new side of a whitespace-only diff line -- confirm only one declaration exists in the file.
public interface ManagementServerHostDao extends GenericDao<ManagementServerHostVO, Long> {
// Looks up a management server record by its msid.
ManagementServerHostVO findByMsid(long msid);
void increaseAlertCount(long id);
void update(long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate);
void update(long id, Date lastUpdate);
List<ManagementServerHostVO> getActiveList(Date cutTime);
List<ManagementServerHostVO> getInactiveList(Date cutTime);
void increaseAlertCount(long id);
// Variants of the operations above that take the JDBC connection to use as an explicit argument.
void update(Connection conn, long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate);
void update(Connection conn, long id, Date lastUpdate);
List<ManagementServerHostVO> getActiveList(Connection conn, Date cutTime);
List<ManagementServerHostVO> getInactiveList(Connection conn, Date cutTime);
}

View File

@ -18,7 +18,9 @@
package com.cloud.cluster.dao;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
@ -33,6 +35,7 @@ import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
@Local(value={ManagementServerHostDao.class})
public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServerHostVO, Long> implements ManagementServerHostDao {
@ -41,20 +44,89 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
private final SearchBuilder<ManagementServerHostVO> MsIdSearch;
private final SearchBuilder<ManagementServerHostVO> ActiveSearch;
private final SearchBuilder<ManagementServerHostVO> InactiveSearch;
/**
 * Finds a management server record by its msid, using the MsIdSearch criteria and
 * {@code listIncludingRemovedBy}.
 *
 * @param msid value bound to the "msid" search parameter
 * @return the first matching record, or {@code null} when nothing matches
 */
public ManagementServerHostVO findByMsid(long msid) {
    SearchCriteria<ManagementServerHostVO> criteria = MsIdSearch.create();
    criteria.setParameters("msid", msid);

    List<ManagementServerHostVO> matches = listIncludingRemovedBy(criteria);
    if (matches == null || matches.isEmpty()) {
        return null;
    }
    return matches.get(0);
}
/**
 * Rewrites the identity fields and heartbeat timestamp of one mshost row over the
 * caller-supplied connection. The statement also sets {@code removed} to null and
 * {@code alert_count} to 0. The connection itself is not closed here.
 *
 * @param conn        JDBC connection to run the statement on
 * @param id          primary key of the mshost row
 * @param name        new value for the name column
 * @param version     new value for the version column
 * @param serviceIP   new value for the service_ip column
 * @param servicePort new value for the service_port column
 * @param lastUpdate  heartbeat time, written as a GMT display string
 * @throws CloudRuntimeException wrapping any SQLException raised while preparing or
 *         executing the statement
 */
public void update(Connection conn, long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
    final String sql = "update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0 where id=?";
    PreparedStatement pstmt = null;
    try {
        pstmt = conn.prepareStatement(sql);
        pstmt.setString(1, name);
        pstmt.setString(2, version);
        pstmt.setString(3, serviceIP);
        pstmt.setInt(4, servicePort);
        pstmt.setString(5, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
        pstmt.setLong(6, id);

        pstmt.executeUpdate();
    } catch(SQLException e ) {
        // pstmt is still null when prepareStatement() itself failed; dereferencing it here
        // would replace the SQLException with a NullPointerException, so fall back to the SQL.
        throw new CloudRuntimeException("DB exception on " + (pstmt != null ? pstmt.toString() : sql), e);
    } finally {
        if(pstmt != null) {
            try {
                pstmt.close();
            } catch(Exception e) {
                s_logger.warn("Unable to close prepared statement due to exception ", e);
            }
        }
    }
}
/**
 * Refreshes the heartbeat timestamp of one mshost row over the caller-supplied connection.
 * The statement also sets {@code removed} to null and {@code alert_count} to 0.
 * The connection itself is not closed here.
 *
 * @param conn       JDBC connection to run the statement on
 * @param id         primary key of the mshost row
 * @param lastUpdate heartbeat time, written as a GMT display string
 * @throws CloudRuntimeException wrapping any SQLException raised while preparing or
 *         executing the statement
 */
public void update(Connection conn, long id, Date lastUpdate) {
    final String sql = "update mshost set last_update=?, removed=null, alert_count=0 where id=?";
    PreparedStatement pstmt = null;
    try {
        pstmt = conn.prepareStatement(sql);
        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
        pstmt.setLong(2, id);

        pstmt.executeUpdate();
    } catch (SQLException e) {
        // pstmt is still null when prepareStatement() itself failed; dereferencing it here
        // would replace the SQLException with a NullPointerException, so fall back to the SQL.
        throw new CloudRuntimeException("DB exception on " + (pstmt != null ? pstmt.toString() : sql), e);
    } finally {
        if(pstmt != null) {
            try {
                pstmt.close();
            } catch(Exception e) {
                s_logger.warn("Unable to close prepared statement due to exception ", e);
            }
        }
    }
}
/**
 * Runs the ActiveSearch query inside a transaction context opened on the caller-supplied
 * connection via {@code Transaction.openNew}, and closes that context afterwards.
 * NOTE(review): presumably returns the servers whose last update is not older than
 * cutTime -- the ActiveSearch definition is not visible here, confirm.
 *
 * @param conn    connection handed to {@code Transaction.openNew}
 * @param cutTime value bound to the "lastUpdateTime" search parameter
 * @return the records returned by {@code listIncludingRemovedBy}
 */
public List<ManagementServerHostVO> getActiveList(Connection conn, Date cutTime) {
    final Transaction scopedTxn = Transaction.openNew("getActiveList", conn);
    try {
        final SearchCriteria<ManagementServerHostVO> criteria = ActiveSearch.create();
        criteria.setParameters("lastUpdateTime", cutTime);

        final List<ManagementServerHostVO> activeHosts = listIncludingRemovedBy(criteria);
        return activeHosts;
    } finally {
        scopedTxn.close();
    }
}
/**
 * Runs the InactiveSearch query inside a transaction context opened on the caller-supplied
 * connection via {@code Transaction.openNew}, and closes that context afterwards.
 * NOTE(review): presumably returns the servers whose last update is older than cutTime --
 * the InactiveSearch definition is not visible here, confirm.
 *
 * @param conn    connection handed to {@code Transaction.openNew}
 * @param cutTime value bound to the "lastUpdateTime" search parameter
 * @return the records returned by {@code listIncludingRemovedBy}
 */
public List<ManagementServerHostVO> getInactiveList(Connection conn, Date cutTime) {
    final Transaction scopedTxn = Transaction.openNew("getInactiveList", conn);
    try {
        final SearchCriteria<ManagementServerHostVO> criteria = InactiveSearch.create();
        criteria.setParameters("lastUpdateTime", cutTime);

        final List<ManagementServerHostVO> inactiveHosts = listIncludingRemovedBy(criteria);
        return inactiveHosts;
    } finally {
        scopedTxn.close();
    }
}
/**
 * Finds a management server record by its msid, using the MsIdSearch criteria and
 * {@code listIncludingRemovedBy}.
 *
 * @param msid value bound to the "msid" search parameter
 * @return the first matching record, or {@code null} when nothing matches
 */
public ManagementServerHostVO findByMsid(long msid) {
    SearchCriteria<ManagementServerHostVO> criteria = MsIdSearch.create();
    criteria.setParameters("msid", msid);

    List<ManagementServerHostVO> matches = listIncludingRemovedBy(criteria);
    if (matches == null || matches.isEmpty()) {
        return null;
    }
    return matches.get(0);
}
public void update(long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
// Can't get removed column to work with null value, switch to raw JDBC that I can control
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
@ -75,9 +147,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
txn.rollback();
}
}
public void update(long id, Date lastUpdate) {
// Can't get removed column to work with null value, switch to raw JDBC that I can control
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
@ -93,15 +164,15 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
s_logger.warn("Unexpected exception, ", e);
txn.rollback();
}
}
public List<ManagementServerHostVO> getActiveList(Date cutTime) {
}
/**
 * Runs the ActiveSearch query with the given cut time in whatever transaction context is
 * current for the calling thread.
 *
 * @param cutTime value bound to the "lastUpdateTime" search parameter
 * @return the records returned by {@code listIncludingRemovedBy}
 */
public List<ManagementServerHostVO> getActiveList(Date cutTime) {
    SearchCriteria<ManagementServerHostVO> criteria = ActiveSearch.create();
    criteria.setParameters("lastUpdateTime", cutTime);

    List<ManagementServerHostVO> activeHosts = listIncludingRemovedBy(criteria);
    return activeHosts;
}
public List<ManagementServerHostVO> getInactiveList(Date cutTime) {
SearchCriteria<ManagementServerHostVO> sc = InactiveSearch.create();
sc.setParameters("lastUpdateTime", cutTime);

View File

@ -73,6 +73,7 @@ public class Transaction {
public static final short CLOUD_DB = 0;
public static final short USAGE_DB = 1;
public static final short CONNECTED_DB = -1;
private final LinkedList<StackElement> _stack;
@ -85,6 +86,8 @@ public class Transaction {
private long _txnTime;
private Statement _stmt;
private final Merovingian _lockMaster;
private Transaction _prev = null;
public static Transaction currentTxn() {
Transaction txn = tls.get();
@ -101,6 +104,22 @@ public class Transaction {
}
return open(name, databaseId, true);
}
/**
 * Opens a transaction context bound to a caller-supplied connection.
 *
 * Use sparingly: a new context is always created, whether or not the calling thread already
 * has one. It exists for the special cases where the caller must control the DB connection
 * explicitly while still using the existing DAO features. The context that was current
 * before the call is remembered in {@code _prev}.
 *
 * NOTE(review): the null check is an {@code assert}, so it only fires when assertions are
 * enabled (-ea). The connection cleanup code skips {@code close()} when {@code _dbId} is
 * CONNECTED_DB, so the caller presumably keeps ownership of {@code conn} -- confirm.
 *
 * @param name name given to the new transaction
 * @param conn connection the new transaction operates on; must not be null
 * @return the new transaction, already installed as the calling thread's current one
 */
public static Transaction openNew(final String name, Connection conn) {
    assert(conn != null);

    Transaction standalone = new Transaction(name, false, CONNECTED_DB);
    standalone._conn = conn;
    // Remember the outer context before replacing it in the thread-local slot.
    standalone._prev = tls.get();
    tls.set(standalone);
    standalone.takeOver(name, true);
    return standalone;
}
public static Transaction open(final String name) {
return open(name, CLOUD_DB, false);
@ -529,6 +548,9 @@ public class Transaction {
s_logger.trace("Transaction is done");
cleanup();
}
tls.set(_prev);
_prev = null;
}
/**
@ -625,7 +647,9 @@ public class Transaction {
try {
s_logger.trace("conn: Closing DB connection");
_conn.close();
if(this._dbId != CONNECTED_DB)
_conn.close();
_conn = null;
} catch (final SQLException e) {
s_logger.warn("Unable to close connection", e);