diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java
index d573fa2fb20..9bab3dae098 100644
--- a/server/src/com/cloud/cluster/ClusterManagerImpl.java
+++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java
@@ -5,6 +5,8 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.rmi.RemoteException;
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Enumeration;
@@ -40,6 +42,7 @@ import com.cloud.host.dao.HostDao;
 import com.cloud.serializer.GsonHelper;
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.Pair;
 import com.cloud.utils.Profiler;
 import com.cloud.utils.PropertiesUtil;
 import com.cloud.utils.component.Adapters;
@@ -71,11 +74,12 @@ public class ClusterManagerImpl implements ClusterManager {
 
     private AgentManager _agentMgr;
 
-    private final ScheduledExecutorService _heartbeatScheduler =
-        Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
-    private final ScheduledExecutorService _peerScanScheduler =
-        Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-PeerScan"));
+    private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
+    private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification"));
+    private List<Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>>> _notificationMsgs = new ArrayList<Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>>>();
+    private Connection _heartbeatConnection = null;
+
     private final ExecutorService _executor;
 
     private ClusterServiceAdapter _currentServiceAdapter;
@@ -539,50 +543,140 @@ public class ClusterManagerImpl implements ClusterManager {
                     s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId);
                 }
 
-                _mshostDao.update(_mshostId, DateUtil.currentGMTTime());
+                Connection conn = getHeartbeatConnection();
+                _mshostDao.update(conn, _mshostId, DateUtil.currentGMTTime());
+
+                if(s_logger.isTraceEnabled()) {
+                    s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
+                }
+
+                if(!_peerScanInited) {
+                    _peerScanInited = true;
+                    initPeerScan(conn);
+                }
+
+                peerScan(conn);
+            } catch(CloudRuntimeException e) {
+                // TODO: if the exception is caused by loss of the DB connection,
+                // start fencing and shut down the management server
+                s_logger.error("Runtime DB exception ", e.getCause());
+
+                invalidHeartbeatConnection();
            } catch (Throwable e) {
                 s_logger.error("Problem with the cluster heartbeat!", e);
             }
         }
     };
 }
-
-    private Runnable getPeerScanTask() {
+
+    private Connection getHeartbeatConnection() {
+        if(_heartbeatConnection != null) {
+            return _heartbeatConnection;
+        }
+
+        _heartbeatConnection = Transaction.getStandaloneUsageConnection();
+        return _heartbeatConnection;
+    }
+
+    private void invalidHeartbeatConnection() {
+        if(_heartbeatConnection != null) {
+            try {
+                _heartbeatConnection.close();
+            } catch (SQLException e) {
+                s_logger.warn("Unable to close heartbeat DB connection. ", e);
+            }
+
+            _heartbeatConnection = null;
+        }
+    }
+
+    private Runnable getNotificationTask() {
         return new Runnable() {
             @Override
             public void run() {
-                try {
-                    if(s_logger.isTraceEnabled()) {
-                        s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
-                    }
-
-                    if(!_peerScanInited) {
-                        _peerScanInited = true;
-                        initPeerScan();
-                    }
-
-                    peerScan();
-                } catch (Throwable e) {
-                    s_logger.error("Problem with the cluster peer-scan!", e);
-                }
+                while(true) {
+                    synchronized(_notificationMsgs) {
+                        try {
+                            _notificationMsgs.wait(1000);
+                        } catch (InterruptedException e) {
+                        }
+                    }
+
+                    Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> msgPair = null;
+                    while((msgPair = getNextNotificationMessage()) != null) {
+                        try {
+                            if(msgPair.first() != null && msgPair.first().size() > 0) {
+                                Profiler profiler = new Profiler();
+                                profiler.start();
+
+                                notifyNodeJoined(msgPair.first());
+
+                                profiler.stop();
+                                if(profiler.getDuration() > 1000) {
+                                    if(s_logger.isDebugEnabled()) {
+                                        s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
+                                    }
+                                } else {
+                                    s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
+                                }
+                            }
+
+                            if(msgPair.second() != null && msgPair.second().size() > 0) {
+                                Profiler profiler = new Profiler();
+                                profiler.start();
+
+                                notifyNodeLeft(msgPair.second());
+
+                                profiler.stop();
+                                if(profiler.getDuration() > 1000) {
+                                    if(s_logger.isDebugEnabled()) {
+                                        s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
+                                    }
+                                } else {
+                                    s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
+                                }
+                            }
+                        } catch (Throwable e) {
+                            s_logger.warn("Unexpected exception during cluster notification. ", e);
+                        }
+                    }
+
+                    try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) {}
+                }
             }
         };
     }
 
-    private void initPeerScan() {
+    private void queueNotification(List<ManagementServerHostVO> addedNodeList, List<ManagementServerHostVO> removedNodeList) {
+        synchronized(this._notificationMsgs) {
+            this._notificationMsgs.add(new Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>>(addedNodeList, removedNodeList));
+            this._notificationMsgs.notifyAll();
+        }
+    }
+
+    private Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> getNextNotificationMessage() {
+        synchronized(this._notificationMsgs) {
+            if(this._notificationMsgs.size() > 0)
+                return this._notificationMsgs.remove(0);
+        }
+
+        return null;
+    }
+
+    private void initPeerScan(Connection conn) {
         // upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform
         // missed cleanup
         Date cutTime = DateUtil.currentGMTTime();
-        List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - heartbeatThreshold));
+        List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold));
         if(inactiveList.size() > 0) {
             notifyNodeLeft(inactiveList);
         }
     }
 
-    private void peerScan() {
+    private void peerScan(Connection conn) {
         Date cutTime = DateUtil.currentGMTTime();
 
-        List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold));
+        List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold));
 
         List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
         if(_mshostId != null) {
@@ -602,7 +696,7 @@ public class ClusterManagerImpl implements ClusterManager {
         Iterator<ManagementServerHostVO> it = removedNodeList.iterator();
         while(it.hasNext()) {
             ManagementServerHostVO mshost = it.next();
-            if(!pingManagementNode(mshost.getMsid())) {
+            if(!pingManagementNode(mshost)) {
                 s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable");
                 activePeers.remove(mshost.getId());
@@ -635,37 +729,8 @@
             }
         }
 
-        if(newNodeList.size() > 0) {
-            Profiler profiler = new Profiler();
-            profiler.start();
-
-            notifyNodeJoined(newNodeList);
-
-            profiler.stop();
-            if(profiler.getDuration() > 1000) {
-                if(s_logger.isDebugEnabled()) {
-                    s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
-                }
-            } else {
-                s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
-            }
-        }
-
-        if(removedNodeList.size() > 0) {
-            Profiler profiler = new Profiler();
-            profiler.start();
-
-            notifyNodeLeft(removedNodeList);
-
-            profiler.stop();
-            if(profiler.getDuration() > 1000) {
-                if(s_logger.isDebugEnabled()) {
-                    s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
-                }
-            } else {
-                s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
-            }
-        }
+        if(newNodeList.size() > 0 || removedNodeList.size() > 0)
+            this.queueNotification(newNodeList, removedNodeList);
     }
 
     private static boolean isIdInList(Long id, List<ManagementServerHostVO> l) {
@@ -731,9 +796,7 @@
             // use a separate thread for heartbeat updates
             _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval,
                     heartbeatInterval, TimeUnit.MILLISECONDS);
-            _peerScanScheduler.scheduleAtFixedRate(getPeerScanTask(), heartbeatInterval,
-                    heartbeatInterval, TimeUnit.MILLISECONDS);
-
+            _notificationExecutor.submit(getNotificationTask());
         } catch (Throwable e) {
             s_logger.error("Unexpected exception : ", e);
             txn.rollback();
@@ -882,6 +945,11 @@
         if(mshost == null) {
             return false;
         }
+
+        return pingManagementNode(mshost);
+    }
+
+    private boolean pingManagementNode(ManagementServerHostVO mshost) {
         String targetIp = mshost.getServiceIP();
         if("127.0.0.1".equals(targetIp) || "0.0.0.0".equals(targetIp)) {
@@ -889,7 +957,7 @@
             return false;
         }
 
-        String targetPeer = String.valueOf(msid);
+        String targetPeer = String.valueOf(mshost.getMsid());
         ClusterService peerService = null;
         for(int i = 0; i < 2; i++) {
             try {
@@ -906,13 +974,14 @@
                     invalidatePeerService(targetPeer);
                 }
             } else {
-                s_logger.warn("Remote peer " + msid + " no longer exists");
+                s_logger.warn("Remote peer " + mshost.getMsid() + " no longer exists");
             }
         }
 
         return false;
     }
 
+    @Override
     public int getHeartbeatThreshold() {
         return this.heartbeatThreshold;
diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java
index 20163a8e755..3c952ecd33d 100644
--- a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java
+++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java
@@ -18,6 +18,7 @@ package com.cloud.cluster.dao;
 
+import java.sql.Connection;
 import java.util.Date;
 import java.util.List;
 
@@ -26,10 +27,15 @@ import com.cloud.utils.db.GenericDao;
 
 public interface ManagementServerHostDao extends GenericDao<ManagementServerHostVO, Long> {
     ManagementServerHostVO findByMsid(long msid);
 
+    void increaseAlertCount(long id);
+
     void update(long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate);
     void update(long id, Date lastUpdate);
-    List<ManagementServerHostVO> getActiveList(Date cutTime);
     List<ManagementServerHostVO> getInactiveList(Date cutTime);
-
-    void increaseAlertCount(long id);
+
+    void update(Connection conn, long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate);
+    void update(Connection conn, long id, Date lastUpdate);
+    List<ManagementServerHostVO> getActiveList(Connection conn, Date cutTime);
+    List<ManagementServerHostVO> getInactiveList(Connection conn, Date cutTime);
 }
diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java
index ecf629baf3a..0933f2d1789 100644
--- a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java
+++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java
@@ -18,7 +18,9 @@ package com.cloud.cluster.dao;
 
+import java.sql.Connection;
 import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.Date;
 import java.util.List;
 import java.util.TimeZone;
@@ -33,6 +35,7 @@ import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
 
 @Local(value={ManagementServerHostDao.class})
 public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServerHostVO, Long> implements ManagementServerHostDao {
@@ -41,20 +44,89 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServerHostVO, Long> implements ManagementServerHostDao {
     private final SearchBuilder<ManagementServerHostVO> MsIdSearch;
     private final SearchBuilder<ManagementServerHostVO> ActiveSearch;
     private final SearchBuilder<ManagementServerHostVO> InactiveSearch;
-
-    public ManagementServerHostVO findByMsid(long msid) {
-        SearchCriteria<ManagementServerHostVO> sc = MsIdSearch.create();
-        sc.setParameters("msid", msid);
-
-        List<ManagementServerHostVO> l = listIncludingRemovedBy(sc);
-        if(l != null && l.size() > 0)
-            return l.get(0);
-
-        return null;
-    }
-
+
+    public void update(Connection conn, long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = conn.prepareStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0 where id=?");
+            pstmt.setString(1, name);
+            pstmt.setString(2, version);
+            pstmt.setString(3, serviceIP);
+            pstmt.setInt(4, servicePort);
+            pstmt.setString(5, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
+            pstmt.setLong(6, id);
+
+            pstmt.executeUpdate();
+        } catch (SQLException e) {
+            throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e);
+        } finally {
+            if(pstmt != null) {
+                try {
+                    pstmt.close();
+                } catch(Exception e) {
+                    s_logger.warn("Unable to close prepared statement due to exception ", e);
+                }
+            }
+        }
+    }
+
+    public void update(Connection conn, long id, Date lastUpdate) {
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = conn.prepareStatement("update mshost set last_update=?, removed=null, alert_count=0 where id=?");
+            pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
+            pstmt.setLong(2, id);
+
+            pstmt.executeUpdate();
+        } catch (SQLException e) {
+            throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e);
+        } finally {
+            if(pstmt != null) {
+                try {
+                    pstmt.close();
+                } catch(Exception e) {
+                    s_logger.warn("Unable to close prepared statement due to exception ", e);
+                }
+            }
+        }
+    }
+
+    public List<ManagementServerHostVO> getActiveList(Connection conn, Date cutTime) {
+        Transaction txn = Transaction.openNew("getActiveList", conn);
+        try {
+            SearchCriteria<ManagementServerHostVO> sc = ActiveSearch.create();
+            sc.setParameters("lastUpdateTime", cutTime);
+
+            return listIncludingRemovedBy(sc);
+        } finally {
+            txn.close();
+        }
+    }
+
+    public List<ManagementServerHostVO> getInactiveList(Connection conn, Date cutTime) {
+        Transaction txn = Transaction.openNew("getInactiveList", conn);
+        try {
+            SearchCriteria<ManagementServerHostVO> sc = InactiveSearch.create();
+            sc.setParameters("lastUpdateTime", cutTime);
+
+            return listIncludingRemovedBy(sc);
+        } finally {
+            txn.close();
+        }
+    }
+
+    public ManagementServerHostVO findByMsid(long msid) {
+        SearchCriteria<ManagementServerHostVO> sc = MsIdSearch.create();
+        sc.setParameters("msid", msid);
+
+        List<ManagementServerHostVO> l = listIncludingRemovedBy(sc);
+        if(l != null && l.size() > 0)
+            return l.get(0);
+
+        return null;
+    }
+
     public void update(long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
-        // Can't get removed column to work with null value, switch to raw JDBC that I can control
         Transaction txn = Transaction.currentTxn();
         PreparedStatement pstmt = null;
         try {
@@ -75,9 +147,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServerHostVO, Long> implements ManagementServerHostDao {
-    public List<ManagementServerHostVO> getActiveList(Date cutTime) {
+    }
+
+    public List<ManagementServerHostVO> getActiveList(Date cutTime) {
         SearchCriteria<ManagementServerHostVO> sc = ActiveSearch.create();
         sc.setParameters("lastUpdateTime", cutTime);
 
         return listIncludingRemovedBy(sc);
     }
-
+
     public List<ManagementServerHostVO> getInactiveList(Date cutTime) {
         SearchCriteria<ManagementServerHostVO> sc = InactiveSearch.create();
         sc.setParameters("lastUpdateTime", cutTime);
diff --git a/utils/src/com/cloud/utils/db/Transaction.java b/utils/src/com/cloud/utils/db/Transaction.java
index 20d15918333..73f74ddec1e 100755
--- a/utils/src/com/cloud/utils/db/Transaction.java
+++ b/utils/src/com/cloud/utils/db/Transaction.java
@@ -73,6 +73,7 @@ public class Transaction {
 
     public static final short CLOUD_DB = 0;
     public static final short USAGE_DB = 1;
+    public static final short CONNECTED_DB = -1;
 
     private final LinkedList _stack;
 
@@ -85,6 +86,8 @@ public class Transaction {
     private long _txnTime;
     private Statement _stmt;
     private final Merovingian _lockMaster;
+
+    private Transaction _prev = null;
 
     public static Transaction currentTxn() {
         Transaction txn = tls.get();
@@ -101,6 +104,22 @@ public class Transaction {
         }
         return open(name, databaseId, true);
     }
+
+    //
+    // Usage of this transaction setup should be limited; it will always open a new transaction context regardless of whether
+    // there is another transaction context on the stack. It is meant for special cases where we want to control the DB
+    // connection explicitly while still using the existing DAO features.
+    //
+    public static Transaction openNew(final String name, Connection conn) {
+        assert(conn != null);
+        Transaction txn = new Transaction(name, false, CONNECTED_DB);
+        txn._conn = conn;
+        txn._prev = tls.get();
+        tls.set(txn);
+
+        txn.takeOver(name, true);
+        return txn;
+    }
 
     public static Transaction open(final String name) {
         return open(name, CLOUD_DB, false);
@@ -529,6 +548,9 @@ public class Transaction {
             s_logger.trace("Transaction is done");
             cleanup();
         }
+
+        tls.set(_prev);
+        _prev = null;
     }
 
     /**
@@ -625,7 +647,9 @@ public class Transaction {
         try {
             s_logger.trace("conn: Closing DB connection");
 
-            _conn.close();
+            if(this._dbId != CONNECTED_DB)
+                _conn.close();
+
             _conn = null;
         } catch (final SQLException e) {
             s_logger.warn("Unable to close connection", e);
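
Reviewer note (illustration only, not part of the patch): the sketch below shows how the pieces introduced above appear intended to fit together. The heartbeat path keeps one standalone JDBC connection, passes it to the new Connection-taking DAO methods (getActiveList/getInactiveList wrap that connection with Transaction.openNew(name, conn) so the existing DAO machinery still works), and treats a CloudRuntimeException from the DAO as a sign that the connection is broken and must be discarded so the next heartbeat opens a fresh one. Class and method names from the patch are used as-is; the HeartbeatSketch harness and its fields are hypothetical and exist only for illustration.

    import java.sql.Connection;
    import java.sql.SQLException;

    import com.cloud.cluster.dao.ManagementServerHostDao;
    import com.cloud.utils.DateUtil;
    import com.cloud.utils.db.Transaction;
    import com.cloud.utils.exception.CloudRuntimeException;

    // Hypothetical harness; mirrors the heartbeat flow in ClusterManagerImpl above.
    public class HeartbeatSketch {
        private final ManagementServerHostDao mshostDao; // assumed injected
        private final long mshostId;                     // this node's mshost row id
        private Connection heartbeatConnection;          // reused across heartbeats

        public HeartbeatSketch(ManagementServerHostDao mshostDao, long mshostId) {
            this.mshostDao = mshostDao;
            this.mshostId = mshostId;
        }

        public void heartbeatOnce() {
            try {
                if (heartbeatConnection == null) {
                    // One standalone connection, reused instead of going through the pool on every beat.
                    heartbeatConnection = Transaction.getStandaloneUsageConnection();
                }
                // Connection-taking DAO methods run on exactly this connection;
                // the list queries additionally wrap it via Transaction.openNew(name, conn).
                mshostDao.update(heartbeatConnection, mshostId, DateUtil.currentGMTTime());
            } catch (CloudRuntimeException e) {
                // DB failure on the heartbeat connection: drop it so the next beat opens a fresh one.
                invalidateHeartbeatConnection();
            }
        }

        private void invalidateHeartbeatConnection() {
            if (heartbeatConnection != null) {
                try {
                    heartbeatConnection.close();
                } catch (SQLException e) {
                    // connection is being discarded anyway
                }
                heartbeatConnection = null;
            }
        }
    }

The apparent intent of this arrangement is to keep the heartbeat's DB access off the shared per-thread transaction path, so a stall or pool exhaustion elsewhere cannot delay the mshost timestamp update and trigger false node-death detection; that reading is an inference from the patch, not a statement from its author.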