From cfc25d01be1fc56dd9e8f0c0f722f96cfecd21e8 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Fri, 1 Jul 2011 11:02:56 -0700 Subject: [PATCH] bug 10501: This is really Kelven's bug but I'll fix it for him anyways. DAO code already have a way to extract the DB connection from a transaction that is stored in the TLS. There's no real reason for the DAO code to add special semantics to use a different DB connection. That can be done by simply switching the transaction before it even reached the dao code. Think about it. Why would anyone want to call one dao function, switch transaction, and then switch back. The right thing is for the caller to switch transaction, call a series of dao codes, and switch it back. That's the semantics I changed to. By doing this, it also eliminates the number of debug messages in this bug. --- .../com/cloud/cluster/ClusterManagerImpl.java | 24 ++-- .../cluster/dao/ManagementServerHostDao.java | 12 +- .../dao/ManagementServerHostDaoImpl.java | 114 +----------------- utils/src/com/cloud/utils/db/DB.java | 4 +- .../com/cloud/utils/db/GenericDaoBase.java | 5 +- utils/src/com/cloud/utils/db/Transaction.java | 25 ++-- .../com/cloud/utils/db/TransactionMBean.java | 9 +- .../cloud/utils/db/TransactionMBeanImpl.java | 58 ++++++--- 8 files changed, 82 insertions(+), 169 deletions(-) diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index f68366dda0e..a45848eb660 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -94,6 +94,7 @@ public class ClusterManagerImpl implements ClusterManager { private final Map asyncCalls; private final Gson gson; + @Inject private AgentManager _agentMgr; @Inject private ClusteredAgentRebalanceService _rebalanceService; @@ -563,13 +564,14 @@ public class ClusterManagerImpl implements ClusterManager { return new Runnable() { @Override public void run() { + Transaction txn = Transaction.open("ClusterHeartBeat"); try { + txn.transitToUserManagedConnection(getHeartbeatConnection()); if(s_logger.isTraceEnabled()) { s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId); } - Connection conn = getHeartbeatConnection(); - _mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime()); + _mshostDao.update(_mshostId, getCurrentRunId(), DateUtil.currentGMTTime()); if (s_logger.isTraceEnabled()) { s_logger.trace("Cluster manager peer-scan, id:" + _mshostId); @@ -577,10 +579,10 @@ public class ClusterManagerImpl implements ClusterManager { if (!_peerScanInited) { _peerScanInited = true; - initPeerScan(conn); + initPeerScan(); } - peerScan(conn); + peerScan(); } catch(CloudRuntimeException e) { s_logger.error("Runtime DB exception ", e.getCause()); @@ -602,6 +604,8 @@ public class ClusterManagerImpl implements ClusterManager { } s_logger.error("Problem with the cluster heartbeat!", e); + } finally { + txn.close("ClusterHeartBeat"); } } }; @@ -706,7 +710,7 @@ public class ClusterManagerImpl implements ClusterManager { } } - try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) {} + try { Thread.sleep(1000); } catch (InterruptedException e) {} } } }; @@ -729,20 +733,20 @@ public class ClusterManagerImpl implements ClusterManager { return null; } - private void initPeerScan(Connection conn) { + private void initPeerScan() { // upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform // missed cleanup Date cutTime = DateUtil.currentGMTTime(); - List inactiveList = _mshostDao.getInactiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold)); + List inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - heartbeatThreshold)); if(inactiveList.size() > 0) { this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, inactiveList)); } } - private void peerScan(Connection conn) { + private void peerScan() { Date cutTime = DateUtil.currentGMTTime(); - List currentList = _mshostDao.getActiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold)); + List currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold)); List removedNodeList = new ArrayList(); List invalidatedNodeList = new ArrayList(); @@ -801,7 +805,7 @@ public class ClusterManagerImpl implements ClusterManager { if(!pingManagementNode(mshost)) { s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable"); activePeers.remove(mshost.getId()); - _mshostDao.invalidateRunSession(conn, mshost.getId(), mshost.getRunid()); + _mshostDao.invalidateRunSession(mshost.getId(), mshost.getRunid()); try { JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId()); } catch(Exception e) { diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java index 01bfac6a487..679aa695fa0 100644 --- a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java +++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java @@ -18,14 +18,12 @@ package com.cloud.cluster.dao; -import java.sql.Connection; import java.util.Date; import java.util.List; -import com.cloud.cluster.ManagementServerHost.State; import com.cloud.cluster.ManagementServerHost; +import com.cloud.cluster.ManagementServerHost.State; import com.cloud.cluster.ManagementServerHostVO; -import com.cloud.host.Status; import com.cloud.utils.db.GenericDao; public interface ManagementServerHostDao extends GenericDao { @@ -40,13 +38,9 @@ public interface ManagementServerHostDao extends GenericDao getActiveList(Date cutTime); List getInactiveList(Date cutTime); - void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate); - void update(Connection conn, long id, long runid, Date lastUpdate); - void invalidateRunSession(Connection conn, long id, long runid); - List getActiveList(Connection conn, Date cutTime); - List getInactiveList(Connection conn, Date cutTime); + void invalidateRunSession(long id, long runid); - void update(Connection conn, long id, long runId, State state, Date lastUpdate); + void update(long id, long runId, State state, Date lastUpdate); List listBy(ManagementServerHost.State...states); } diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java index bb68e53939b..845fa4253e9 100644 --- a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java +++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java @@ -18,7 +18,6 @@ package com.cloud.cluster.dao; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Date; @@ -51,110 +50,20 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase StateSearch; @Override - public void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate) { + public void invalidateRunSession(long id, long runid) { + Transaction txn = Transaction.currentTxn(); PreparedStatement pstmt = null; try { - pstmt = conn.prepareStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0, runid=? where id=?"); - pstmt.setString(1, name); - pstmt.setString(2, version); - pstmt.setString(3, serviceIP); - pstmt.setInt(4, servicePort); - pstmt.setString(5, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate)); - pstmt.setLong(6, runid); - pstmt.setLong(7, id); - - pstmt.executeUpdate(); - conn.commit(); - } catch(SQLException e ) { - throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e); - } finally { - if(pstmt != null) { - try { - pstmt.close(); - } catch(Exception e) { - s_logger.warn("Unable to close prepared statement due to exception ", e); - } - } - } - } - - @Override - public void update(Connection conn, long id, long runid, Date lastUpdate) { - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement("update mshost set last_update=?, removed=null, alert_count=0 where id=? and runid=?"); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate)); - pstmt.setLong(2, id); - pstmt.setLong(3, runid); - - int count = pstmt.executeUpdate(); - conn.commit(); - - if(count < 1) { - throw new CloudRuntimeException("Invalid cluster session detected", new ClusterInvalidSessionException("runid " + runid + " is no longer valid")); - } - } catch (SQLException e) { - throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e); - } finally { - if(pstmt != null) { - try { - pstmt.close(); - } catch(Exception e) { - s_logger.warn("Unable to close prepared statement due to exception ", e); - } - } - } - } - - @Override - public void invalidateRunSession(Connection conn, long id, long runid) { - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement("update mshost set runid=0, state='Down' where id=? and runid=?"); + pstmt = txn.prepareAutoCloseStatement("update mshost set runid=0, state='Down' where id=? and runid=?"); pstmt.setLong(1, id); pstmt.setLong(2, runid); pstmt.executeUpdate(); - conn.commit(); } catch (SQLException e) { throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e); - } finally { - if(pstmt != null) { - try { - pstmt.close(); - } catch(Exception e) { - s_logger.warn("Unable to close prepared statement due to exception ", e); - } - } } } - @Override - public List getActiveList(Connection conn, Date cutTime) { - Transaction txn = Transaction.openNew("getActiveList", conn); - try { - SearchCriteria sc = ActiveSearch.create(); - sc.setParameters("lastUpdateTime", cutTime); - - return listIncludingRemovedBy(sc); - } finally { - txn.close("getActiveList"); - } - } - - @Override - public List getInactiveList(Connection conn, Date cutTime) { - Transaction txn = Transaction.openNew("getInactiveList", conn); - try { - SearchCriteria sc = InactiveSearch.create(); - sc.setParameters("lastUpdateTime", cutTime); - - return listIncludingRemovedBy(sc); - } finally { - txn.close("getInactiveList"); - } - } - @Override public ManagementServerHostVO findByMsid(long msid) { SearchCriteria sc = MsIdSearch.create(); @@ -190,7 +99,6 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase implements Gene return createForUpdate(null); } - @Override + @Override @DB(txn=false) public K getNextInSequence(final Class clazz, final String name) { final TableGenerator tg = _tgs.get(name); assert (tg != null) : "Couldn't find Table generator using " + name; @@ -377,7 +377,7 @@ public abstract class GenericDaoBase implements Gene } } - @Override @SuppressWarnings("unchecked") @DB + @Override @SuppressWarnings("unchecked") public List customSearchIncludingRemoved(SearchCriteria sc, final Filter filter) { String clause = sc != null ? sc.getWhereClause() : null; if (clause != null && clause.length() == 0) { @@ -733,7 +733,6 @@ public abstract class GenericDaoBase implements Gene return rowsUpdated; } - // @Override public int update(UpdateBuilder ub, final SearchCriteria sc, Integer rows) { StringBuilder sql = null; PreparedStatement pstmt = null; diff --git a/utils/src/com/cloud/utils/db/Transaction.java b/utils/src/com/cloud/utils/db/Transaction.java index c2a3d6d4806..c4fdd50fa2e 100755 --- a/utils/src/com/cloud/utils/db/Transaction.java +++ b/utils/src/com/cloud/utils/db/Transaction.java @@ -97,7 +97,7 @@ public class Transaction { private String _name; private Connection _conn; private boolean _txn; - private final short _dbId; + private short _dbId; private long _txnTime; private Statement _stmt; private String _creator; @@ -125,17 +125,16 @@ public class Transaction { // transaction context in the stack. It is used in special use cases that we want to control DB connection explicitly and in the mean time utilize // the existing DAO features // - public static Transaction openNew(final String name, Connection conn) { - assert(conn != null); - Transaction txn = new Transaction(name, false, CONNECTED_DB); - txn._conn = conn; - txn._prev = tls.get(); - tls.set(txn); - - txn.takeOver(name, true); - s_logger.debug("Registering txn" + txn.getId()); - s_mbean.addTransaction(txn); - return txn; + public void transitToUserManagedConnection(Connection conn) { + assert(_conn == null && _stack.size() <= 1) : "Can't change to a user managed connection unless the stack is empty and the db connection is null: " + toString(); + _conn = conn; + _dbId = CONNECTED_DB; + } + + public void transitToAutoManagedConnection(short dbId) { + assert(_stack.size() == 0) : "Can't change to auto managed connection unless your stack is empty"; + _dbId = dbId; + _conn = null; } public static Transaction open(final String name) { @@ -166,7 +165,6 @@ public class Transaction { txn.takeOver(name, false); if (isNew) { - s_logger.debug("Registering txn" + txn.getId()); s_mbean.addTransaction(txn); } return txn; @@ -611,7 +609,6 @@ public class Transaction { if(this._dbId == CONNECTED_DB) { tls.set(_prev); _prev = null; - s_logger.debug("Unregistering txn" + getId()); s_mbean.removeTransaction(this); } } diff --git a/utils/src/com/cloud/utils/db/TransactionMBean.java b/utils/src/com/cloud/utils/db/TransactionMBean.java index 25925dbe1ca..f36c46bfb2d 100644 --- a/utils/src/com/cloud/utils/db/TransactionMBean.java +++ b/utils/src/com/cloud/utils/db/TransactionMBean.java @@ -18,14 +18,17 @@ package com.cloud.utils.db; import java.util.List; +import java.util.Map; public interface TransactionMBean { int getTransactionCount(); - int getActiveTransactionCount(); + int[] getActiveTransactionCount(); - List getTransactions(); + List> getTransactions(); - List getActiveTransactions(); + List> getActiveTransactions(); + + List> getTransactionsWithDatabaseConnection(); } diff --git a/utils/src/com/cloud/utils/db/TransactionMBeanImpl.java b/utils/src/com/cloud/utils/db/TransactionMBeanImpl.java index fdff868dbc6..9aca7b99176 100644 --- a/utils/src/com/cloud/utils/db/TransactionMBeanImpl.java +++ b/utils/src/com/cloud/utils/db/TransactionMBeanImpl.java @@ -17,7 +17,9 @@ */ package com.cloud.utils.db; +import java.sql.Connection; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -48,49 +50,65 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB } @Override - public int getActiveTransactionCount() { - int count = 0; + public int[] getActiveTransactionCount() { + int[] count = new int[2]; + count[0] = 0; + count[1] = 0; for (Transaction txn : _txns.values()) { if (txn.getStack().size() > 0) { - count++; + count[0]++; + } + if (txn.getCurrentConnection() != null) { + count[1]++; } } return count; } @Override - public List getTransactions() { - ArrayList txns = new ArrayList(); + public List> getTransactions() { + ArrayList> txns = new ArrayList>(); for (Transaction info : _txns.values()) { - txns.add(toString(info)); + txns.add(toMap(info)); } return txns; } @Override - public List getActiveTransactions() { - ArrayList txns = new ArrayList(); + public List> getActiveTransactions() { + ArrayList> txns = new ArrayList>(); for (Transaction txn : _txns.values()) { if (txn.getStack().size() > 0 || txn.getCurrentConnection() != null) { - txns.add(toString(txn)); + txns.add(toMap(txn)); } } return txns; } - protected String toString(Transaction txn) { - StringBuilder buff = new StringBuilder("[Name="); - buff.append(txn.getName()); - buff.append("; Creator="); - buff.append(txn.getCreator()); - buff.append("; DB="); - buff.append(txn.getCurrentConnection()); - buff.append("; Stack="); + protected Map toMap(Transaction txn) { + Map map = new HashMap(); + map.put("name", txn.getName()); + map.put("id", Long.toString(txn.getId())); + map.put("creator", txn.getCreator()); + Connection conn = txn.getCurrentConnection(); + map.put("db", conn != null ? Integer.toString(System.identityHashCode(conn)) : "none"); + StringBuilder buff = new StringBuilder(); for (StackElement element : txn.getStack()) { - buff.append(",").append(element.toString()); + buff.append(element.toString()).append(","); } - buff.append("]"); + map.put("stack", buff.toString()); - return buff.toString(); + return map; + } + + @Override + public List> getTransactionsWithDatabaseConnection() { + ArrayList> txns = new ArrayList>(); + for (Transaction txn : _txns.values()) { + if (txn.getCurrentConnection() != null) { + txns.add(toMap(txn)); + } + } + return txns; } } \ No newline at end of file