diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index e45afea1062..76899196c33 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -170,9 +170,10 @@ public class ClusterManagerImpl implements ClusterManager { public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { ClusterService peerService = null; - if(s_logger.isDebugEnabled()) - s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + + if(s_logger.isDebugEnabled()) { + s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + gson.toJson(cmds, Command[].class)); + } for(int i = 0; i < 2; i++) { try { @@ -183,14 +184,16 @@ public class ClusterManagerImpl implements ClusterManager { if(peerService != null) { try { - if(s_logger.isDebugEnabled()) - s_logger.debug("Send " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote"); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Send " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote"); + } long startTick = System.currentTimeMillis(); String strResult = peerService.execute(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError); - if(s_logger.isDebugEnabled()) - s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " + (System.currentTimeMillis() - startTick) + " ms, result: " + strResult); + } if(strResult != null) { try { @@ -204,9 +207,10 @@ public class ClusterManagerImpl implements ClusterManager { } catch (RemoteException e) { invalidatePeerService(strPeer); - if(s_logger.isInfoEnabled()) - s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: " + if(s_logger.isInfoEnabled()) { + s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: " + i + ", exception message :" + e.getMessage()); + } } } } @@ -219,9 +223,10 @@ public class ClusterManagerImpl implements ClusterManager { ClusterService peerService = null; - if(s_logger.isDebugEnabled()) - s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + gson.toJson(cmds, Command[].class)); + } for(int i = 0; i < 2; i++) { try { @@ -234,17 +239,19 @@ public class ClusterManagerImpl implements ClusterManager { try { long seq = 0; synchronized(String.valueOf(agentId).intern()) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Send Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote"); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Send Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote"); + } long startTick = System.currentTimeMillis(); seq = peerService.executeAsync(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError); if(seq > 0) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + if(s_logger.isDebugEnabled()) { + s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " in " + (System.currentTimeMillis() - startTick) + " ms" + ", register local listener " + strPeer + "/" + seq); + } registerAsyncCall(strPeer, seq, listener); } else { @@ -256,8 +263,9 @@ public class ClusterManagerImpl implements ClusterManager { } catch (RemoteException e) { invalidatePeerService(strPeer); - if(s_logger.isInfoEnabled()) - s_logger.info("Exception on remote execution -> " + strPeer + ", iteration : " + i); + if(s_logger.isInfoEnabled()) { + s_logger.info("Exception on remote execution -> " + strPeer + ", iteration : " + i); + } } } } @@ -268,9 +276,10 @@ public class ClusterManagerImpl implements ClusterManager { @Override public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" + agentId + "-" + seq + "} answers: " + (answers != null ? gson.toJson(answers, Answer[].class): "null")); + } Listener listener = null; synchronized(String.valueOf(agentId).intern()) { @@ -282,31 +291,36 @@ public class ClusterManagerImpl implements ClusterManager { if(listener != null) { long startTick = System.currentTimeMillis(); - if(s_logger.isDebugEnabled()) - s_logger.debug("Processing answer {" + agentId + "-" + seq + "} from remote peer " + executingPeer); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Processing answer {" + agentId + "-" + seq + "} from remote peer " + executingPeer); + } listener.processAnswers(agentId, seq, answers); - if(s_logger.isDebugEnabled()) - s_logger.debug("Answer {" + agentId + "-" + seq + "} is processed in " + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Answer {" + agentId + "-" + seq + "} is processed in " + (System.currentTimeMillis() - startTick) + " ms"); + } if(!listener.isRecurring()) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Listener is not recurring after async-result callback {" + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Listener is not recurring after async-result callback {" + agentId + "-" + seq + "}, unregister it"); + } unregisterAsyncCall(executingPeer, seq); } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("Listener is recurring after async-result callback {" + agentId + if(s_logger.isDebugEnabled()) { + s_logger.debug("Listener is recurring after async-result callback {" + agentId +"-" + seq + "}, will keep it"); + } return true; } } else { - if(s_logger.isInfoEnabled()) - s_logger.info("Async-call Listener has not been registered yet for {" + agentId + if(s_logger.isInfoEnabled()) { + s_logger.info("Async-call Listener has not been registered yet for {" + agentId +"-" + seq + "}"); + } } return false; } @@ -314,9 +328,10 @@ public class ClusterManagerImpl implements ClusterManager { @Override public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq + "} " + (answers != null? gson.toJson(answers, Answer[].class):"")); + } final String targetPeerF = targetPeer; final Answer[] answersF = answers; @@ -337,14 +352,16 @@ public class ClusterManagerImpl implements ClusterManager { boolean result = false; long startTick = System.currentTimeMillis(); - if(s_logger.isDebugEnabled()) - s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote"); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote"); + } result = peerService.onAsyncResult(getSelfPeerName(), agentIdF, seqF, gson.toJson(answersF, Answer[].class)); - if(s_logger.isDebugEnabled()) - s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + result); + } return result; } catch (RemoteException e) { @@ -365,8 +382,9 @@ public class ClusterManagerImpl implements ClusterManager { HostVO host = _hostDao.findById(agentHostId); if(host != null && host.getManagementServerId() != null) { - if(getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) - return null; + if(getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) { + return null; + } return Long.toString(host.getManagementServerId()); } @@ -427,8 +445,9 @@ public class ClusterManagerImpl implements ClusterManager { public ClusterService getPeerService(String strPeer) throws RemoteException { synchronized(clusterPeers) { - if(clusterPeers.containsKey(strPeer)) - return clusterPeers.get(strPeer); + if(clusterPeers.containsKey(strPeer)) { + return clusterPeers.get(strPeer); + } } ClusterService service = _currentServiceAdapter.getPeerService(strPeer); @@ -437,8 +456,9 @@ public class ClusterManagerImpl implements ClusterManager { synchronized(clusterPeers) { // re-check the peer map again to deal with the // race conditions - if(!clusterPeers.containsKey(strPeer)) - clusterPeers.put(strPeer, service); + if(!clusterPeers.containsKey(strPeer)) { + clusterPeers.put(strPeer, service); + } } } @@ -447,8 +467,9 @@ public class ClusterManagerImpl implements ClusterManager { public void invalidatePeerService(String strPeer) { synchronized(clusterPeers) { - if(clusterPeers.containsKey(strPeer)) - clusterPeers.remove(strPeer); + if(clusterPeers.containsKey(strPeer)) { + clusterPeers.remove(strPeer); + } } } @@ -456,8 +477,9 @@ public class ClusterManagerImpl implements ClusterManager { String key = strPeer + "/" + seq; synchronized(asyncCalls) { - if(!asyncCalls.containsKey(key)) - asyncCalls.put(key, listener); + if(!asyncCalls.containsKey(key)) { + asyncCalls.put(key, listener); + } } } @@ -465,8 +487,9 @@ public class ClusterManagerImpl implements ClusterManager { String key = strPeer + "/" + seq; synchronized(asyncCalls) { - if(asyncCalls.containsKey(key)) - return asyncCalls.get(key); + if(asyncCalls.containsKey(key)) { + return asyncCalls.get(key); + } } return null; @@ -476,8 +499,9 @@ public class ClusterManagerImpl implements ClusterManager { String key = strPeer + "/" + seq; synchronized(asyncCalls) { - if(asyncCalls.containsKey(key)) - asyncCalls.remove(key); + if(asyncCalls.containsKey(key)) { + asyncCalls.remove(key); + } } } @@ -485,11 +509,16 @@ public class ClusterManagerImpl implements ClusterManager { return new Runnable() { @Override public void run() { - if(s_logger.isTraceEnabled()) - s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId); - - _mshostDao.update(_mshostId, DateUtil.currentGMTTime()); - peerScan(); + try { + if(s_logger.isTraceEnabled()) { + s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId); + } + + _mshostDao.update(_mshostId, DateUtil.currentGMTTime()); + peerScan(); + } catch (Exception e) { + s_logger.error("Problem with the cluster heartbeat!", e); + } } }; } @@ -505,8 +534,9 @@ public class ClusterManagerImpl implements ClusterManager { for(Map.Entry entry : activePeers.entrySet()) { if(!isIdInList(entry.getKey(), currentList)) { if(entry.getKey().longValue() != _mshostId.longValue()) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Detected management node left, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP()); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Detected management node left, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP()); + } removedNodeList.add(entry.getValue()); } } @@ -522,23 +552,27 @@ public class ClusterManagerImpl implements ClusterManager { if(!activePeers.containsKey(mshost.getId())) { activePeers.put(mshost.getId(), mshost); - if(s_logger.isDebugEnabled()) - s_logger.debug("Detected management node joined, id:" + mshost.getId() + ", nodeIP:" + mshost.getServiceIP()); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Detected management node joined, id:" + mshost.getId() + ", nodeIP:" + mshost.getServiceIP()); + } newNodeList.add(mshost); } } - if(newNodeList.size() > 0) - notifyNodeJoined(newNodeList); + if(newNodeList.size() > 0) { + notifyNodeJoined(newNodeList); + } - if(removedNodeList.size() > 0) - notifyNodeLeft(removedNodeList); + if(removedNodeList.size() > 0) { + notifyNodeLeft(removedNodeList); + } } private static boolean isIdInList(Long id, List l) { for(ManagementServerHostVO mshost : l) { - if(mshost.getId() != null && mshost.getId() == id) - return true; + if(mshost.getId() != null && mshost.getId() == id) { + return true; + } } return false; } @@ -550,8 +584,9 @@ public class ClusterManagerImpl implements ClusterManager { @Override @DB public boolean start() { - if(s_logger.isInfoEnabled()) - s_logger.info("Starting cluster manager, msid : " + _id); + if(s_logger.isInfoEnabled()) { + s_logger.info("Starting cluster manager, msid : " + _id); + } Transaction txn = Transaction.currentTxn(); try { @@ -574,11 +609,13 @@ public class ClusterManagerImpl implements ClusterManager { mshost.setAlertCount(0); _mshostDao.persist(mshost); - if(s_logger.isInfoEnabled()) - s_logger.info("New instance of management server msid " + _id + " is being started"); + if(s_logger.isInfoEnabled()) { + s_logger.info("New instance of management server msid " + _id + " is being started"); + } } else { - if(s_logger.isInfoEnabled()) - s_logger.info("Management server " + _id + " is being started"); + if(s_logger.isInfoEnabled()) { + s_logger.info("Management server " + _id + " is being started"); + } _mshostDao.update(mshost.getId(), NetUtils.getHostName(), version, _clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime()); @@ -587,8 +624,9 @@ public class ClusterManagerImpl implements ClusterManager { txn.commit(); _mshostId = mshost.getId(); - if(s_logger.isInfoEnabled()) - s_logger.info("Management server (host id : " + _mshostId + ") is available at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort()); + if(s_logger.isInfoEnabled()) { + s_logger.info("Management server (host id : " + _mshostId + ") is available at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort()); + } _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); @@ -600,15 +638,17 @@ public class ClusterManagerImpl implements ClusterManager { throw new CloudRuntimeException("Unable to initialize cluster info into database"); } - if(s_logger.isInfoEnabled()) - s_logger.info("Cluster manager is started"); + if(s_logger.isInfoEnabled()) { + s_logger.info("Cluster manager is started"); + } return true; } @Override public boolean stop() { - if(_mshostId != null) - _mshostDao.remove(_mshostId); + if(_mshostId != null) { + _mshostDao.remove(_mshostId); + } _heartbeatScheduler.shutdownNow(); _executor.shutdownNow(); @@ -619,29 +659,34 @@ public class ClusterManagerImpl implements ClusterManager { } catch (InterruptedException e) { } - if(s_logger.isInfoEnabled()) - s_logger.info("Cluster manager is stopped"); + if(s_logger.isInfoEnabled()) { + s_logger.info("Cluster manager is stopped"); + } return true; } @Override public boolean configure(String name, Map params) throws ConfigurationException { - if(s_logger.isInfoEnabled()) - s_logger.info("Start configuring cluster manager : " + name); + if(s_logger.isInfoEnabled()) { + s_logger.info("Start configuring cluster manager : " + name); + } ComponentLocator locator = ComponentLocator.getCurrentLocator(); _agentMgr = locator.getManager(AgentManager.class); - if (_agentMgr == null) + if (_agentMgr == null) { throw new ConfigurationException("Unable to get " + AgentManager.class.getName()); + } _mshostDao = locator.getDao(ManagementServerHostDao.class); - if(_mshostDao == null) + if(_mshostDao == null) { throw new ConfigurationException("Unable to get " + ManagementServerHostDao.class.getName()); + } _hostDao = locator.getDao(HostDao.class); - if(_hostDao == null) + if(_hostDao == null) { throw new ConfigurationException("Unable to get " + HostDao.class.getName()); + } ConfigurationDao configDao = locator.getDao(ConfigurationDao.class); if (configDao == null) { @@ -651,12 +696,14 @@ public class ClusterManagerImpl implements ClusterManager { Map configs = configDao.getConfiguration("management-server", params); String value = configs.get("cluster.heartbeat.interval"); - if(value != null) - heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL); + if(value != null) { + heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL); + } value = configs.get("cluster.heartbeat.threshold"); - if(value != null) - heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD); + if(value != null) { + heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD); + } _name = name; @@ -670,27 +717,33 @@ public class ClusterManagerImpl implements ClusterManager { throw new ConfigurationException("Unable to load db.properties content"); } _clusterNodeIP = dbProps.getProperty("cluster.node.IP"); - if(_clusterNodeIP == null) - _clusterNodeIP = "127.0.0.1"; - if(s_logger.isInfoEnabled()) - s_logger.info("Cluster node IP : " + _clusterNodeIP); + if(_clusterNodeIP == null) { + _clusterNodeIP = "127.0.0.1"; + } + if(s_logger.isInfoEnabled()) { + s_logger.info("Cluster node IP : " + _clusterNodeIP); + } - if(!NetUtils.isLocalAddress(_clusterNodeIP)) - throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration"); + if(!NetUtils.isLocalAddress(_clusterNodeIP)) { + throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration"); + } Adapters adapters = locator.getAdapters(ClusterServiceAdapter.class); if (adapters == null || !adapters.isSet()) { throw new ConfigurationException("Unable to get cluster service adapters"); } Enumeration it = adapters.enumeration(); - if(it.hasMoreElements()) - _currentServiceAdapter = it.nextElement(); + if(it.hasMoreElements()) { + _currentServiceAdapter = it.nextElement(); + } - if(_currentServiceAdapter == null) + if(_currentServiceAdapter == null) { throw new ConfigurationException("Unable to set current cluster service adapter"); + } - if(s_logger.isInfoEnabled()) - s_logger.info("Cluster manager is configured."); + if(s_logger.isInfoEnabled()) { + s_logger.info("Cluster manager is configured."); + } return true; }