diff --git a/server/src/com/cloud/agent/manager/AgentMonitor.java b/server/src/com/cloud/agent/manager/AgentMonitor.java index f9aba66ee1f..8b7f455dc07 100755 --- a/server/src/com/cloud/agent/manager/AgentMonitor.java +++ b/server/src/com/cloud/agent/manager/AgentMonitor.java @@ -18,6 +18,7 @@ package com.cloud.agent.manager; import java.sql.Connection; +import java.sql.SQLException; import java.util.List; import org.apache.log4j.Logger; @@ -59,13 +60,13 @@ public class AgentMonitor extends Thread implements Listener { private AlertManager _alertMgr; private long _msId; private ConnectionConcierge _concierge; - + protected AgentMonitor() { } - + public AgentMonitor(long msId, HostDao hostDao, VMInstanceDao vmDao, DataCenterDao dcDao, HostPodDao podDao, AgentManagerImpl agentMgr, AlertManager alertMgr, long pingTimeout) { - super("AgentMonitor"); - _msId = msId; + super("AgentMonitor"); + _msId = msId; _pingTimeout = pingTimeout; _hostDao = hostDao; _agentMgr = agentMgr; @@ -74,19 +75,21 @@ public class AgentMonitor extends Thread implements Listener { _dcDao = dcDao; _podDao = podDao; _alertMgr = alertMgr; - Connection conn = Transaction.getStandaloneConnection(); - if (conn == null) { - throw new CloudRuntimeException("Unable to get a db connection."); + try { + Connection conn = Transaction.getStandaloneConnectionWithException(); + conn.setAutoCommit(true); + conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + _concierge = new ConnectionConcierge("AgentMonitor", conn, true); + } catch (SQLException e) { + throw new CloudRuntimeException("Cannot get connection", e); } - - _concierge = new ConnectionConcierge("AgentMonitor", conn, true, true); } - + // TODO : use host machine time is not safe in clustering environment @Override - public void run() { + public void run() { s_logger.info("Agent Monitor is started."); - + while (!_stop) { try { // check every 60 seconds @@ -94,41 +97,41 @@ public class AgentMonitor extends Thread implements Listener { } catch (InterruptedException e) { s_logger.info("Who woke me from my slumber?"); } - - GlobalLock lock = GlobalLock.getInternLock("AgentMonitorLock"); - if (lock == null) { - s_logger.error("Unable to acquire lock. Better luck next time?"); - continue; - } - - if (!lock.lock(10)) { - s_logger.info("Someone else is already working on the agents. Skipping my turn"); - continue; - } - + + GlobalLock lock = GlobalLock.getInternLock("AgentMonitorLock"); + if (lock == null) { + s_logger.error("Unable to acquire lock. Better luck next time?"); + continue; + } + + if (!lock.lock(10)) { + s_logger.info("Someone else is already working on the agents. Skipping my turn"); + continue; + } + try { long time = (System.currentTimeMillis() >> 10) - _pingTimeout; List hosts = _hostDao.findLostHosts(time); if (s_logger.isInfoEnabled()) { s_logger.info("Found " + hosts.size() + " hosts behind on ping. pingTimeout : " + _pingTimeout + ", mark time : " + time); } - + for (HostVO host : hosts) { - if (host.getType().equals(Host.Type.ExternalFirewall) || - host.getType().equals(Host.Type.ExternalLoadBalancer) || - host.getType().equals(Host.Type.TrafficMonitor) || - host.getType().equals(Host.Type.SecondaryStorage)) { - continue; - } - - if (host.getManagementServerId() == null || host.getManagementServerId() == _msId) { - if (s_logger.isInfoEnabled()) { - s_logger.info("Asking agent mgr to investgate why host " + host.getId() + " is behind on ping. last ping time: " + host.getLastPinged()); - } - _agentMgr.disconnect(host.getId(), Event.PingTimeout, true); - } + if (host.getType().equals(Host.Type.ExternalFirewall) || + host.getType().equals(Host.Type.ExternalLoadBalancer) || + host.getType().equals(Host.Type.TrafficMonitor) || + host.getType().equals(Host.Type.SecondaryStorage)) { + continue; + } + + if (host.getManagementServerId() == null || host.getManagementServerId() == _msId) { + if (s_logger.isInfoEnabled()) { + s_logger.info("Asking agent mgr to investgate why host " + host.getId() + " is behind on ping. last ping time: " + host.getLastPinged()); + } + _agentMgr.disconnect(host.getId(), Event.PingTimeout, true); + } } - + hosts = _hostDao.listByStatus(Status.PrepareForMaintenance, Status.ErrorInMaintenance); for (HostVO host : hosts) { long hostId = host.getId(); @@ -147,13 +150,13 @@ public class AgentMonitor extends Thread implements Listener { } catch (Throwable th) { s_logger.error("Caught the following exception: ", th); } finally { - lock.unlock(); + lock.unlock(); } } - + s_logger.info("Agent Monitor is leaving the building!"); } - + public void signalStop() { _stop = true; interrupt(); @@ -193,10 +196,10 @@ public class AgentMonitor extends Thread implements Listener { } return processed; } - + @Override public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) { - return null; + return null; } @Override @@ -207,15 +210,15 @@ public class AgentMonitor extends Thread implements Listener { public boolean processDisconnect(long agentId, Status state) { return true; } - + @Override public boolean processTimeout(long agentId, long seq) { - return true; + return true; } - + @Override public int getTimeout() { - return -1; + return -1; } - + } diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index c0d6793791e..c765cf6cbb3 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -477,14 +477,14 @@ public class ClusterManagerImpl implements ClusterManager { } public void notifyNodeJoined(List nodeList) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Notify management server node join to listeners."); - - for(ManagementServerHostVO mshost : nodeList) { - s_logger.debug("Joining node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid()); - } - } - + if(s_logger.isDebugEnabled()) { + s_logger.debug("Notify management server node join to listeners."); + + for(ManagementServerHostVO mshost : nodeList) { + s_logger.debug("Joining node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid()); + } + } + synchronized(listeners) { for(ClusterManagerListener listener : listeners) { listener.onManagementNodeJoined(nodeList, _mshostId); @@ -496,14 +496,14 @@ public class ClusterManagerImpl implements ClusterManager { } public void notifyNodeLeft(List nodeList) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Notify management server node left to listeners."); - - for(ManagementServerHostVO mshost : nodeList) { - s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid()); - } - } - + if(s_logger.isDebugEnabled()) { + s_logger.debug("Notify management server node left to listeners."); + + for(ManagementServerHostVO mshost : nodeList) { + s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid()); + } + } + synchronized(listeners) { for(ClusterManagerListener listener : listeners) { listener.onManagementNodeLeft(nodeList, _mshostId); @@ -515,9 +515,9 @@ public class ClusterManagerImpl implements ClusterManager { } public void notifyNodeIsolated() { - if(s_logger.isDebugEnabled()) - s_logger.debug("Notify management server node isolation to listeners"); - + if(s_logger.isDebugEnabled()) + s_logger.debug("Notify management server node isolation to listeners"); + synchronized(listeners) { for(ClusterManagerListener listener : listeners) { listener.onManagementNodeIsolated(); @@ -673,7 +673,7 @@ public class ClusterManagerImpl implements ClusterManager { private Connection getHeartbeatConnection() throws SQLException { if(_heartbeatConnection == null) { Connection conn = Transaction.getStandaloneConnectionWithException(); - _heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false, false); + _heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false); } return _heartbeatConnection.conn(); @@ -969,11 +969,10 @@ public class ClusterManagerImpl implements ClusterManager { if (s_logger.isInfoEnabled()) { s_logger.info("Management server (host id : " + _mshostId + ") is being started at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort()); } - + // use seperate thread for heartbeat updates _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); _notificationExecutor.submit(getNotificationTask()); - } catch (Throwable e) { s_logger.error("Unexpected exception : ", e); @@ -1093,8 +1092,8 @@ public class ClusterManagerImpl implements ClusterManager { if(_currentServiceAdapter == null) { throw new ConfigurationException("Unable to set current cluster service adapter"); } - - + + _agentLBEnabled = Boolean.valueOf(configDao.getValue(Config.AgentLbEnable.key())); String connectedAgentsThreshold = configs.get("agent.load.threshold"); @@ -1225,7 +1224,7 @@ public class ClusterManagerImpl implements ClusterManager { public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event); } - + @Override public boolean isAgentRebalanceEnabled() { return _agentLBEnabled; diff --git a/utils/src/com/cloud/utils/db/ConnectionConcierge.java b/utils/src/com/cloud/utils/db/ConnectionConcierge.java index 08b8336b76f..7478e535db8 100644 --- a/utils/src/com/cloud/utils/db/ConnectionConcierge.java +++ b/utils/src/com/cloud/utils/db/ConnectionConcierge.java @@ -48,19 +48,19 @@ import com.cloud.utils.mgmt.JmxUtil; * your own. */ public class ConnectionConcierge { - + static final Logger s_logger = Logger.getLogger(ConnectionConcierge.class); - + static final ConnectionConciergeManager s_mgr = new ConnectionConciergeManager(); - + Connection _conn; String _name; boolean _keepAlive; boolean _autoCommit; int _isolationLevel; int _holdability; - - public ConnectionConcierge(String name, Connection conn, boolean autoCommit, boolean keepAlive) { + + public ConnectionConcierge(String name, Connection conn, boolean keepAlive) { _name = name + s_mgr.getNextId(); _keepAlive = keepAlive; try { @@ -72,7 +72,7 @@ public class ConnectionConcierge { } reset(conn); } - + public void reset(Connection conn) { try { release(); @@ -90,11 +90,11 @@ public class ConnectionConcierge { s_mgr.register(_name, this); s_logger.debug("Registering a database connection for " + _name); } - + public final Connection conn() { return _conn; } - + public void release() { s_mgr.unregister(_name); try { @@ -106,23 +106,23 @@ public class ConnectionConcierge { throw new CloudRuntimeException("Problem in closing a connection", e); } } - + @Override protected void finalize() throws Exception { if (_conn != null) { release(); } } - + public boolean keepAlive() { return _keepAlive; } - + protected static class ConnectionConciergeManager extends StandardMBean implements ConnectionConciergeMBean { ScheduledExecutorService _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ConnectionKeeper")); final ConcurrentHashMap _conns = new ConcurrentHashMap(); final AtomicInteger _idGenerator = new AtomicInteger(); - + ConnectionConciergeManager() { super(ConnectionConciergeMBean.class, false); resetKeepAliveTask(20); @@ -132,19 +132,19 @@ public class ConnectionConcierge { s_logger.error("Unable to register mbean", e); } } - + public Integer getNextId() { return _idGenerator.incrementAndGet(); } - + public void register(String name, ConnectionConcierge concierge) { _conns.put(name, concierge); } - + public void unregister(String name) { _conns.remove(name); } - + protected String testValidity(String name, Connection conn) { PreparedStatement pstmt = null; try { @@ -182,12 +182,12 @@ public class ConnectionConcierge { if (concierge == null) { return "Not Found"; } - + Connection conn = Transaction.getStandaloneConnection(); if (conn == null) { return "Unable to get anotehr db connection"; } - + concierge.reset(conn); return "Done"; } @@ -201,7 +201,7 @@ public class ConnectionConcierge { s_logger.error("Unable to shutdown executor", e); } } - + _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ConnectionConcierge")); _executor.schedule(new Runnable() { @Override @@ -215,7 +215,7 @@ public class ConnectionConcierge { } } }, seconds, TimeUnit.SECONDS); - + return "As you wish."; } diff --git a/utils/src/com/cloud/utils/db/Merovingian2.java b/utils/src/com/cloud/utils/db/Merovingian2.java index eeb1e99637c..79c955a593e 100644 --- a/utils/src/com/cloud/utils/db/Merovingian2.java +++ b/utils/src/com/cloud/utils/db/Merovingian2.java @@ -55,7 +55,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { TimeZone s_gmtTimeZone = TimeZone.getTimeZone("GMT"); - private long _msId; + private final long _msId; private static Merovingian2 s_instance = null; ConnectionConcierge _concierge = null; @@ -67,7 +67,8 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean { try { conn = Transaction.getStandaloneConnectionWithException(); conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - _concierge = new ConnectionConcierge("LockMaster", conn, true, true); + conn.setAutoCommit(true); + _concierge = new ConnectionConcierge("LockMaster", conn, false); } catch (SQLException e) { s_logger.error("Unable to get a new db connection", e); throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes: ", e);