diff --git a/server/src/com/cloud/agent/manager/AgentMonitor.java b/server/src/com/cloud/agent/manager/AgentMonitor.java deleted file mode 100755 index c439b6fb60e..00000000000 --- a/server/src/com/cloud/agent/manager/AgentMonitor.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. - * - * This software is licensed under the GNU General Public License v3 or later. - * - * It is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or any later version. - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. 
- * - */ -package com.cloud.agent.manager; - -import java.util.List; - -import org.apache.log4j.Logger; - -import com.cloud.agent.Listener; -import com.cloud.agent.api.AgentControlAnswer; -import com.cloud.agent.api.AgentControlCommand; -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.Command; -import com.cloud.agent.api.PingCommand; -import com.cloud.agent.api.StartupCommand; -import com.cloud.alert.AlertManager; -import com.cloud.dc.DataCenterVO; -import com.cloud.dc.HostPodVO; -import com.cloud.dc.dao.DataCenterDao; -import com.cloud.dc.dao.HostPodDao; -import com.cloud.host.Host; -import com.cloud.host.HostVO; -import com.cloud.host.Status; -import com.cloud.host.Status.Event; -import com.cloud.host.dao.HostDao; -import com.cloud.utils.db.GlobalLock; -import com.cloud.vm.VMInstanceVO; -import com.cloud.vm.dao.VMInstanceDao; - -public class AgentMonitor extends Thread implements Listener { - private static Logger s_logger = Logger.getLogger(AgentMonitor.class); - private final long _pingTimeout; - private final HostDao _hostDao; - private boolean _stop; - private final AgentManagerImpl _agentMgr; - private final VMInstanceDao _vmDao; - private DataCenterDao _dcDao = null; - private HostPodDao _podDao = null; - private final AlertManager _alertMgr; - private final long _msId; - - public AgentMonitor(long msId, HostDao hostDao, VMInstanceDao vmDao, DataCenterDao dcDao, HostPodDao podDao, AgentManagerImpl agentMgr, AlertManager alertMgr, long pingTimeout) { - super("AgentMonitor"); - _msId = msId; - _pingTimeout = pingTimeout; - _hostDao = hostDao; - _agentMgr = agentMgr; - _stop = false; - _vmDao = vmDao; - _dcDao = dcDao; - _podDao = podDao; - _alertMgr = alertMgr; - } - - // TODO : use host machine time is not safe in clustering environment - @Override - public void run() { - s_logger.info("Agent Monitor is started."); - - while (!_stop) { - try { - // check every 60 seconds - Thread.sleep(60 * 1000); - } catch (InterruptedException e) { - 
s_logger.info("Who woke me from my slumber?"); - } - - GlobalLock lock = GlobalLock.getInternLock("AgentMonitorLock"); - if (lock == null) { - s_logger.error("Unable to acquire lock. Better luck next time?"); - continue; - } - - if (!lock.lock(10)) { - s_logger.info("Someone else is already working on the agents. Skipping my turn"); - continue; - } - - try { - long time = (System.currentTimeMillis() >> 10) - _pingTimeout; - List hosts = _hostDao.findLostHosts(time); - if (s_logger.isInfoEnabled()) { - s_logger.info("Found " + hosts.size() + " hosts behind on ping. pingTimeout : " + _pingTimeout + ", mark time : " + time); - } - - for (HostVO host : hosts) { - if (host.getType().equals(Host.Type.ExternalFirewall) || - host.getType().equals(Host.Type.ExternalLoadBalancer) || - host.getType().equals(Host.Type.TrafficMonitor) || - host.getType().equals(Host.Type.SecondaryStorage)) { - continue; - } - - if (host.getManagementServerId() == null || host.getManagementServerId() == _msId) { - if (s_logger.isInfoEnabled()) { - s_logger.info("Asking agent mgr to investgate why host " + host.getId() + " is behind on ping. 
last ping time: " + host.getLastPinged()); - } - _agentMgr.disconnect(host.getId(), Event.PingTimeout, true); - } - } - - hosts = _hostDao.listByStatus(Status.PrepareForMaintenance, Status.ErrorInMaintenance); - for (HostVO host : hosts) { - long hostId = host.getId(); - DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId()); - HostPodVO podVO = _podDao.findById(host.getPodId()); - String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName(); - - if (host.getType() != Host.Type.Storage) { - List vos = _vmDao.listByHostId(host.getId()); - if (vos.size() == 0) { - _alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Migration Complete for host " + hostDesc, "Host [" + hostDesc + "] is ready for maintenance"); - _hostDao.updateStatus(host, Event.PreparationComplete, _msId); - } - } - } - } catch (Throwable th) { - s_logger.error("Caught the following exception: ", th); - } finally { - lock.unlock(); - } - } - - s_logger.info("Agent Monitor is leaving the building!"); - } - - public void signalStop() { - _stop = true; - interrupt(); - } - - @Override - public boolean isRecurring() { - return true; - } - - @Override - public boolean processAnswers(long agentId, long seq, Answer[] answers) { - return false; - } - - @Override - public boolean processCommands(long agentId, long seq, Command[] commands) { - boolean processed = false; - for (Command cmd : commands) { - if (cmd instanceof PingCommand) { - HostVO host = _hostDao.findById(agentId); - if( host == null ) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Cant not find host " + agentId); - } - } else { - _hostDao.updateStatus(host, Event.Ping, _msId); - } - processed = true; - } - } - return processed; - } - - @Override - public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) { - return null; - } - - @Override - public void processConnect(HostVO host, 
StartupCommand cmd, boolean forRebalance) { - s_logger.debug("Registering agent monitor for " + host.getId()); - } - - @Override - public boolean processDisconnect(long agentId, Status state) { - return true; - } - - @Override - public boolean processTimeout(long agentId, long seq) { - return true; - } - - @Override - public int getTimeout() { - return -1; - } - -} diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index 24a006a2f28..3e4140aba42 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -477,14 +477,14 @@ public class ClusterManagerImpl implements ClusterManager { } public void notifyNodeJoined(List nodeList) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Notify management server node join to listeners."); - - for(ManagementServerHostVO mshost : nodeList) { - s_logger.debug("Joining node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid()); - } - } - + if(s_logger.isDebugEnabled()) { + s_logger.debug("Notify management server node join to listeners."); + + for(ManagementServerHostVO mshost : nodeList) { + s_logger.debug("Joining node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid()); + } + } + synchronized(listeners) { for(ClusterManagerListener listener : listeners) { listener.onManagementNodeJoined(nodeList, _mshostId); @@ -496,14 +496,14 @@ public class ClusterManagerImpl implements ClusterManager { } public void notifyNodeLeft(List nodeList) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Notify management server node left to listeners."); - - for(ManagementServerHostVO mshost : nodeList) { - s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid()); - } - } - + if(s_logger.isDebugEnabled()) { + s_logger.debug("Notify management server node left to listeners."); + + for(ManagementServerHostVO mshost : nodeList) { + 
s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid()); + } + } + synchronized(listeners) { for(ClusterManagerListener listener : listeners) { listener.onManagementNodeLeft(nodeList, _mshostId); @@ -515,9 +515,9 @@ public class ClusterManagerImpl implements ClusterManager { } public void notifyNodeIsolated() { - if(s_logger.isDebugEnabled()) - s_logger.debug("Notify management server node isolation to listeners"); - + if(s_logger.isDebugEnabled()) + s_logger.debug("Notify management server node isolation to listeners"); + synchronized(listeners) { for(ClusterManagerListener listener : listeners) { listener.onManagementNodeIsolated(); @@ -673,7 +673,7 @@ public class ClusterManagerImpl implements ClusterManager { private Connection getHeartbeatConnection() throws SQLException { if(_heartbeatConnection == null) { Connection conn = Transaction.getStandaloneConnectionWithException(); - _heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false, false); + _heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false); } return _heartbeatConnection.conn(); @@ -955,11 +955,10 @@ public class ClusterManagerImpl implements ClusterManager { if (s_logger.isInfoEnabled()) { s_logger.info("Management server (host id : " + _mshostId + ") is being started at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort()); } - + // use seperate thread for heartbeat updates _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); _notificationExecutor.submit(getNotificationTask()); - } catch (Throwable e) { s_logger.error("Unexpected exception : ", e); @@ -1079,8 +1078,8 @@ public class ClusterManagerImpl implements ClusterManager { if(_currentServiceAdapter == null) { throw new ConfigurationException("Unable to set current cluster service adapter"); } - - + + _agentLBEnabled = 
Boolean.valueOf(configDao.getValue(Config.AgentLbEnable.key())); String connectedAgentsThreshold = configs.get("agent.load.threshold"); @@ -1211,7 +1210,7 @@ public class ClusterManagerImpl implements ClusterManager { public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event); } - + @Override public boolean isAgentRebalanceEnabled() { return _agentLBEnabled; diff --git a/utils/src/com/cloud/utils/db/ConnectionConcierge.java b/utils/src/com/cloud/utils/db/ConnectionConcierge.java index 08b8336b76f..7478e535db8 100644 --- a/utils/src/com/cloud/utils/db/ConnectionConcierge.java +++ b/utils/src/com/cloud/utils/db/ConnectionConcierge.java @@ -48,19 +48,19 @@ import com.cloud.utils.mgmt.JmxUtil; * your own. */ public class ConnectionConcierge { - + static final Logger s_logger = Logger.getLogger(ConnectionConcierge.class); - + static final ConnectionConciergeManager s_mgr = new ConnectionConciergeManager(); - + Connection _conn; String _name; boolean _keepAlive; boolean _autoCommit; int _isolationLevel; int _holdability; - - public ConnectionConcierge(String name, Connection conn, boolean autoCommit, boolean keepAlive) { + + public ConnectionConcierge(String name, Connection conn, boolean keepAlive) { _name = name + s_mgr.getNextId(); _keepAlive = keepAlive; try { @@ -72,7 +72,7 @@ public class ConnectionConcierge { } reset(conn); } - + public void reset(Connection conn) { try { release(); @@ -90,11 +90,11 @@ public class ConnectionConcierge { s_mgr.register(_name, this); s_logger.debug("Registering a database connection for " + _name); } - + public final Connection conn() { return _conn; } - + public void release() { s_mgr.unregister(_name); try { @@ -106,23 +106,23 @@ public class ConnectionConcierge { throw new CloudRuntimeException("Problem in closing a connection", e); } 
} - + @Override protected void finalize() throws Exception { if (_conn != null) { release(); } } - + public boolean keepAlive() { return _keepAlive; } - + protected static class ConnectionConciergeManager extends StandardMBean implements ConnectionConciergeMBean { ScheduledExecutorService _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ConnectionKeeper")); final ConcurrentHashMap _conns = new ConcurrentHashMap(); final AtomicInteger _idGenerator = new AtomicInteger(); - + ConnectionConciergeManager() { super(ConnectionConciergeMBean.class, false); resetKeepAliveTask(20); @@ -132,19 +132,19 @@ public class ConnectionConcierge { s_logger.error("Unable to register mbean", e); } } - + public Integer getNextId() { return _idGenerator.incrementAndGet(); } - + public void register(String name, ConnectionConcierge concierge) { _conns.put(name, concierge); } - + public void unregister(String name) { _conns.remove(name); } - + protected String testValidity(String name, Connection conn) { PreparedStatement pstmt = null; try { @@ -182,12 +182,12 @@ public class ConnectionConcierge { if (concierge == null) { return "Not Found"; } - + Connection conn = Transaction.getStandaloneConnection(); if (conn == null) { return "Unable to get anotehr db connection"; } - + concierge.reset(conn); return "Done"; } @@ -201,7 +201,7 @@ public class ConnectionConcierge { s_logger.error("Unable to shutdown executor", e); } } - + _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ConnectionConcierge")); _executor.schedule(new Runnable() { @Override @@ -215,7 +215,7 @@ public class ConnectionConcierge { } } }, seconds, TimeUnit.SECONDS); - + return "As you wish."; } diff --git a/utils/src/com/cloud/utils/db/Merovingian2.java b/utils/src/com/cloud/utils/db/Merovingian2.java index a24375dab70..37d580d71de 100644 --- a/utils/src/com/cloud/utils/db/Merovingian2.java +++ b/utils/src/com/cloud/utils/db/Merovingian2.java @@ -66,7 +66,9 @@ public class 
Merovingian2 extends StandardMBean implements MerovingianMBean { Connection conn = null; try { conn = Transaction.getStandaloneConnectionWithException(); - _concierge = new ConnectionConcierge("LockMaster", conn, true, true); + conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + conn.setAutoCommit(true); + _concierge = new ConnectionConcierge("LockMaster", conn, false); } catch (SQLException e) { s_logger.error("Unable to get a new db connection", e); throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes: ", e);