mirror of https://github.com/apache/cloudstack.git
bug 9223, 9224: persist runid to form cluster session, based on cluster session and DB condition to issue isolation notification for self-fencing
This commit is contained in:
parent
b1700af146
commit
1b9cbd9166
|
|
@ -588,4 +588,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
|||
_hostDao.markHostsAsDisconnected(vo.getMsid(), Status.Up, Status.Connecting, Status.Updating, Status.Disconnected, Status.Down);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onManagementNodeIsolated() {
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -688,6 +688,10 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onManagementNodeIsolated() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean start() {
|
||||
|
|
|
|||
|
|
@ -81,6 +81,10 @@ public class CheckPointManagerImpl implements CheckPointManager, Manager, Cluste
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onManagementNodeIsolated() {
|
||||
}
|
||||
|
||||
@DB
|
||||
private Runnable getGCTask() {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,7 @@
|
|||
package com.cloud.cluster;
|
||||
|
||||
import com.cloud.utils.component.Manager;
|
||||
|
||||
public interface ClusterFenceManager extends Manager {
|
||||
public static final int SELF_FENCING_EXIT_CODE = 219;
|
||||
}
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
package com.cloud.cluster;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.ejb.Local;
|
||||
import javax.naming.ConfigurationException;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.utils.component.Inject;
|
||||
|
||||
@Local(value={ClusterFenceManager.class})
|
||||
public class ClusterFenceManagerImpl implements ClusterFenceManager, ClusterManagerListener {
|
||||
private static final Logger s_logger = Logger.getLogger(ClusterFenceManagerImpl.class);
|
||||
|
||||
@Inject ClusterManager _clusterMgr;
|
||||
private String _name;
|
||||
|
||||
@Override
|
||||
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
|
||||
_name = name;
|
||||
|
||||
_clusterMgr.registerListener(this);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean start() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean stop() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return _name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onManagementNodeIsolated() {
|
||||
s_logger.error("Received node isolation notification, will perform self-fencing and shut myself down");
|
||||
System.exit(SELF_FENCING_EXIT_CODE);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
package com.cloud.cluster;
|
||||
|
||||
public class ClusterInvalidSessionException extends Exception {
|
||||
|
||||
private static final long serialVersionUID = -6636524194520997512L;
|
||||
|
||||
public ClusterInvalidSessionException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public ClusterInvalidSessionException(String message, Throwable th) {
|
||||
super(message, th);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -42,7 +42,6 @@ import com.cloud.host.dao.HostDao;
|
|||
import com.cloud.serializer.GsonHelper;
|
||||
import com.cloud.utils.DateUtil;
|
||||
import com.cloud.utils.NumbersUtil;
|
||||
import com.cloud.utils.Pair;
|
||||
import com.cloud.utils.Profiler;
|
||||
import com.cloud.utils.PropertiesUtil;
|
||||
import com.cloud.utils.component.Adapters;
|
||||
|
|
@ -77,7 +76,7 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
|
||||
|
||||
private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification"));
|
||||
private List< Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> > _notificationMsgs = new ArrayList< Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> >();
|
||||
private List<ClusterManagerMessage> _notificationMsgs = new ArrayList<ClusterManagerMessage>();
|
||||
private Connection _heartbeatConnection = null;
|
||||
|
||||
private final ExecutorService _executor;
|
||||
|
|
@ -472,6 +471,14 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
new ClusterNodeLeftEventArgs(_mshostId, nodeList));
|
||||
}
|
||||
|
||||
public void notifyNodeIsolated() {
|
||||
synchronized(listeners) {
|
||||
for(ClusterManagerListener listener : listeners) {
|
||||
listener.onManagementNodeIsolated();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public ClusterService getPeerService(String strPeer) throws RemoteException {
|
||||
synchronized(clusterPeers) {
|
||||
if(clusterPeers.containsKey(strPeer)) {
|
||||
|
|
@ -544,7 +551,7 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
}
|
||||
|
||||
Connection conn = getHeartbeatConnection();
|
||||
_mshostDao.update(conn, _mshostId, DateUtil.currentGMTTime());
|
||||
_mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime());
|
||||
|
||||
if(s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
|
||||
|
|
@ -557,24 +564,48 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
|
||||
peerScan(conn);
|
||||
} catch(CloudRuntimeException e) {
|
||||
// TODO, if the exception is caused by lost of DB connection
|
||||
// start fencing and shutdown the management server
|
||||
s_logger.error("Runtime DB exception ", e.getCause());
|
||||
|
||||
if(e.getCause() instanceof ClusterInvalidSessionException) {
|
||||
s_logger.error("Invalid cluster session found");
|
||||
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
|
||||
}
|
||||
|
||||
if(isRootCauseConnectionRelated(e.getCause())) {
|
||||
s_logger.error("DB communication problem detected");
|
||||
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
|
||||
}
|
||||
|
||||
invalidHeartbeatConnection();
|
||||
} catch (Throwable e) {
|
||||
if(isRootCauseConnectionRelated(e.getCause())) {
|
||||
s_logger.error("DB communication problem detected");
|
||||
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
|
||||
}
|
||||
|
||||
s_logger.error("Problem with the cluster heartbeat!", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Connection getHeartbeatConnection() {
|
||||
private boolean isRootCauseConnectionRelated(Throwable e) {
|
||||
while(e != null) {
|
||||
if(e instanceof com.mysql.jdbc.CommunicationsException || e instanceof com.mysql.jdbc.exceptions.jdbc4.CommunicationsException)
|
||||
return true;
|
||||
|
||||
e = e.getCause();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private Connection getHeartbeatConnection() throws SQLException {
|
||||
if(_heartbeatConnection != null) {
|
||||
return _heartbeatConnection;
|
||||
}
|
||||
|
||||
_heartbeatConnection = Transaction.getStandaloneConnection();
|
||||
_heartbeatConnection = Transaction.getStandaloneConnectionWithException();
|
||||
return _heartbeatConnection;
|
||||
}
|
||||
|
||||
|
|
@ -602,40 +633,55 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
}
|
||||
}
|
||||
|
||||
Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> msgPair = null;
|
||||
while((msgPair = getNextNotificationMessage()) != null) {
|
||||
ClusterManagerMessage msg = null;
|
||||
while((msg = getNextNotificationMessage()) != null) {
|
||||
try {
|
||||
if(msgPair.first() != null && msgPair.first().size() > 0) {
|
||||
Profiler profiler = new Profiler();
|
||||
profiler.start();
|
||||
|
||||
notifyNodeJoined(msgPair.first());
|
||||
|
||||
profiler.stop();
|
||||
if(profiler.getDuration() > 1000) {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
|
||||
}
|
||||
} else {
|
||||
s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
|
||||
}
|
||||
}
|
||||
|
||||
if(msgPair.second() != null && msgPair.second().size() > 0) {
|
||||
Profiler profiler = new Profiler();
|
||||
profiler.start();
|
||||
switch(msg.getMessageType()) {
|
||||
case nodeAdded:
|
||||
if(msg.getNodes() != null && msg.getNodes().size() > 0) {
|
||||
Profiler profiler = new Profiler();
|
||||
profiler.start();
|
||||
|
||||
notifyNodeJoined(msg.getNodes());
|
||||
|
||||
profiler.stop();
|
||||
if(profiler.getDuration() > 1000) {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
|
||||
}
|
||||
} else {
|
||||
s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case nodeRemoved:
|
||||
if(msg.getNodes() != null && msg.getNodes().size() > 0) {
|
||||
Profiler profiler = new Profiler();
|
||||
profiler.start();
|
||||
|
||||
notifyNodeLeft(msgPair.second());
|
||||
|
||||
profiler.stop();
|
||||
if(profiler.getDuration() > 1000) {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
|
||||
}
|
||||
} else {
|
||||
s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
|
||||
}
|
||||
}
|
||||
notifyNodeLeft(msg.getNodes());
|
||||
|
||||
profiler.stop();
|
||||
if(profiler.getDuration() > 1000) {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
|
||||
}
|
||||
} else {
|
||||
s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case nodeIsolated:
|
||||
notifyNodeIsolated();
|
||||
break;
|
||||
|
||||
default :
|
||||
assert(false);
|
||||
break;
|
||||
}
|
||||
|
||||
} catch (Throwable e) {
|
||||
s_logger.warn("Unexpected exception during cluster notification. ", e);
|
||||
}
|
||||
|
|
@ -647,14 +693,14 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
};
|
||||
}
|
||||
|
||||
private void queueNotification(List<ManagementServerHostVO> addedNodeList, List<ManagementServerHostVO> removedNodeList) {
|
||||
private void queueNotification(ClusterManagerMessage msg) {
|
||||
synchronized(this._notificationMsgs) {
|
||||
this._notificationMsgs.add(new Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>>(addedNodeList, removedNodeList));
|
||||
this._notificationMsgs.add(msg);
|
||||
this._notificationMsgs.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
private Pair<List<ManagementServerHostVO>, List<ManagementServerHostVO>> getNextNotificationMessage() {
|
||||
private ClusterManagerMessage getNextNotificationMessage() {
|
||||
synchronized(this._notificationMsgs) {
|
||||
if(this._notificationMsgs.size() > 0)
|
||||
return this._notificationMsgs.remove(0);
|
||||
|
|
@ -669,7 +715,7 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
Date cutTime = DateUtil.currentGMTTime();
|
||||
List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold));
|
||||
if(inactiveList.size() > 0) {
|
||||
this.queueNotification(null, inactiveList);
|
||||
this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, inactiveList));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -679,27 +725,63 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold));
|
||||
|
||||
List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
|
||||
List<ManagementServerHostVO> invalidatedNodeList = new ArrayList<ManagementServerHostVO>();
|
||||
|
||||
if(_mshostId != null) {
|
||||
// only if we have already attached to cluster, will we start to check leaving nodes
|
||||
for(Map.Entry<Long, ManagementServerHostVO> entry : activePeers.entrySet()) {
|
||||
if(!isIdInList(entry.getKey(), currentList)) {
|
||||
|
||||
ManagementServerHostVO current = getInListById(entry.getKey(), currentList);
|
||||
if(current == null) {
|
||||
if(entry.getKey().longValue() != _mshostId.longValue()) {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Detected management node left, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP());
|
||||
}
|
||||
removedNodeList.add(entry.getValue());
|
||||
}
|
||||
} else {
|
||||
if(current.getRunid() == 0) {
|
||||
if(entry.getKey().longValue() != _mshostId.longValue()) {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Detected management node left because of invalidated session, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP());
|
||||
}
|
||||
invalidatedNodeList.add(entry.getValue());
|
||||
}
|
||||
} else {
|
||||
if(entry.getValue().getRunid() != current.getRunid()) {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Detected management node left and rejoined quickly, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP());
|
||||
}
|
||||
|
||||
entry.getValue().setRunid(current.getRunid());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// process invalidated node list
|
||||
if(invalidatedNodeList.size() > 0) {
|
||||
for(ManagementServerHostVO mshost : invalidatedNodeList) {
|
||||
activePeers.remove(mshost.getId());
|
||||
try {
|
||||
JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
|
||||
} catch(Exception e) {
|
||||
s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, invalidatedNodeList));
|
||||
}
|
||||
|
||||
// process removed node list
|
||||
Iterator<ManagementServerHostVO> it = removedNodeList.iterator();
|
||||
while(it.hasNext()) {
|
||||
ManagementServerHostVO mshost = it.next();
|
||||
if(!pingManagementNode(mshost)) {
|
||||
s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable");
|
||||
activePeers.remove(mshost.getId());
|
||||
|
||||
_mshostDao.invalidateRunSession(conn, mshost.getId(), mshost.getRunid());
|
||||
try {
|
||||
JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
|
||||
} catch(Exception e) {
|
||||
|
|
@ -711,6 +793,9 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
}
|
||||
}
|
||||
|
||||
if(removedNodeList.size() > 0)
|
||||
this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, removedNodeList));
|
||||
|
||||
List<ManagementServerHostVO> newNodeList = new ArrayList<ManagementServerHostVO>();
|
||||
for(ManagementServerHostVO mshost : currentList) {
|
||||
if(!activePeers.containsKey(mshost.getId())) {
|
||||
|
|
@ -729,17 +814,17 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
}
|
||||
}
|
||||
|
||||
if(newNodeList.size() > 0 || removedNodeList.size() > 0)
|
||||
this.queueNotification(newNodeList, removedNodeList);
|
||||
if(newNodeList.size() > 0)
|
||||
this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeAdded, newNodeList));
|
||||
}
|
||||
|
||||
private static boolean isIdInList(Long id, List<ManagementServerHostVO> l) {
|
||||
private static ManagementServerHostVO getInListById(Long id, List<ManagementServerHostVO> l) {
|
||||
for(ManagementServerHostVO mshost : l) {
|
||||
if(mshost.getId() == id) {
|
||||
return true;
|
||||
return mshost;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -764,7 +849,7 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
if(mshost == null) {
|
||||
mshost = new ManagementServerHostVO();
|
||||
mshost.setMsid(_msid);
|
||||
|
||||
mshost.setRunid(this.getCurrentRunId());
|
||||
mshost.setName(NetUtils.getHostName());
|
||||
mshost.setVersion(version);
|
||||
mshost.setServiceIP(_clusterNodeIP);
|
||||
|
|
@ -772,6 +857,7 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
mshost.setLastUpdateTime(DateUtil.currentGMTTime());
|
||||
mshost.setRemoved(null);
|
||||
mshost.setAlertCount(0);
|
||||
mshost.setState(ManagementServerNode.State.Up);
|
||||
_mshostDao.persist(mshost);
|
||||
|
||||
if(s_logger.isInfoEnabled()) {
|
||||
|
|
@ -782,7 +868,7 @@ public class ClusterManagerImpl implements ClusterManager {
|
|||
s_logger.info("Management server " + _msid + " is being started");
|
||||
}
|
||||
|
||||
_mshostDao.update(mshost.getId(), NetUtils.getHostName(), version,
|
||||
_mshostDao.update(mshost.getId(), getCurrentRunId(), NetUtils.getHostName(), version,
|
||||
_clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,9 +18,10 @@
|
|||
|
||||
package com.cloud.cluster;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.List;
|
||||
|
||||
public interface ClusterManagerListener {
|
||||
void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId);
|
||||
void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId);
|
||||
void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId);
|
||||
void onManagementNodeIsolated();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,27 @@
|
|||
package com.cloud.cluster;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class ClusterManagerMessage {
|
||||
public static enum MessageType { nodeAdded, nodeRemoved, nodeIsolated };
|
||||
|
||||
MessageType _type;
|
||||
List<ManagementServerHostVO> _nodes;
|
||||
|
||||
public ClusterManagerMessage(MessageType type) {
|
||||
_type = type;
|
||||
}
|
||||
|
||||
public ClusterManagerMessage(MessageType type, List<ManagementServerHostVO> nodes) {
|
||||
_type = type;
|
||||
_nodes = nodes;
|
||||
}
|
||||
|
||||
public MessageType getMessageType() {
|
||||
return _type;
|
||||
}
|
||||
|
||||
public List<ManagementServerHostVO> getNodes() {
|
||||
return _nodes;
|
||||
}
|
||||
}
|
||||
|
|
@ -22,6 +22,8 @@ import java.util.Date;
|
|||
|
||||
import javax.persistence.Column;
|
||||
import javax.persistence.Entity;
|
||||
import javax.persistence.EnumType;
|
||||
import javax.persistence.Enumerated;
|
||||
import javax.persistence.GeneratedValue;
|
||||
import javax.persistence.GenerationType;
|
||||
import javax.persistence.Id;
|
||||
|
|
@ -41,10 +43,17 @@ public class ManagementServerHostVO {
|
|||
private long id;
|
||||
|
||||
@Column(name="msid", updatable=true, nullable=false)
|
||||
private long msid;
|
||||
private long msid;
|
||||
|
||||
@Column(name="runid", updatable=true, nullable=false)
|
||||
private long runid;
|
||||
|
||||
@Column(name="name", updatable=true, nullable=true)
|
||||
private String name;
|
||||
private String name;
|
||||
|
||||
@Column(name="state", updatable = true, nullable=false)
|
||||
@Enumerated(value=EnumType.STRING)
|
||||
private ManagementServerNode.State state;
|
||||
|
||||
@Column(name="version", updatable=true, nullable=true)
|
||||
private String version;
|
||||
|
|
@ -68,8 +77,9 @@ public class ManagementServerHostVO {
|
|||
public ManagementServerHostVO() {
|
||||
}
|
||||
|
||||
public ManagementServerHostVO(long msid, String serviceIP, int servicePort, Date updateTime) {
|
||||
this.msid = msid;
|
||||
public ManagementServerHostVO(long msid, long runid, String serviceIP, int servicePort, Date updateTime) {
|
||||
this.msid = msid;
|
||||
this.runid = runid;
|
||||
this.serviceIP = serviceIP;
|
||||
this.servicePort = servicePort;
|
||||
this.lastUpdateTime = updateTime;
|
||||
|
|
@ -81,6 +91,14 @@ public class ManagementServerHostVO {
|
|||
|
||||
public void setId(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public long getRunid() {
|
||||
return runid;
|
||||
}
|
||||
|
||||
public void setRunid(long runid) {
|
||||
this.runid = runid;
|
||||
}
|
||||
|
||||
public long getMsid() {
|
||||
|
|
@ -97,6 +115,14 @@ public class ManagementServerHostVO {
|
|||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public ManagementServerNode.State getState() {
|
||||
return this.state;
|
||||
}
|
||||
|
||||
public void setState(ManagementServerNode.State state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
public String getVersion() {
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ import com.cloud.utils.net.MacAddress;
|
|||
|
||||
public class ManagementServerNode implements SystemIntegrityChecker {
|
||||
private static final long s_nodeId = MacAddress.getMacAddress().toLong();
|
||||
|
||||
public static enum State { Up, Down };
|
||||
|
||||
@Override
|
||||
public void check() {
|
||||
|
|
|
|||
|
|
@ -26,16 +26,19 @@ import com.cloud.cluster.ManagementServerHostVO;
|
|||
import com.cloud.utils.db.GenericDao;
|
||||
|
||||
public interface ManagementServerHostDao extends GenericDao<ManagementServerHostVO, Long> {
|
||||
boolean remove(Long id);
|
||||
|
||||
ManagementServerHostVO findByMsid(long msid);
|
||||
void increaseAlertCount(long id);
|
||||
|
||||
void update(long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate);
|
||||
void update(long id, Date lastUpdate);
|
||||
void update(long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate);
|
||||
void update(long id, long runid, Date lastUpdate);
|
||||
List<ManagementServerHostVO> getActiveList(Date cutTime);
|
||||
List<ManagementServerHostVO> getInactiveList(Date cutTime);
|
||||
|
||||
void update(Connection conn, long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate);
|
||||
void update(Connection conn, long id, Date lastUpdate);
|
||||
void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate);
|
||||
void update(Connection conn, long id, long runid, Date lastUpdate);
|
||||
void invalidateRunSession(Connection conn, long id, long runid);
|
||||
List<ManagementServerHostVO> getActiveList(Connection conn, Date cutTime);
|
||||
List<ManagementServerHostVO> getInactiveList(Connection conn, Date cutTime);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,8 +29,11 @@ import javax.ejb.Local;
|
|||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.cluster.ClusterInvalidSessionException;
|
||||
import com.cloud.cluster.ManagementServerHostVO;
|
||||
import com.cloud.cluster.ManagementServerNode;
|
||||
import com.cloud.utils.DateUtil;
|
||||
import com.cloud.utils.db.DB;
|
||||
import com.cloud.utils.db.GenericDaoBase;
|
||||
import com.cloud.utils.db.SearchBuilder;
|
||||
import com.cloud.utils.db.SearchCriteria;
|
||||
|
|
@ -45,16 +48,17 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
|
|||
private final SearchBuilder<ManagementServerHostVO> ActiveSearch;
|
||||
private final SearchBuilder<ManagementServerHostVO> InactiveSearch;
|
||||
|
||||
public void update(Connection conn, long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
|
||||
public void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
|
||||
PreparedStatement pstmt = null;
|
||||
try {
|
||||
pstmt = conn.prepareStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0 where id=?");
|
||||
pstmt = conn.prepareStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0, runid=? where id=?");
|
||||
pstmt.setString(1, name);
|
||||
pstmt.setString(2, version);
|
||||
pstmt.setString(3, serviceIP);
|
||||
pstmt.setInt(4, servicePort);
|
||||
pstmt.setString(5, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
|
||||
pstmt.setLong(6, id);
|
||||
pstmt.setLong(6, runid);
|
||||
pstmt.setLong(7, id);
|
||||
|
||||
pstmt.executeUpdate();
|
||||
conn.commit();
|
||||
|
|
@ -71,12 +75,38 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
|
|||
}
|
||||
}
|
||||
|
||||
public void update(Connection conn, long id, Date lastUpdate) {
|
||||
public void update(Connection conn, long id, long runid, Date lastUpdate) {
|
||||
PreparedStatement pstmt = null;
|
||||
try {
|
||||
pstmt = conn.prepareStatement("update mshost set last_update=?, removed=null, alert_count=0 where id=?");
|
||||
pstmt = conn.prepareStatement("update mshost set last_update=?, removed=null, alert_count=0 where id=? and runid=?");
|
||||
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
|
||||
pstmt.setLong(2, id);
|
||||
pstmt.setLong(3, runid);
|
||||
|
||||
int count = pstmt.executeUpdate();
|
||||
conn.commit();
|
||||
|
||||
if(count < 1)
|
||||
throw new CloudRuntimeException("Invalid cluster session detected", new ClusterInvalidSessionException("runid " + runid + " is no longer valid"));
|
||||
} catch (SQLException e) {
|
||||
throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e);
|
||||
} finally {
|
||||
if(pstmt != null) {
|
||||
try {
|
||||
pstmt.close();
|
||||
} catch(Exception e) {
|
||||
s_logger.warn("Unable to close prepared statement due to exception ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void invalidateRunSession(Connection conn, long id, long runid) {
|
||||
PreparedStatement pstmt = null;
|
||||
try {
|
||||
pstmt = conn.prepareStatement("update mshost set runid=0, state='Down' where id=? and runid=?");
|
||||
pstmt.setLong(1, id);
|
||||
pstmt.setLong(2, runid);
|
||||
|
||||
pstmt.executeUpdate();
|
||||
conn.commit();
|
||||
|
|
@ -128,19 +158,21 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
|
|||
return null;
|
||||
}
|
||||
|
||||
public void update(long id, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
|
||||
@DB
|
||||
public void update(long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
|
||||
Transaction txn = Transaction.currentTxn();
|
||||
PreparedStatement pstmt = null;
|
||||
try {
|
||||
txn.start();
|
||||
|
||||
pstmt = txn.prepareAutoCloseStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0 where id=?");
|
||||
pstmt = txn.prepareAutoCloseStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0, runid=? where id=?");
|
||||
pstmt.setString(1, name);
|
||||
pstmt.setString(2, version);
|
||||
pstmt.setString(3, serviceIP);
|
||||
pstmt.setInt(4, servicePort);
|
||||
pstmt.setString(5, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
|
||||
pstmt.setLong(6, id);
|
||||
pstmt.setLong(6, runid);
|
||||
pstmt.setLong(7, id);
|
||||
|
||||
pstmt.executeUpdate();
|
||||
txn.commit();
|
||||
|
|
@ -148,20 +180,46 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
|
|||
s_logger.warn("Unexpected exception, ", e);
|
||||
txn.rollback();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@DB
|
||||
public boolean remove(Long id) {
|
||||
Transaction txn = Transaction.currentTxn();
|
||||
|
||||
try {
|
||||
txn.start();
|
||||
|
||||
ManagementServerHostVO msHost = findById(id);
|
||||
msHost.setState(ManagementServerNode.State.Down);
|
||||
super.remove(id);
|
||||
|
||||
txn.commit();
|
||||
return true;
|
||||
} catch(Exception e) {
|
||||
s_logger.warn("Unexpected exception, ", e);
|
||||
txn.rollback();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public void update(long id, Date lastUpdate) {
|
||||
@DB
|
||||
public void update(long id, long runid, Date lastUpdate) {
|
||||
Transaction txn = Transaction.currentTxn();
|
||||
PreparedStatement pstmt = null;
|
||||
try {
|
||||
txn.start();
|
||||
|
||||
pstmt = txn.prepareAutoCloseStatement("update mshost set last_update=?, removed=null, alert_count=0 where id=?");
|
||||
pstmt = txn.prepareAutoCloseStatement("update mshost set last_update=?, removed=null, alert_count=0 where id=? and runid=?");
|
||||
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
|
||||
pstmt.setLong(2, id);
|
||||
pstmt.setLong(3, runid);
|
||||
|
||||
pstmt.executeUpdate();
|
||||
txn.commit();
|
||||
int count = pstmt.executeUpdate();
|
||||
txn.commit();
|
||||
|
||||
if(count < 1)
|
||||
throw new CloudRuntimeException("Invalid cluster session detected", new ClusterInvalidSessionException("runid " + runid + " is no longer valid"));
|
||||
} catch(Exception e) {
|
||||
s_logger.warn("Unexpected exception, ", e);
|
||||
txn.rollback();
|
||||
|
|
@ -181,7 +239,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
|
|||
|
||||
return listIncludingRemovedBy(sc);
|
||||
}
|
||||
|
||||
|
||||
@DB
|
||||
public void increaseAlertCount(long id) {
|
||||
Transaction txn = Transaction.currentTxn();
|
||||
PreparedStatement pstmt = null;
|
||||
|
|
|
|||
|
|
@ -36,10 +36,11 @@ import com.cloud.async.dao.SyncQueueItemDaoImpl;
|
|||
import com.cloud.capacity.CapacityManagerImpl;
|
||||
import com.cloud.capacity.dao.CapacityDaoImpl;
|
||||
import com.cloud.certificate.dao.CertificateDaoImpl;
|
||||
import com.cloud.cluster.CheckPointManagerImpl;
|
||||
import com.cloud.cluster.ClusterFenceManagerImpl;
|
||||
import com.cloud.cluster.ClusterManagerImpl;
|
||||
import com.cloud.cluster.DummyClusterManagerImpl;
|
||||
import com.cloud.cluster.ManagementServerNode;
|
||||
import com.cloud.cluster.CheckPointManagerImpl;
|
||||
import com.cloud.cluster.dao.ManagementServerHostDaoImpl;
|
||||
import com.cloud.cluster.dao.StackMaidDaoImpl;
|
||||
import com.cloud.configuration.dao.ConfigurationDaoImpl;
|
||||
|
|
@ -308,7 +309,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com
|
|||
addManager("ClusteredAgentManager", ClusteredAgentManagerImpl.class);
|
||||
addManager("VirtualMachineManager", ClusteredVirtualMachineManagerImpl.class);
|
||||
addManager("HypervisorGuruManager", HypervisorGuruManagerImpl.class);
|
||||
|
||||
addManager("ClusterFenceManager", ClusterFenceManagerImpl.class);
|
||||
|
||||
ComponentInfo<? extends Manager> info = addManager("ConsoleProxyManager", ConsoleProxyManagerImpl.class);
|
||||
info.addParameter("consoleproxy.sslEnabled", "true");
|
||||
|
|
|
|||
|
|
@ -800,4 +800,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onManagementNodeIsolated() {
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2785,6 +2785,10 @@ public class StorageManagerImpl implements StorageManager, StorageService, Manag
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onManagementNodeIsolated() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CapacityVO> getSecondaryStorageUsedStats(Long hostId, Long podId, Long zoneId) {
|
||||
SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
|
||||
|
|
|
|||
|
|
@ -43,6 +43,10 @@ public class ClusteredVirtualMachineManagerImpl extends VirtualMachineManagerImp
|
|||
cancelWorkItems(node.getMsid());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onManagementNodeIsolated() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
|
||||
|
|
|
|||
|
|
@ -674,7 +674,9 @@ CREATE TABLE `cloud`.`host_details` (
|
|||
CREATE TABLE `cloud`.`mshost` (
|
||||
`id` bigint unsigned NOT NULL auto_increment,
|
||||
`msid` bigint unsigned NOT NULL UNIQUE COMMENT 'management server id derived from MAC address',
|
||||
`runid` bigint NOT NULL DEFAULT 0 COMMENT 'run id, combined with msid to form a cluster session',
|
||||
`name` varchar(255),
|
||||
`state` varchar(10) NOT NULL DEFAULT 'Down',
|
||||
`version` varchar(255),
|
||||
`service_ip` char(40) NOT NULL,
|
||||
`service_port` integer NOT NULL,
|
||||
|
|
|
|||
|
|
@ -2,6 +2,9 @@
|
|||
-- Schema upgrade from 2.2.4 to 2.2.5;
|
||||
--;
|
||||
|
||||
ALTER TABLE `cloud`.`mshost` ADD COLUMN `runid` bigint NOT NULL DEFAULT 0 COMMENT 'run id, combined with msid to form a cluster session';
|
||||
ALTER TABLE `cloud`.`mshost` ADD COLUMN `state` varchar(10) NOT NULL default 'Down';
|
||||
|
||||
CREATE TABLE `cloud`.`cmd_exec_log` (
|
||||
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||
`host_id` bigint unsigned NOT NULL COMMENT 'host id of the system VM agent that command is sent to',
|
||||
|
|
|
|||
|
|
@ -171,6 +171,10 @@ public class Transaction {
|
|||
return _txn;
|
||||
}
|
||||
|
||||
public static Connection getStandaloneConnectionWithException() throws SQLException {
|
||||
return s_ds.getConnection();
|
||||
}
|
||||
|
||||
public static Connection getStandaloneConnection() {
|
||||
try {
|
||||
return s_ds.getConnection();
|
||||
|
|
|
|||
Loading…
Reference in New Issue