diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index f68366dda0e..a45848eb660 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -94,6 +94,7 @@ public class ClusterManagerImpl implements ClusterManager { private final Map asyncCalls; private final Gson gson; + @Inject private AgentManager _agentMgr; @Inject private ClusteredAgentRebalanceService _rebalanceService; @@ -563,13 +564,14 @@ public class ClusterManagerImpl implements ClusterManager { return new Runnable() { @Override public void run() { + Transaction txn = Transaction.open("ClusterHeartBeat"); try { + txn.transitToUserManagedConnection(getHeartbeatConnection()); if(s_logger.isTraceEnabled()) { s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId); } - Connection conn = getHeartbeatConnection(); - _mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime()); + _mshostDao.update(_mshostId, getCurrentRunId(), DateUtil.currentGMTTime()); if (s_logger.isTraceEnabled()) { s_logger.trace("Cluster manager peer-scan, id:" + _mshostId); @@ -577,10 +579,10 @@ public class ClusterManagerImpl implements ClusterManager { if (!_peerScanInited) { _peerScanInited = true; - initPeerScan(conn); + initPeerScan(); } - peerScan(conn); + peerScan(); } catch(CloudRuntimeException e) { s_logger.error("Runtime DB exception ", e.getCause()); @@ -602,6 +604,8 @@ public class ClusterManagerImpl implements ClusterManager { } s_logger.error("Problem with the cluster heartbeat!", e); + } finally { + txn.close("ClusterHeartBeat"); } } }; @@ -706,7 +710,7 @@ public class ClusterManagerImpl implements ClusterManager { } } - try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) {} + try { Thread.sleep(1000); } catch (InterruptedException e) {} } } }; @@ -729,20 +733,20 @@ public class ClusterManagerImpl implements ClusterManager { return null; } - private void initPeerScan(Connection conn) { + private void initPeerScan() { // upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform // missed cleanup Date cutTime = DateUtil.currentGMTTime(); - List inactiveList = _mshostDao.getInactiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold)); + List inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - heartbeatThreshold)); if(inactiveList.size() > 0) { this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, inactiveList)); } } - private void peerScan(Connection conn) { + private void peerScan() { Date cutTime = DateUtil.currentGMTTime(); - List currentList = _mshostDao.getActiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold)); + List currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold)); List removedNodeList = new ArrayList(); List invalidatedNodeList = new ArrayList(); @@ -801,7 +805,7 @@ public class ClusterManagerImpl implements ClusterManager { if(!pingManagementNode(mshost)) { s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable"); activePeers.remove(mshost.getId()); - _mshostDao.invalidateRunSession(conn, mshost.getId(), mshost.getRunid()); + _mshostDao.invalidateRunSession(mshost.getId(), mshost.getRunid()); try { JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId()); } catch(Exception e) { diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java index 01bfac6a487..679aa695fa0 100644 --- a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java +++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java @@ -18,14 +18,12 @@ package com.cloud.cluster.dao; -import java.sql.Connection; import java.util.Date; import java.util.List; -import com.cloud.cluster.ManagementServerHost.State; import com.cloud.cluster.ManagementServerHost; +import com.cloud.cluster.ManagementServerHost.State; import com.cloud.cluster.ManagementServerHostVO; -import com.cloud.host.Status; import com.cloud.utils.db.GenericDao; public interface ManagementServerHostDao extends GenericDao { @@ -40,13 +38,9 @@ public interface ManagementServerHostDao extends GenericDao getActiveList(Date cutTime); List getInactiveList(Date cutTime); - void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate); - void update(Connection conn, long id, long runid, Date lastUpdate); - void invalidateRunSession(Connection conn, long id, long runid); - List getActiveList(Connection conn, Date cutTime); - List getInactiveList(Connection conn, Date cutTime); + void invalidateRunSession(long id, long runid); - void update(Connection conn, long id, long runId, State state, Date lastUpdate); + void update(long id, long runId, State state, Date lastUpdate); List listBy(ManagementServerHost.State...states); } diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java index bb68e53939b..845fa4253e9 100644 --- a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java +++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java @@ -18,7 +18,6 @@ package com.cloud.cluster.dao; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Date; @@ -51,110 +50,20 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase StateSearch; @Override - public void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate) { + public void invalidateRunSession(long id, long runid) { + Transaction txn = Transaction.currentTxn(); PreparedStatement pstmt = null; try { - pstmt = conn.prepareStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0, runid=? where id=?"); - pstmt.setString(1, name); - pstmt.setString(2, version); - pstmt.setString(3, serviceIP); - pstmt.setInt(4, servicePort); - pstmt.setString(5, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate)); - pstmt.setLong(6, runid); - pstmt.setLong(7, id); - - pstmt.executeUpdate(); - conn.commit(); - } catch(SQLException e ) { - throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e); - } finally { - if(pstmt != null) { - try { - pstmt.close(); - } catch(Exception e) { - s_logger.warn("Unable to close prepared statement due to exception ", e); - } - } - } - } - - @Override - public void update(Connection conn, long id, long runid, Date lastUpdate) { - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement("update mshost set last_update=?, removed=null, alert_count=0 where id=? and runid=?"); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate)); - pstmt.setLong(2, id); - pstmt.setLong(3, runid); - - int count = pstmt.executeUpdate(); - conn.commit(); - - if(count < 1) { - throw new CloudRuntimeException("Invalid cluster session detected", new ClusterInvalidSessionException("runid " + runid + " is no longer valid")); - } - } catch (SQLException e) { - throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e); - } finally { - if(pstmt != null) { - try { - pstmt.close(); - } catch(Exception e) { - s_logger.warn("Unable to close prepared statement due to exception ", e); - } - } - } - } - - @Override - public void invalidateRunSession(Connection conn, long id, long runid) { - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement("update mshost set runid=0, state='Down' where id=? and runid=?"); + pstmt = txn.prepareAutoCloseStatement("update mshost set runid=0, state='Down' where id=? and runid=?"); pstmt.setLong(1, id); pstmt.setLong(2, runid); pstmt.executeUpdate(); - conn.commit(); } catch (SQLException e) { throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e); - } finally { - if(pstmt != null) { - try { - pstmt.close(); - } catch(Exception e) { - s_logger.warn("Unable to close prepared statement due to exception ", e); - } - } } } - @Override - public List getActiveList(Connection conn, Date cutTime) { - Transaction txn = Transaction.openNew("getActiveList", conn); - try { - SearchCriteria sc = ActiveSearch.create(); - sc.setParameters("lastUpdateTime", cutTime); - - return listIncludingRemovedBy(sc); - } finally { - txn.close("getActiveList"); - } - } - - @Override - public List getInactiveList(Connection conn, Date cutTime) { - Transaction txn = Transaction.openNew("getInactiveList", conn); - try { - SearchCriteria sc = InactiveSearch.create(); - sc.setParameters("lastUpdateTime", cutTime); - - return listIncludingRemovedBy(sc); - } finally { - txn.close("getInactiveList"); - } - } - @Override public ManagementServerHostVO findByMsid(long msid) { SearchCriteria sc = MsIdSearch.create(); @@ -190,7 +99,6 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase implements Gene return createForUpdate(null); } - @Override + @Override @DB(txn=false) public K getNextInSequence(final Class clazz, final String name) { final TableGenerator tg = _tgs.get(name); assert (tg != null) : "Couldn't find Table generator using " + name; @@ -377,7 +377,7 @@ public abstract class GenericDaoBase implements Gene } } - @Override @SuppressWarnings("unchecked") @DB + @Override @SuppressWarnings("unchecked") public List customSearchIncludingRemoved(SearchCriteria sc, final Filter filter) { String clause = sc != null ? sc.getWhereClause() : null; if (clause != null && clause.length() == 0) { @@ -733,7 +733,6 @@ public abstract class GenericDaoBase implements Gene return rowsUpdated; } - // @Override public int update(UpdateBuilder ub, final SearchCriteria sc, Integer rows) { StringBuilder sql = null; PreparedStatement pstmt = null; diff --git a/utils/src/com/cloud/utils/db/Transaction.java b/utils/src/com/cloud/utils/db/Transaction.java index c2a3d6d4806..c4fdd50fa2e 100755 --- a/utils/src/com/cloud/utils/db/Transaction.java +++ b/utils/src/com/cloud/utils/db/Transaction.java @@ -97,7 +97,7 @@ public class Transaction { private String _name; private Connection _conn; private boolean _txn; - private final short _dbId; + private short _dbId; private long _txnTime; private Statement _stmt; private String _creator; @@ -125,17 +125,16 @@ public class Transaction { // transaction context in the stack. It is used in special use cases that we want to control DB connection explicitly and in the mean time utilize // the existing DAO features // - public static Transaction openNew(final String name, Connection conn) { - assert(conn != null); - Transaction txn = new Transaction(name, false, CONNECTED_DB); - txn._conn = conn; - txn._prev = tls.get(); - tls.set(txn); - - txn.takeOver(name, true); - s_logger.debug("Registering txn" + txn.getId()); - s_mbean.addTransaction(txn); - return txn; + public void transitToUserManagedConnection(Connection conn) { + assert(_conn == null && _stack.size() <= 1) : "Can't change to a user managed connection unless the stack is empty and the db connection is null: " + toString(); + _conn = conn; + _dbId = CONNECTED_DB; + } + + public void transitToAutoManagedConnection(short dbId) { + assert(_stack.size() == 0) : "Can't change to auto managed connection unless your stack is empty"; + _dbId = dbId; + _conn = null; } public static Transaction open(final String name) { @@ -166,7 +165,6 @@ public class Transaction { txn.takeOver(name, false); if (isNew) { - s_logger.debug("Registering txn" + txn.getId()); s_mbean.addTransaction(txn); } return txn; @@ -611,7 +609,6 @@ public class Transaction { if(this._dbId == CONNECTED_DB) { tls.set(_prev); _prev = null; - s_logger.debug("Unregistering txn" + getId()); s_mbean.removeTransaction(this); } } diff --git a/utils/src/com/cloud/utils/db/TransactionMBean.java b/utils/src/com/cloud/utils/db/TransactionMBean.java index 25925dbe1ca..f36c46bfb2d 100644 --- a/utils/src/com/cloud/utils/db/TransactionMBean.java +++ b/utils/src/com/cloud/utils/db/TransactionMBean.java @@ -18,14 +18,17 @@ package com.cloud.utils.db; import java.util.List; +import java.util.Map; public interface TransactionMBean { int getTransactionCount(); - int getActiveTransactionCount(); + int[] getActiveTransactionCount(); - List getTransactions(); + List> getTransactions(); - List getActiveTransactions(); + List> getActiveTransactions(); + + List> getTransactionsWithDatabaseConnection(); } diff --git a/utils/src/com/cloud/utils/db/TransactionMBeanImpl.java b/utils/src/com/cloud/utils/db/TransactionMBeanImpl.java index fdff868dbc6..9aca7b99176 100644 --- a/utils/src/com/cloud/utils/db/TransactionMBeanImpl.java +++ b/utils/src/com/cloud/utils/db/TransactionMBeanImpl.java @@ -17,7 +17,9 @@ */ package com.cloud.utils.db; +import java.sql.Connection; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -48,49 +50,65 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB } @Override - public int getActiveTransactionCount() { - int count = 0; + public int[] getActiveTransactionCount() { + int[] count = new int[2]; + count[0] = 0; + count[1] = 0; for (Transaction txn : _txns.values()) { if (txn.getStack().size() > 0) { - count++; + count[0]++; + } + if (txn.getCurrentConnection() != null) { + count[1]++; } } return count; } @Override - public List getTransactions() { - ArrayList txns = new ArrayList(); + public List> getTransactions() { + ArrayList> txns = new ArrayList>(); for (Transaction info : _txns.values()) { - txns.add(toString(info)); + txns.add(toMap(info)); } return txns; } @Override - public List getActiveTransactions() { - ArrayList txns = new ArrayList(); + public List> getActiveTransactions() { + ArrayList> txns = new ArrayList>(); for (Transaction txn : _txns.values()) { if (txn.getStack().size() > 0 || txn.getCurrentConnection() != null) { - txns.add(toString(txn)); + txns.add(toMap(txn)); } } return txns; } - protected String toString(Transaction txn) { - StringBuilder buff = new StringBuilder("[Name="); - buff.append(txn.getName()); - buff.append("; Creator="); - buff.append(txn.getCreator()); - buff.append("; DB="); - buff.append(txn.getCurrentConnection()); - buff.append("; Stack="); + protected Map toMap(Transaction txn) { + Map map = new HashMap(); + map.put("name", txn.getName()); + map.put("id", Long.toString(txn.getId())); + map.put("creator", txn.getCreator()); + Connection conn = txn.getCurrentConnection(); + map.put("db", conn != null ? Integer.toString(System.identityHashCode(conn)) : "none"); + StringBuilder buff = new StringBuilder(); for (StackElement element : txn.getStack()) { - buff.append(",").append(element.toString()); + buff.append(element.toString()).append(","); } - buff.append("]"); + map.put("stack", buff.toString()); - return buff.toString(); + return map; + } + + @Override + public List> getTransactionsWithDatabaseConnection() { + ArrayList> txns = new ArrayList>(); + for (Transaction txn : _txns.values()) { + if (txn.getCurrentConnection() != null) { + txns.add(toMap(txn)); + } + } + return txns; } } \ No newline at end of file