diff --git a/core/src/com/cloud/agent/transport/Request.java b/core/src/com/cloud/agent/transport/Request.java index f198b2bf0ec..4d4d101268a 100755 --- a/core/src/com/cloud/agent/transport/Request.java +++ b/core/src/com/cloud/agent/transport/Request.java @@ -25,15 +25,14 @@ import java.util.List; import org.apache.log4j.Logger; -import com.cloud.agent.api.Answer; import com.cloud.agent.api.Command; import com.cloud.agent.api.SecStorageFirewallCfgCommand.PortConfig; import com.cloud.exception.UnsupportedVersionException; +import com.cloud.serializer.GsonHelper; import com.cloud.utils.NumbersUtil; import com.cloud.utils.Pair; import com.cloud.utils.exception.CloudRuntimeException; import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonDeserializationContext; import com.google.gson.JsonDeserializer; @@ -42,7 +41,6 @@ import com.google.gson.JsonNull; import com.google.gson.JsonParseException; import com.google.gson.JsonSerializationContext; import com.google.gson.JsonSerializer; -import com.google.gson.reflect.TypeToken; /** * Request is a simple wrapper around command and answer to add sequencing, @@ -64,6 +62,9 @@ import com.google.gson.reflect.TypeToken; public class Request { private static final Logger s_logger = Logger.getLogger(Request.class); + protected static final Gson s_gson = GsonHelper.getGson(); + protected static final Gson s_gogger = GsonHelper.getGsonLogger(); + public enum Version { v1, // using gson to marshall v2, // now using gson as marshalled. @@ -86,33 +87,6 @@ public class Request { protected static final short FLAG_FROM_SERVER = 0x20; protected static final short FLAG_CONTROL = 0x40; - protected static final Gson s_gson; - protected static final Gson s_gogger; - - static { - GsonBuilder gsonBuilder = new GsonBuilder(); - s_gson = setDefaultGsonConfig(gsonBuilder); - GsonBuilder loggerBuilder = new GsonBuilder(); - loggerBuilder.disableHtmlEscaping(); - loggerBuilder.setExclusionStrategies(new LoggingExclusionStrategy(s_logger)); - s_gogger = setDefaultGsonConfig(loggerBuilder); - s_logger.info("Default Builder inited."); - } - - public static Gson setDefaultGsonConfig(GsonBuilder builder) { - ArrayTypeAdaptor cmdAdaptor = new ArrayTypeAdaptor(); - builder.registerTypeAdapter(Command[].class, cmdAdaptor); - ArrayTypeAdaptor ansAdaptor = new ArrayTypeAdaptor(); - builder.registerTypeAdapter(Answer[].class, ansAdaptor); - builder.registerTypeAdapter(new TypeToken>() { - }.getType(), new PortConfigListTypeAdaptor()); - builder.registerTypeAdapter(new TypeToken>() { - }.getType(), new NwGroupsCommandTypeAdaptor()); - Gson gson = builder.create(); - cmdAdaptor.initGson(gson); - ansAdaptor.initGson(gson); - return gson; - } protected Version _ver; protected long _session; diff --git a/core/src/com/cloud/serializer/GsonHelper.java b/core/src/com/cloud/serializer/GsonHelper.java index 56ae916aca0..93a7ddf9cd7 100644 --- a/core/src/com/cloud/serializer/GsonHelper.java +++ b/core/src/com/cloud/serializer/GsonHelper.java @@ -18,21 +18,61 @@ package com.cloud.serializer; + + +import java.util.List; + +import org.apache.log4j.Logger; + import com.cloud.agent.api.Answer; import com.cloud.agent.api.Command; +import com.cloud.agent.api.SecStorageFirewallCfgCommand.PortConfig; import com.cloud.agent.transport.ArrayTypeAdaptor; +import com.cloud.agent.transport.LoggingExclusionStrategy; +import com.cloud.agent.transport.Request.NwGroupsCommandTypeAdaptor; +import com.cloud.agent.transport.Request.PortConfigListTypeAdaptor; +import com.cloud.utils.Pair; +import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; -public class GsonHelper { - private static final GsonBuilder s_gBuilder; - static { - s_gBuilder = new GsonBuilder(); - s_gBuilder.setVersion(1.3); - s_gBuilder.registerTypeAdapter(Command[].class, new ArrayTypeAdaptor()); - s_gBuilder.registerTypeAdapter(Answer[].class, new ArrayTypeAdaptor()); - } - - public static GsonBuilder getBuilder() { - return s_gBuilder; - } +public class GsonHelper { + private static final Logger s_logger = Logger.getLogger(GsonHelper.class); + + protected static final Gson s_gson; + protected static final Gson s_gogger; + + static { + GsonBuilder gsonBuilder = new GsonBuilder(); + s_gson = setDefaultGsonConfig(gsonBuilder); + GsonBuilder loggerBuilder = new GsonBuilder(); + loggerBuilder.disableHtmlEscaping(); + loggerBuilder.setExclusionStrategies(new LoggingExclusionStrategy(s_logger)); + s_gogger = setDefaultGsonConfig(loggerBuilder); + s_logger.info("Default Builder inited."); + } + + static Gson setDefaultGsonConfig(GsonBuilder builder) { + builder.setVersion(1.5); + ArrayTypeAdaptor cmdAdaptor = new ArrayTypeAdaptor(); + builder.registerTypeAdapter(Command[].class, cmdAdaptor); + ArrayTypeAdaptor ansAdaptor = new ArrayTypeAdaptor(); + builder.registerTypeAdapter(Answer[].class, ansAdaptor); + builder.registerTypeAdapter(new TypeToken>() { + }.getType(), new PortConfigListTypeAdaptor()); + builder.registerTypeAdapter(new TypeToken>() { + }.getType(), new NwGroupsCommandTypeAdaptor()); + Gson gson = builder.create(); + cmdAdaptor.initGson(gson); + ansAdaptor.initGson(gson); + return gson; + } + + public final static Gson getGson() { + return s_gson; + } + + public final static Gson getGsonLogger() { + return s_gogger; + } } diff --git a/core/src/com/cloud/serializer/SerializerHelper.java b/core/src/com/cloud/serializer/SerializerHelper.java index 25a6d18a6dd..e83d6672b83 100644 --- a/core/src/com/cloud/serializer/SerializerHelper.java +++ b/core/src/com/cloud/serializer/SerializerHelper.java @@ -38,25 +38,25 @@ import com.google.gson.Gson; public class SerializerHelper { public static final Logger s_logger = Logger.getLogger(SerializerHelper.class.getName()); public static String token = "/"; - - public static String toSerializedStringOld(Object result) { - if(result != null) { - Class clz = result.getClass(); - Gson gson = GsonHelper.getBuilder().create(); - return clz.getName() + token + gson.toJson(result); - } - return null; - } - public static Object fromSerializedString(String result) { - try { - if(result != null && !result.isEmpty()) { - - String[] serializedParts = result.split(token); + public static String toSerializedStringOld(Object result) { + if(result != null) { + Class clz = result.getClass(); + Gson gson = GsonHelper.getGson(); + return clz.getName() + token + gson.toJson(result); + } + return null; + } - if (serializedParts.length < 2) { - return null; - } + public static Object fromSerializedString(String result) { + try { + if(result != null && !result.isEmpty()) { + + String[] serializedParts = result.split(token); + + if (serializedParts.length < 2) { + return null; + } String clzName = serializedParts[0]; String nameField = null; String content = null; @@ -68,117 +68,123 @@ public class SerializerHelper { content = result.substring(index + nameField.length() + 2); } - Class clz; - try { - clz = Class.forName(clzName); - } catch (ClassNotFoundException e) { - return null; - } - - Gson gson = GsonHelper.getBuilder().create(); - Object obj = gson.fromJson(content, clz); - return obj; - } - return null; - } catch(RuntimeException e) { - s_logger.error("Caught runtime exception when doing GSON deserialization on: " + result); - throw e; - } - } - - public static List> toPairList(Object o, String name) { - List> l = new ArrayList>(); - return appendPairList(l, o, name); - } - - public static List> appendPairList(List> l, Object o, String name) { - if(o != null) { - Class clz = o.getClass(); - - if(clz.isPrimitive() || clz.getSuperclass() == Number.class || clz == String.class || clz == Date.class) { - l.add(new Pair(name, o.toString())); - return l; - } - - for(Field f : clz.getDeclaredFields()) { - if((f.getModifiers() & Modifier.STATIC) != 0) - continue; - - Param param = f.getAnnotation(Param.class); - if(param == null) - continue; - - String propName = f.getName(); - if(!param.propName().isEmpty()) - propName = param.propName(); - - String paramName = param.name(); - if(paramName.isEmpty()) - paramName = propName; - - Method method = getGetMethod(o, propName); - if(method != null) { - try { - Object fieldValue = method.invoke(o); - if(fieldValue != null) { - if (f.getType() == Date.class) { - l.add(new Pair(paramName, DateUtil.getOutputString((Date)fieldValue))); - } else { - l.add(new Pair(paramName, fieldValue.toString())); - } - } - //else - // l.add(new Pair(paramName, "")); - } catch (IllegalArgumentException e) { - s_logger.error("Illegal argument exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName); - - } catch (IllegalAccessException e) { - s_logger.error("Illegal access exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName); - } catch (InvocationTargetException e) { - s_logger.error("Invocation target exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName); - } - } - } - } - return l; - } - - private static Method getGetMethod(Object o, String propName) { - Method method = null; - String methodName = getGetMethodName("get", propName); - try { - method = o.getClass().getMethod(methodName); - } catch (SecurityException e1) { - s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName); - } catch (NoSuchMethodException e1) { - if(s_logger.isTraceEnabled()) - s_logger.trace("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName + ", will check is-prefixed method to see if it is boolean property"); - } - - if(method != null) - return method; - - methodName = getGetMethodName("is", propName); - try { - method = o.getClass().getMethod(methodName); - } catch (SecurityException e1) { - s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName); - } catch (NoSuchMethodException e1) { - s_logger.warn("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName); - } - return method; - } - - private static String getGetMethodName(String prefix, String fieldName) { - StringBuffer sb = new StringBuffer(prefix); - - if(fieldName.length() >= prefix.length() && fieldName.substring(0, prefix.length()).equals(prefix)) { - return fieldName; - } else { - sb.append(fieldName.substring(0, 1).toUpperCase()); - sb.append(fieldName.substring(1)); - } - - return sb.toString(); - } + Class clz; + try { + clz = Class.forName(clzName); + } catch (ClassNotFoundException e) { + return null; + } + + Gson gson = GsonHelper.getGson(); + Object obj = gson.fromJson(content, clz); + return obj; + } + return null; + } catch(RuntimeException e) { + s_logger.error("Caught runtime exception when doing GSON deserialization on: " + result); + throw e; + } + } + + public static List> toPairList(Object o, String name) { + List> l = new ArrayList>(); + return appendPairList(l, o, name); + } + + public static List> appendPairList(List> l, Object o, String name) { + if(o != null) { + Class clz = o.getClass(); + + if(clz.isPrimitive() || clz.getSuperclass() == Number.class || clz == String.class || clz == Date.class) { + l.add(new Pair(name, o.toString())); + return l; + } + + for(Field f : clz.getDeclaredFields()) { + if((f.getModifiers() & Modifier.STATIC) != 0) { + continue; + } + + Param param = f.getAnnotation(Param.class); + if(param == null) { + continue; + } + + String propName = f.getName(); + if(!param.propName().isEmpty()) { + propName = param.propName(); + } + + String paramName = param.name(); + if(paramName.isEmpty()) { + paramName = propName; + } + + Method method = getGetMethod(o, propName); + if(method != null) { + try { + Object fieldValue = method.invoke(o); + if(fieldValue != null) { + if (f.getType() == Date.class) { + l.add(new Pair(paramName, DateUtil.getOutputString((Date)fieldValue))); + } else { + l.add(new Pair(paramName, fieldValue.toString())); + } + } + //else + // l.add(new Pair(paramName, "")); + } catch (IllegalArgumentException e) { + s_logger.error("Illegal argument exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName); + + } catch (IllegalAccessException e) { + s_logger.error("Illegal access exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName); + } catch (InvocationTargetException e) { + s_logger.error("Invocation target exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName); + } + } + } + } + return l; + } + + private static Method getGetMethod(Object o, String propName) { + Method method = null; + String methodName = getGetMethodName("get", propName); + try { + method = o.getClass().getMethod(methodName); + } catch (SecurityException e1) { + s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName); + } catch (NoSuchMethodException e1) { + if(s_logger.isTraceEnabled()) { + s_logger.trace("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName + ", will check is-prefixed method to see if it is boolean property"); + } + } + + if(method != null) { + return method; + } + + methodName = getGetMethodName("is", propName); + try { + method = o.getClass().getMethod(methodName); + } catch (SecurityException e1) { + s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName); + } catch (NoSuchMethodException e1) { + s_logger.warn("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName); + } + return method; + } + + private static String getGetMethodName(String prefix, String fieldName) { + StringBuffer sb = new StringBuffer(prefix); + + if(fieldName.length() >= prefix.length() && fieldName.substring(0, prefix.length()).equals(prefix)) { + return fieldName; + } else { + sb.append(fieldName.substring(0, 1).toUpperCase()); + sb.append(fieldName.substring(1)); + } + + return sb.toString(); + } } diff --git a/core/test/com/cloud/agent/api/transport/RequestTest.java b/core/test/com/cloud/agent/api/transport/RequestTest.java index 66b06deb051..9fcf843da34 100644 --- a/core/test/com/cloud/agent/api/transport/RequestTest.java +++ b/core/test/com/cloud/agent/api/transport/RequestTest.java @@ -41,19 +41,20 @@ public class RequestTest extends TestCase { GetHostStatsCommand cmd3 = new GetHostStatsCommand("hostguid", "hostname", 101); cmd2.addPortConfig("abc", "24", true, "eth0"); cmd2.addPortConfig("127.0.0.1", "44", false, "eth1"); - Request sreq = new Request(1, 2, 3, new Command[] { cmd1, cmd2, cmd3 }, true, true); + Request sreq = new Request(2, 3, new Command[] { cmd1, cmd2, cmd3 }, true, true); + sreq.setSequence(1); Logger logger = Logger.getLogger(Request.class); Level level = logger.getLevel(); logger.setLevel(Level.DEBUG); - sreq.log(1, "Debug"); + sreq.log("Debug", true); logger.setLevel(Level.TRACE); - sreq.log(1, "Trace"); + sreq.log("Trace", true); logger.setLevel(Level.INFO); - sreq.log(1, "Info"); + sreq.log("Info", true); logger.setLevel(level); diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index 8b8d4999ecc..1ce22e14125 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -79,31 +79,31 @@ public class ClusterManagerImpl implements ClusterManager { private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class); private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second - + private final List listeners = new ArrayList(); private final Map activePeers = new HashMap(); private int heartbeatInterval = ClusterManager.DEFAULT_HEARTBEAT_INTERVAL; private int heartbeatThreshold = ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD; - + private final Map clusterPeers; private final Map asyncCalls; private final Gson gson; - + private AgentManager _agentMgr; - + private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat")); - + private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification")); private List _notificationMsgs = new ArrayList(); private Connection _heartbeatConnection = null; private final ExecutorService _executor; - + private ClusterServiceAdapter _currentServiceAdapter; - + private ManagementServerHostDao _mshostDao; private HostDao _hostDao; - + // // pay attention to _mshostId and _msid // _mshostId is the primary key of management host table @@ -112,74 +112,74 @@ public class ClusterManagerImpl implements ClusterManager { private Long _mshostId = null; protected long _msid = ManagementServerNode.getManagementServerId(); protected long _runId = System.currentTimeMillis(); - + private boolean _peerScanInited = false; - + private String _name; private String _clusterNodeIP = "127.0.0.1"; - + public ClusterManagerImpl() { - clusterPeers = new HashMap(); - asyncCalls = new HashMap(); - - gson = GsonHelper.getBuilder().create(); - - // executor to perform remote-calls in another thread context, to avoid potential - // recursive remote calls between nodes - // - _executor = Executors.newCachedThreadPool(new NamedThreadFactory("Cluster-Worker")); + clusterPeers = new HashMap(); + asyncCalls = new HashMap(); + + gson = GsonHelper.getGson(); + + // executor to perform remote-calls in another thread context, to avoid potential + // recursive remote calls between nodes + // + _executor = Executors.newCachedThreadPool(new NamedThreadFactory("Cluster-Worker")); } - + @Override public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) - throws AgentUnavailableException, OperationTimedoutException { + throws AgentUnavailableException, OperationTimedoutException { Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue); for (Command cmd : cmds) { commands.addCommand(cmd); } - return _agentMgr.send(hostId, commands); + return _agentMgr.send(hostId, commands); } - + @Override public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) - throws AgentUnavailableException { + throws AgentUnavailableException { Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue); for (Command cmd : cmds) { commands.addCommand(cmd); } - return _agentMgr.send(hostId, commands, listener); + return _agentMgr.send(hostId, commands, listener); } - + @Override public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException { - return _agentMgr.executeUserRequest(agentId, event); + return _agentMgr.executeUserRequest(agentId, event); } - + @Override public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException { final String msPeer = getPeerName(agentId); if (msPeer == null) { - return null; + return null; } - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:"+ agentId); - } - Command[] cmds = new Command[1]; - cmds[0] = new ChangeAgentCommand(agentId, event); - + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:"+ agentId); + } + Command[] cmds = new Command[1]; + cmds[0] = new ChangeAgentCommand(agentId, event); + Answer[] answers = execute(msPeer, agentId, cmds, true); if (answers == null) { - throw new AgentUnavailableException(agentId); + throw new AgentUnavailableException(agentId); } - + if (s_logger.isDebugEnabled()) { - s_logger.debug("Result for agent change is " + answers[0].getResult()); + s_logger.debug("Result for agent change is " + answers[0].getResult()); } - + return answers[0].getResult(); } - + /** * called by DatabaseUpgradeChecker to see if there are other peers running. * @param notVersion If version is passed in, the peers CANNOT be running at this @@ -190,788 +190,792 @@ public class ClusterManagerImpl implements ClusterManager { public static final boolean arePeersRunning(String notVersion) { return false; //TODO: Leaving this for Kelven to take care of. } - + @Override public void broadcast(long agentId, Command[] cmds) { - Date cutTime = DateUtil.currentGMTTime(); + Date cutTime = DateUtil.currentGMTTime(); - List peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold)); - for (ManagementServerHostVO peer : peers) { - String peerName = Long.toString(peer.getMsid()); - if (getSelfPeerName().equals(peerName)) { - continue; // Skip myself. - } - try { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid()); - } - Answer[] answers = execute(peerName, agentId, cmds, true); - } catch (Exception e) { - s_logger.warn("Caught exception while talkign to " + peer.getMsid()); - } - } + List peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold)); + for (ManagementServerHostVO peer : peers) { + String peerName = Long.toString(peer.getMsid()); + if (getSelfPeerName().equals(peerName)) { + continue; // Skip myself. + } + try { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid()); + } + Answer[] answers = execute(peerName, agentId, cmds, true); + } catch (Exception e) { + s_logger.warn("Caught exception while talkign to " + peer.getMsid()); + } + } } - + @Override public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { - ClusterService peerService = null; - - if(s_logger.isDebugEnabled()) { + ClusterService peerService = null; + + if(s_logger.isDebugEnabled()) { s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + - gson.toJson(cmds, Command[].class)); + gson.toJson(cmds, Command[].class)); } - - for(int i = 0; i < 2; i++) { - try { - peerService = getPeerService(strPeer); - } catch (RemoteException e) { - s_logger.error("Unable to get cluster service on peer : " + strPeer); - } - - if(peerService != null) { - try { - if(s_logger.isDebugEnabled()) { + + for(int i = 0; i < 2; i++) { + try { + peerService = getPeerService(strPeer); + } catch (RemoteException e) { + s_logger.error("Unable to get cluster service on peer : " + strPeer); + } + + if(peerService != null) { + try { + if(s_logger.isDebugEnabled()) { s_logger.debug("Send " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote"); } - - long startTick = System.currentTimeMillis(); - String strResult = peerService.execute(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError); - if(s_logger.isDebugEnabled()) { + + long startTick = System.currentTimeMillis(); + String strResult = peerService.execute(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError); + if(s_logger.isDebugEnabled()) { s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " + - (System.currentTimeMillis() - startTick) + " ms, result: " + strResult); + (System.currentTimeMillis() - startTick) + " ms, result: " + strResult); } - - if(strResult != null) { - try { - return gson.fromJson(strResult, Answer[].class); - } catch(Throwable e) { - s_logger.error("Exception on parsing gson package from remote call to " + strPeer); - } - } - - return null; - } catch (RemoteException e) { - invalidatePeerService(strPeer); - - if(s_logger.isInfoEnabled()) { + + if(strResult != null) { + try { + return gson.fromJson(strResult, Answer[].class); + } catch(Throwable e) { + s_logger.error("Exception on parsing gson package from remote call to " + strPeer); + } + } + + return null; + } catch (RemoteException e) { + invalidatePeerService(strPeer); + + if(s_logger.isInfoEnabled()) { s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: " - + i + ", exception message :" + e.getMessage()); + + i + ", exception message :" + e.getMessage()); } - } - } - } - - return null; + } + } + } + + return null; } - + @Override public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) { - - ClusterService peerService = null; - - if(s_logger.isDebugEnabled()) { + + ClusterService peerService = null; + + if(s_logger.isDebugEnabled()) { s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + - gson.toJson(cmds, Command[].class)); + gson.toJson(cmds, Command[].class)); } - - for(int i = 0; i < 2; i++) { - try { - peerService = getPeerService(strPeer); - } catch (RemoteException e) { - s_logger.error("Unable to get cluster service on peer : " + strPeer); - } - - if(peerService != null) { - try { - long seq = 0; - synchronized(String.valueOf(agentId).intern()) { - if(s_logger.isDebugEnabled()) { + + for(int i = 0; i < 2; i++) { + try { + peerService = getPeerService(strPeer); + } catch (RemoteException e) { + s_logger.error("Unable to get cluster service on peer : " + strPeer); + } + + if(peerService != null) { + try { + long seq = 0; + synchronized(String.valueOf(agentId).intern()) { + if(s_logger.isDebugEnabled()) { s_logger.debug("Send Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote"); } - long startTick = System.currentTimeMillis(); - seq = peerService.executeAsync(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError); - - if(seq > 0) { - if(s_logger.isDebugEnabled()) { + long startTick = System.currentTimeMillis(); + seq = peerService.executeAsync(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError); + + if(seq > 0) { + if(s_logger.isDebugEnabled()) { s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId - + " in " + (System.currentTimeMillis() - startTick) + " ms" - + ", register local listener " + strPeer + "/" + seq); + + " in " + (System.currentTimeMillis() - startTick) + " ms" + + ", register local listener " + strPeer + "/" + seq); } - - registerAsyncCall(strPeer, seq, listener); - } else { - s_logger.warn("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId - + " in " + (System.currentTimeMillis() - startTick) + " ms, return indicates failure, seq: " + seq); - } - } - return seq; - } catch (RemoteException e) { - invalidatePeerService(strPeer); - - if(s_logger.isInfoEnabled()) { + + registerAsyncCall(strPeer, seq, listener); + } else { + s_logger.warn("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + + " in " + (System.currentTimeMillis() - startTick) + " ms, return indicates failure, seq: " + seq); + } + } + return seq; + } catch (RemoteException e) { + invalidatePeerService(strPeer); + + if(s_logger.isInfoEnabled()) { s_logger.info("Exception on remote execution -> " + strPeer + ", iteration : " + i); } - } - } - } - - return 0L; - } - - @Override - public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) { - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" + - agentId + "-" + seq + "} answers: " + (answers != null ? gson.toJson(answers, Answer[].class): "null")); + } + } } - Listener listener = null; - synchronized(String.valueOf(agentId).intern()) { - // need to synchronize it with executeAsync() to make sure listener have been registered - // before this callback reaches back - listener = getAsyncCallListener(executingPeer, seq); - } - - if(listener != null) { - long startTick = System.currentTimeMillis(); - - if(s_logger.isDebugEnabled()) { + return 0L; + } + + @Override + public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) { + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" + + agentId + "-" + seq + "} answers: " + (answers != null ? gson.toJson(answers, Answer[].class): "null")); + } + + Listener listener = null; + synchronized(String.valueOf(agentId).intern()) { + // need to synchronize it with executeAsync() to make sure listener have been registered + // before this callback reaches back + listener = getAsyncCallListener(executingPeer, seq); + } + + if(listener != null) { + long startTick = System.currentTimeMillis(); + + if(s_logger.isDebugEnabled()) { s_logger.debug("Processing answer {" + agentId + "-" + seq + "} from remote peer " + executingPeer); } - - listener.processAnswers(agentId, seq, answers); - - if(s_logger.isDebugEnabled()) { + + listener.processAnswers(agentId, seq, answers); + + if(s_logger.isDebugEnabled()) { s_logger.debug("Answer {" + agentId + "-" + seq + "} is processed in " + - (System.currentTimeMillis() - startTick) + " ms"); + (System.currentTimeMillis() - startTick) + " ms"); } - - if(!listener.isRecurring()) { - if(s_logger.isDebugEnabled()) { + + if(!listener.isRecurring()) { + if(s_logger.isDebugEnabled()) { s_logger.debug("Listener is not recurring after async-result callback {" + - agentId + "-" + seq + "}, unregister it"); + agentId + "-" + seq + "}, unregister it"); } - - unregisterAsyncCall(executingPeer, seq); - } else { - if(s_logger.isDebugEnabled()) { + + unregisterAsyncCall(executingPeer, seq); + } else { + if(s_logger.isDebugEnabled()) { s_logger.debug("Listener is recurring after async-result callback {" + agentId - +"-" + seq + "}, will keep it"); + +"-" + seq + "}, will keep it"); } - return true; - } - } else { - if(s_logger.isInfoEnabled()) { - s_logger.info("Async-call Listener has not been registered yet for {" + agentId - +"-" + seq + "}"); + return true; } - } - return false; + } else { + if(s_logger.isInfoEnabled()) { + s_logger.info("Async-call Listener has not been registered yet for {" + agentId + +"-" + seq + "}"); + } + } + return false; } - + @Override public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) { - if(s_logger.isDebugEnabled()) { + if(s_logger.isDebugEnabled()) { s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq + - "} " + (answers != null? gson.toJson(answers, Answer[].class):"")); + "} " + (answers != null? gson.toJson(answers, Answer[].class):"")); } - - final String targetPeerF = targetPeer; - final Answer[] answersF = answers; - final long agentIdF = agentId; - final long seqF = seq; - - ClusterService peerService = null; - - for(int i = 0; i < 2; i++) { - try { - peerService = getPeerService(targetPeerF); - } catch (RemoteException e) { - s_logger.error("cluster service for peer " + targetPeerF + " no longer exists"); - } - - if(peerService != null) { - try { - boolean result = false; - - long startTick = System.currentTimeMillis(); - if(s_logger.isDebugEnabled()) { + + final String targetPeerF = targetPeer; + final Answer[] answersF = answers; + final long agentIdF = agentId; + final long seqF = seq; + + ClusterService peerService = null; + + for(int i = 0; i < 2; i++) { + try { + peerService = getPeerService(targetPeerF); + } catch (RemoteException e) { + s_logger.error("cluster service for peer " + targetPeerF + " no longer exists"); + } + + if(peerService != null) { + try { + boolean result = false; + + long startTick = System.currentTimeMillis(); + if(s_logger.isDebugEnabled()) { s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote"); } - - result = peerService.onAsyncResult(getSelfPeerName(), agentIdF, seqF, gson.toJson(answersF, Answer[].class)); - - if(s_logger.isDebugEnabled()) { + + result = peerService.onAsyncResult(getSelfPeerName(), agentIdF, seqF, gson.toJson(answersF, Answer[].class)); + + if(s_logger.isDebugEnabled()) { s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " + - (System.currentTimeMillis() - startTick) + " ms, return result: " + result); + (System.currentTimeMillis() - startTick) + " ms, return result: " + result); } - - return result; - } catch (RemoteException e) { - s_logger.warn("Exception in performing remote call, ", e); - invalidatePeerService(targetPeerF); - } - } else { - s_logger.warn("Remote peer " + targetPeer + " no longer exists to process answer {" + agentId + "-" - + seq + "}"); - } - } - - return false; + + return result; + } catch (RemoteException e) { + s_logger.warn("Exception in performing remote call, ", e); + invalidatePeerService(targetPeerF); + } + } else { + s_logger.warn("Remote peer " + targetPeer + " no longer exists to process answer {" + agentId + "-" + + seq + "}"); + } + } + + return false; } - - @Override + + @Override public String getPeerName(long agentHostId) { - - HostVO host = _hostDao.findById(agentHostId); - if(host != null && host.getManagementServerId() != null) { - if(getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) { + + HostVO host = _hostDao.findById(agentHostId); + if(host != null && host.getManagementServerId() != null) { + if(getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) { return null; } - - return Long.toString(host.getManagementServerId()); - } - return null; - } - - @Override - public ManagementServerHostVO getPeer(String mgmtServerId) { - return _mshostDao.findByMsid(Long.valueOf(mgmtServerId)); - } - - @Override - public String getSelfPeerName() { - return Long.toString(_msid); - } - @Override - public String getSelfNodeIP() { - return _clusterNodeIP; - } - - @Override - public void registerListener(ClusterManagerListener listener) { - // Note : we don't check duplicates - synchronized(listeners) { - listeners.add(listener); - } - } - - @Override - public void unregisterListener(ClusterManagerListener listener) { - synchronized(listeners) { - listeners.remove(listener); - } - } - - public void notifyNodeJoined(List nodeList) { - synchronized(listeners) { - for(ClusterManagerListener listener : listeners) { - listener.onManagementNodeJoined(nodeList, _mshostId); - } - } - - SubscriptionMgr.getInstance().notifySubscribers(ClusterManager.ALERT_SUBJECT, this, - new ClusterNodeJoinEventArgs(_mshostId, nodeList)); - } - - public void notifyNodeLeft(List nodeList) { - synchronized(listeners) { - for(ClusterManagerListener listener : listeners) { - listener.onManagementNodeLeft(nodeList, _mshostId); - } - } - - SubscriptionMgr.getInstance().notifySubscribers(ClusterManager.ALERT_SUBJECT, this, - new ClusterNodeLeftEventArgs(_mshostId, nodeList)); - } - - public void notifyNodeIsolated() { - synchronized(listeners) { - for(ClusterManagerListener listener : listeners) { - listener.onManagementNodeIsolated(); - } - } - } - - public ClusterService getPeerService(String strPeer) throws RemoteException { - synchronized(clusterPeers) { - if(clusterPeers.containsKey(strPeer)) { + return Long.toString(host.getManagementServerId()); + } + return null; + } + + @Override + public ManagementServerHostVO getPeer(String mgmtServerId) { + return _mshostDao.findByMsid(Long.valueOf(mgmtServerId)); + } + + @Override + public String getSelfPeerName() { + return Long.toString(_msid); + } + + @Override + public String getSelfNodeIP() { + return _clusterNodeIP; + } + + @Override + public void registerListener(ClusterManagerListener listener) { + // Note : we don't check duplicates + synchronized(listeners) { + listeners.add(listener); + } + } + + @Override + public void unregisterListener(ClusterManagerListener listener) { + synchronized(listeners) { + listeners.remove(listener); + } + } + + public void notifyNodeJoined(List nodeList) { + synchronized(listeners) { + for(ClusterManagerListener listener : listeners) { + listener.onManagementNodeJoined(nodeList, _mshostId); + } + } + + SubscriptionMgr.getInstance().notifySubscribers(ClusterManager.ALERT_SUBJECT, this, + new ClusterNodeJoinEventArgs(_mshostId, nodeList)); + } + + public void notifyNodeLeft(List nodeList) { + synchronized(listeners) { + for(ClusterManagerListener listener : listeners) { + listener.onManagementNodeLeft(nodeList, _mshostId); + } + } + + SubscriptionMgr.getInstance().notifySubscribers(ClusterManager.ALERT_SUBJECT, this, + new ClusterNodeLeftEventArgs(_mshostId, nodeList)); + } + + public void notifyNodeIsolated() { + synchronized(listeners) { + for(ClusterManagerListener listener : listeners) { + listener.onManagementNodeIsolated(); + } + } + } + + public ClusterService getPeerService(String strPeer) throws RemoteException { + synchronized(clusterPeers) { + if(clusterPeers.containsKey(strPeer)) { return clusterPeers.get(strPeer); } - } - - ClusterService service = _currentServiceAdapter.getPeerService(strPeer); - - if(service != null) { - synchronized(clusterPeers) { - // re-check the peer map again to deal with the - // race conditions - if(!clusterPeers.containsKey(strPeer)) { + } + + ClusterService service = _currentServiceAdapter.getPeerService(strPeer); + + if(service != null) { + synchronized(clusterPeers) { + // re-check the peer map again to deal with the + // race conditions + if(!clusterPeers.containsKey(strPeer)) { clusterPeers.put(strPeer, service); } - } - } - - return service; - } - - public void invalidatePeerService(String strPeer) { - synchronized(clusterPeers) { - if(clusterPeers.containsKey(strPeer)) { + } + } + + return service; + } + + public void invalidatePeerService(String strPeer) { + synchronized(clusterPeers) { + if(clusterPeers.containsKey(strPeer)) { clusterPeers.remove(strPeer); } - } - } - - private void registerAsyncCall(String strPeer, long seq, Listener listener) { - String key = strPeer + "/" + seq; - - synchronized(asyncCalls) { - if(!asyncCalls.containsKey(key)) { + } + } + + private void registerAsyncCall(String strPeer, long seq, Listener listener) { + String key = strPeer + "/" + seq; + + synchronized(asyncCalls) { + if(!asyncCalls.containsKey(key)) { asyncCalls.put(key, listener); } - } - } - - private Listener getAsyncCallListener(String strPeer, long seq) { - String key = strPeer + "/" + seq; - - synchronized(asyncCalls) { - if(asyncCalls.containsKey(key)) { + } + } + + private Listener getAsyncCallListener(String strPeer, long seq) { + String key = strPeer + "/" + seq; + + synchronized(asyncCalls) { + if(asyncCalls.containsKey(key)) { return asyncCalls.get(key); } - } - - return null; - } - - private void unregisterAsyncCall(String strPeer, long seq) { - String key = strPeer + "/" + seq; + } - synchronized(asyncCalls) { - if(asyncCalls.containsKey(key)) { + return null; + } + + private void unregisterAsyncCall(String strPeer, long seq) { + String key = strPeer + "/" + seq; + + synchronized(asyncCalls) { + if(asyncCalls.containsKey(key)) { asyncCalls.remove(key); } - } - } - - private Runnable getHeartbeatTask() { - return new Runnable() { - @Override + } + } + + private Runnable getHeartbeatTask() { + return new Runnable() { + @Override public void run() { - try { - if(s_logger.isTraceEnabled()) { + try { + if(s_logger.isTraceEnabled()) { s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId); } - - Connection conn = getHeartbeatConnection(); - _mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime()); - if(s_logger.isTraceEnabled()) { + Connection conn = getHeartbeatConnection(); + _mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime()); + + if(s_logger.isTraceEnabled()) { s_logger.trace("Cluster manager peer-scan, id:" + _mshostId); } - - if(!_peerScanInited) { - _peerScanInited = true; - initPeerScan(conn); - } - - peerScan(conn); - } catch(CloudRuntimeException e) { - s_logger.error("Runtime DB exception ", e.getCause()); - - if(e.getCause() instanceof ClusterInvalidSessionException) { - s_logger.error("Invalid cluster session found"); - queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); - } - - if(isRootCauseConnectionRelated(e.getCause())) { - s_logger.error("DB communication problem detected"); - queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); - } - - invalidHeartbeatConnection(); - } catch (Throwable e) { - if(isRootCauseConnectionRelated(e.getCause())) { - s_logger.error("DB communication problem detected"); - queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); - } - - s_logger.error("Problem with the cluster heartbeat!", e); - } - } - }; - } - - private boolean isRootCauseConnectionRelated(Throwable e) { - while(e != null) { - if(e instanceof com.mysql.jdbc.CommunicationsException || e instanceof com.mysql.jdbc.exceptions.jdbc4.CommunicationsException) - return true; - - e = e.getCause(); - } - - return false; - } - - private Connection getHeartbeatConnection() throws SQLException { - if(_heartbeatConnection != null) { - return _heartbeatConnection; - } - - _heartbeatConnection = Transaction.getStandaloneConnectionWithException(); - return _heartbeatConnection; - } - - private void invalidHeartbeatConnection() { - if(_heartbeatConnection != null) { - try { - _heartbeatConnection.close(); - } catch (SQLException e) { - s_logger.warn("Unable to close hearbeat DB connection. ", e); - } - - _heartbeatConnection = null; - } - } - - private Runnable getNotificationTask() { - return new Runnable() { - @Override - public void run() { - while(true) { - synchronized(_notificationMsgs) { - try { - _notificationMsgs.wait(1000); - } catch (InterruptedException e) { - } - } - - ClusterManagerMessage msg = null; - while((msg = getNextNotificationMessage()) != null) { - try { - switch(msg.getMessageType()) { - case nodeAdded: - if(msg.getNodes() != null && msg.getNodes().size() > 0) { - Profiler profiler = new Profiler(); - profiler.start(); - - notifyNodeJoined(msg.getNodes()); - - profiler.stop(); - if(profiler.getDuration() > 1000) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms"); - } - } else { - s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms"); - } - } - break; - - case nodeRemoved: - if(msg.getNodes() != null && msg.getNodes().size() > 0) { - Profiler profiler = new Profiler(); - profiler.start(); - notifyNodeLeft(msg.getNodes()); - - profiler.stop(); - if(profiler.getDuration() > 1000) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms"); - } - } else { - s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms"); - } - } - break; - - case nodeIsolated: - notifyNodeIsolated(); - break; - - default : - assert(false); - break; - } - - } catch (Throwable e) { - s_logger.warn("Unexpected exception during cluster notification. ", e); - } - } - - try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) {} - } - } - }; - } - - private void queueNotification(ClusterManagerMessage msg) { - synchronized(this._notificationMsgs) { - this._notificationMsgs.add(msg); - this._notificationMsgs.notifyAll(); - } - } - - private ClusterManagerMessage getNextNotificationMessage() { - synchronized(this._notificationMsgs) { - if(this._notificationMsgs.size() > 0) - return this._notificationMsgs.remove(0); - } - - return null; - } - - private void initPeerScan(Connection conn) { - // upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform - // missed cleanup - Date cutTime = DateUtil.currentGMTTime(); - List inactiveList = _mshostDao.getInactiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold)); - if(inactiveList.size() > 0) { - this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, inactiveList)); + if(!_peerScanInited) { + _peerScanInited = true; + initPeerScan(conn); + } + + peerScan(conn); + } catch(CloudRuntimeException e) { + s_logger.error("Runtime DB exception ", e.getCause()); + + if(e.getCause() instanceof ClusterInvalidSessionException) { + s_logger.error("Invalid cluster session found"); + queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); + } + + if(isRootCauseConnectionRelated(e.getCause())) { + s_logger.error("DB communication problem detected"); + queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); + } + + invalidHeartbeatConnection(); + } catch (Throwable e) { + if(isRootCauseConnectionRelated(e.getCause())) { + s_logger.error("DB communication problem detected"); + queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated)); + } + + s_logger.error("Problem with the cluster heartbeat!", e); + } + } + }; + } + + private boolean isRootCauseConnectionRelated(Throwable e) { + while(e != null) { + if(e instanceof com.mysql.jdbc.CommunicationsException || e instanceof com.mysql.jdbc.exceptions.jdbc4.CommunicationsException) { + return true; + } + + e = e.getCause(); } - } - - private void peerScan(Connection conn) { - Date cutTime = DateUtil.currentGMTTime(); - - List currentList = _mshostDao.getActiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold)); - List removedNodeList = new ArrayList(); - List invalidatedNodeList = new ArrayList(); + return false; + } - if(_mshostId != null) { - // only if we have already attached to cluster, will we start to check leaving nodes - for(Map.Entry entry : activePeers.entrySet()) { + private Connection getHeartbeatConnection() throws SQLException { + if(_heartbeatConnection != null) { + return _heartbeatConnection; + } - ManagementServerHostVO current = getInListById(entry.getKey(), currentList); - if(current == null) { - if(entry.getKey().longValue() != _mshostId.longValue()) { - if(s_logger.isDebugEnabled()) { + _heartbeatConnection = Transaction.getStandaloneConnectionWithException(); + return _heartbeatConnection; + } + + private void invalidHeartbeatConnection() { + if(_heartbeatConnection != null) { + try { + _heartbeatConnection.close(); + } catch (SQLException e) { + s_logger.warn("Unable to close hearbeat DB connection. ", e); + } + + _heartbeatConnection = null; + } + } + + private Runnable getNotificationTask() { + return new Runnable() { + @Override + public void run() { + while(true) { + synchronized(_notificationMsgs) { + try { + _notificationMsgs.wait(1000); + } catch (InterruptedException e) { + } + } + + ClusterManagerMessage msg = null; + while((msg = getNextNotificationMessage()) != null) { + try { + switch(msg.getMessageType()) { + case nodeAdded: + if(msg.getNodes() != null && msg.getNodes().size() > 0) { + Profiler profiler = new Profiler(); + profiler.start(); + + notifyNodeJoined(msg.getNodes()); + + profiler.stop(); + if(profiler.getDuration() > 1000) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms"); + } + } else { + s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms"); + } + } + break; + + case nodeRemoved: + if(msg.getNodes() != null && msg.getNodes().size() > 0) { + Profiler profiler = new Profiler(); + profiler.start(); + + notifyNodeLeft(msg.getNodes()); + + profiler.stop(); + if(profiler.getDuration() > 1000) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms"); + } + } else { + s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms"); + } + } + break; + + case nodeIsolated: + notifyNodeIsolated(); + break; + + default : + assert(false); + break; + } + + } catch (Throwable e) { + s_logger.warn("Unexpected exception during cluster notification. ", e); + } + } + + try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) {} + } + } + }; + } + + private void queueNotification(ClusterManagerMessage msg) { + synchronized(this._notificationMsgs) { + this._notificationMsgs.add(msg); + this._notificationMsgs.notifyAll(); + } + } + + private ClusterManagerMessage getNextNotificationMessage() { + synchronized(this._notificationMsgs) { + if(this._notificationMsgs.size() > 0) { + return this._notificationMsgs.remove(0); + } + } + + return null; + } + + private void initPeerScan(Connection conn) { + // upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform + // missed cleanup + Date cutTime = DateUtil.currentGMTTime(); + List inactiveList = _mshostDao.getInactiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold)); + if(inactiveList.size() > 0) { + this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, inactiveList)); + } + } + + private void peerScan(Connection conn) { + Date cutTime = DateUtil.currentGMTTime(); + + List currentList = _mshostDao.getActiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold)); + + List removedNodeList = new ArrayList(); + List invalidatedNodeList = new ArrayList(); + + if(_mshostId != null) { + // only if we have already attached to cluster, will we start to check leaving nodes + for(Map.Entry entry : activePeers.entrySet()) { + + ManagementServerHostVO current = getInListById(entry.getKey(), currentList); + if(current == null) { + if(entry.getKey().longValue() != _mshostId.longValue()) { + if(s_logger.isDebugEnabled()) { s_logger.debug("Detected management node left, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP()); } - removedNodeList.add(entry.getValue()); - } - } else { - if(current.getRunid() == 0) { - if(entry.getKey().longValue() != _mshostId.longValue()) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Detected management node left because of invalidated session, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP()); - } - invalidatedNodeList.add(entry.getValue()); - } - } else { - if(entry.getValue().getRunid() != current.getRunid()) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Detected management node left and rejoined quickly, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP()); - } - - entry.getValue().setRunid(current.getRunid()); - } - } - } - } - } - - // process invalidated node list - if(invalidatedNodeList.size() > 0) { - for(ManagementServerHostVO mshost : invalidatedNodeList) { - activePeers.remove(mshost.getId()); - try { - JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId()); - } catch(Exception e) { - s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString()); - } - } - - this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, invalidatedNodeList)); - } - - // process removed node list - Iterator it = removedNodeList.iterator(); - while(it.hasNext()) { - ManagementServerHostVO mshost = it.next(); - if(!pingManagementNode(mshost)) { - s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable"); - activePeers.remove(mshost.getId()); - _mshostDao.invalidateRunSession(conn, mshost.getId(), mshost.getRunid()); - try { - JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId()); - } catch(Exception e) { - s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString()); - } - } else { - s_logger.info("Management node " + mshost.getId() + " is detected inactive by timestamp but is pingable"); - it.remove(); - } - } - - if(removedNodeList.size() > 0) - this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, removedNodeList)); - - List newNodeList = new ArrayList(); - for(ManagementServerHostVO mshost : currentList) { - if(!activePeers.containsKey(mshost.getId())) { - activePeers.put(mshost.getId(), mshost); + removedNodeList.add(entry.getValue()); + } + } else { + if(current.getRunid() == 0) { + if(entry.getKey().longValue() != _mshostId.longValue()) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Detected management node left because of invalidated session, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP()); + } + invalidatedNodeList.add(entry.getValue()); + } + } else { + if(entry.getValue().getRunid() != current.getRunid()) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Detected management node left and rejoined quickly, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP()); + } - if(s_logger.isDebugEnabled()) { + entry.getValue().setRunid(current.getRunid()); + } + } + } + } + } + + // process invalidated node list + if(invalidatedNodeList.size() > 0) { + for(ManagementServerHostVO mshost : invalidatedNodeList) { + activePeers.remove(mshost.getId()); + try { + JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId()); + } catch(Exception e) { + s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString()); + } + } + + this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, invalidatedNodeList)); + } + + // process removed node list + Iterator it = removedNodeList.iterator(); + while(it.hasNext()) { + ManagementServerHostVO mshost = it.next(); + if(!pingManagementNode(mshost)) { + s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable"); + activePeers.remove(mshost.getId()); + _mshostDao.invalidateRunSession(conn, mshost.getId(), mshost.getRunid()); + try { + JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId()); + } catch(Exception e) { + s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString()); + } + } else { + s_logger.info("Management node " + mshost.getId() + " is detected inactive by timestamp but is pingable"); + it.remove(); + } + } + + if(removedNodeList.size() > 0) { + this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, removedNodeList)); + } + + List newNodeList = new ArrayList(); + for(ManagementServerHostVO mshost : currentList) { + if(!activePeers.containsKey(mshost.getId())) { + activePeers.put(mshost.getId(), mshost); + + if(s_logger.isDebugEnabled()) { s_logger.debug("Detected management node joined, id:" + mshost.getId() + ", nodeIP:" + mshost.getServiceIP()); } - newNodeList.add(mshost); - - try { - JmxUtil.registerMBean("ClusterManager", "Node " + mshost.getId(), new ClusterManagerMBeanImpl(this, mshost)); - } catch(Exception e) { - s_logger.warn("Unable to regiester cluster node into JMX monitoring due to exception " + ExceptionUtil.toString(e)); - } - } - } - - if(newNodeList.size() > 0) - this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeAdded, newNodeList)); - } - - private static ManagementServerHostVO getInListById(Long id, List l) { - for(ManagementServerHostVO mshost : l) { - if(mshost.getId() == id) { + newNodeList.add(mshost); + + try { + JmxUtil.registerMBean("ClusterManager", "Node " + mshost.getId(), new ClusterManagerMBeanImpl(this, mshost)); + } catch(Exception e) { + s_logger.warn("Unable to regiester cluster node into JMX monitoring due to exception " + ExceptionUtil.toString(e)); + } + } + } + + if(newNodeList.size() > 0) { + this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeAdded, newNodeList)); + } + } + + private static ManagementServerHostVO getInListById(Long id, List l) { + for(ManagementServerHostVO mshost : l) { + if(mshost.getId() == id) { return mshost; } - } - return null; - } - + } + return null; + } + @Override public String getName() { return _name; } - + @Override @DB public boolean start() { - if(s_logger.isInfoEnabled()) { + if(s_logger.isInfoEnabled()) { s_logger.info("Starting cluster manager, msid : " + _msid); } - + Transaction txn = Transaction.currentTxn(); try { - txn.start(); + txn.start(); - final Class c = this.getClass(); - String version = c.getPackage().getImplementationVersion(); - - ManagementServerHostVO mshost = _mshostDao.findByMsid(_msid); - if(mshost == null) { - mshost = new ManagementServerHostVO(); - mshost.setMsid(_msid); - mshost.setRunid(this.getCurrentRunId()); - mshost.setName(NetUtils.getHostName()); - mshost.setVersion(version); - mshost.setServiceIP(_clusterNodeIP); - mshost.setServicePort(_currentServiceAdapter.getServicePort()); - mshost.setLastUpdateTime(DateUtil.currentGMTTime()); - mshost.setRemoved(null); - mshost.setAlertCount(0); - mshost.setState(ManagementServerNode.State.Up); - _mshostDao.persist(mshost); - - if(s_logger.isInfoEnabled()) { + final Class c = this.getClass(); + String version = c.getPackage().getImplementationVersion(); + + ManagementServerHostVO mshost = _mshostDao.findByMsid(_msid); + if(mshost == null) { + mshost = new ManagementServerHostVO(); + mshost.setMsid(_msid); + mshost.setRunid(this.getCurrentRunId()); + mshost.setName(NetUtils.getHostName()); + mshost.setVersion(version); + mshost.setServiceIP(_clusterNodeIP); + mshost.setServicePort(_currentServiceAdapter.getServicePort()); + mshost.setLastUpdateTime(DateUtil.currentGMTTime()); + mshost.setRemoved(null); + mshost.setAlertCount(0); + mshost.setState(ManagementServerNode.State.Up); + _mshostDao.persist(mshost); + + if(s_logger.isInfoEnabled()) { s_logger.info("New instance of management server msid " + _msid + " is being started"); } - } else { - if(s_logger.isInfoEnabled()) { + } else { + if(s_logger.isInfoEnabled()) { s_logger.info("Management server " + _msid + " is being started"); } - - _mshostDao.update(mshost.getId(), getCurrentRunId(), NetUtils.getHostName(), version, - _clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime()); - } - - txn.commit(); - - _mshostId = mshost.getId(); - if(s_logger.isInfoEnabled()) { + + _mshostDao.update(mshost.getId(), getCurrentRunId(), NetUtils.getHostName(), version, + _clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime()); + } + + txn.commit(); + + _mshostId = mshost.getId(); + if(s_logger.isInfoEnabled()) { s_logger.info("Management server (host id : " + _mshostId + ") is available at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort()); } - // use seperated thread for heartbeat updates - _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, - heartbeatInterval, TimeUnit.MILLISECONDS); - _notificationExecutor.submit(getNotificationTask()); + // use seperated thread for heartbeat updates + _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, + heartbeatInterval, TimeUnit.MILLISECONDS); + _notificationExecutor.submit(getNotificationTask()); } catch (Throwable e) { - s_logger.error("Unexpected exception : ", e); + s_logger.error("Unexpected exception : ", e); txn.rollback(); - + throw new CloudRuntimeException("Unable to initialize cluster info into database"); } - if(s_logger.isInfoEnabled()) { + if(s_logger.isInfoEnabled()) { s_logger.info("Cluster manager is started"); } - + return true; } @Override public boolean stop() { - if(_mshostId != null) { + if(_mshostId != null) { _mshostDao.remove(_mshostId); } - - _heartbeatScheduler.shutdownNow(); - _executor.shutdownNow(); - - try { - _heartbeatScheduler.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - _executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - } - - if(s_logger.isInfoEnabled()) { + + _heartbeatScheduler.shutdownNow(); + _executor.shutdownNow(); + + try { + _heartbeatScheduler.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + _executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + + if(s_logger.isInfoEnabled()) { s_logger.info("Cluster manager is stopped"); } - + return true; } @Override public boolean configure(String name, Map params) throws ConfigurationException { - if(s_logger.isInfoEnabled()) { + if(s_logger.isInfoEnabled()) { s_logger.info("Start configuring cluster manager : " + name); } _name = name; - + ComponentLocator locator = ComponentLocator.getCurrentLocator(); _agentMgr = locator.getManager(AgentManager.class); if (_agentMgr == null) { throw new ConfigurationException("Unable to get " + AgentManager.class.getName()); } - + _mshostDao = locator.getDao(ManagementServerHostDao.class); if(_mshostDao == null) { throw new ConfigurationException("Unable to get " + ManagementServerHostDao.class.getName()); } - + _hostDao = locator.getDao(HostDao.class); if(_hostDao == null) { throw new ConfigurationException("Unable to get " + HostDao.class.getName()); } - + ConfigurationDao configDao = locator.getDao(ConfigurationDao.class); if (configDao == null) { throw new ConfigurationException("Unable to get the configuration dao."); } - + Map configs = configDao.getConfiguration("management-server", params); - + String value = configs.get("cluster.heartbeat.interval"); if(value != null) { heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL); } - + value = configs.get("cluster.heartbeat.threshold"); if(value != null) { heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD); @@ -980,151 +984,151 @@ public class ClusterManagerImpl implements ClusterManager { File dbPropsFile = PropertiesUtil.findConfigFile("db.properties"); Properties dbProps = new Properties(); try { - dbProps.load(new FileInputStream(dbPropsFile)); - } catch (FileNotFoundException e) { + dbProps.load(new FileInputStream(dbPropsFile)); + } catch (FileNotFoundException e) { throw new ConfigurationException("Unable to find db.properties"); - } catch (IOException e) { + } catch (IOException e) { throw new ConfigurationException("Unable to load db.properties content"); - } + } _clusterNodeIP = dbProps.getProperty("cluster.node.IP"); if(_clusterNodeIP == null) { _clusterNodeIP = "127.0.0.1"; } _clusterNodeIP = _clusterNodeIP.trim(); - + if(s_logger.isInfoEnabled()) { s_logger.info("Cluster node IP : " + _clusterNodeIP); } - + if(!NetUtils.isLocalAddress(_clusterNodeIP)) { throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration"); } - + Adapters adapters = locator.getAdapters(ClusterServiceAdapter.class); if (adapters == null || !adapters.isSet()) { throw new ConfigurationException("Unable to get cluster service adapters"); } - Enumeration it = adapters.enumeration(); - if(it.hasMoreElements()) { + Enumeration it = adapters.enumeration(); + if(it.hasMoreElements()) { _currentServiceAdapter = it.nextElement(); } - - if(_currentServiceAdapter == null) { + + if(_currentServiceAdapter == null) { throw new ConfigurationException("Unable to set current cluster service adapter"); } - - checkConflicts(); - + + checkConflicts(); + if(s_logger.isInfoEnabled()) { s_logger.info("Cluster manager is configured."); } return true; } - + @Override public long getManagementNodeId() { return _msid; } - + @Override public long getCurrentRunId() { return _runId; } - + @Override public boolean isManagementNodeAlive(long msid) { - ManagementServerHostVO mshost = _mshostDao.findByMsid(msid); - if(mshost != null) { - if(mshost.getLastUpdateTime().getTime() >= DateUtil.currentGMTTime().getTime() - heartbeatThreshold) { + ManagementServerHostVO mshost = _mshostDao.findByMsid(msid); + if(mshost != null) { + if(mshost.getLastUpdateTime().getTime() >= DateUtil.currentGMTTime().getTime() - heartbeatThreshold) { return true; } - } - - return false; + } + + return false; } - + @Override public boolean pingManagementNode(long msid) { - ManagementServerHostVO mshost = _mshostDao.findByMsid(msid); - if(mshost == null) { + ManagementServerHostVO mshost = _mshostDao.findByMsid(msid); + if(mshost == null) { return false; } - - return pingManagementNode(mshost); + + return pingManagementNode(mshost); } - + private boolean pingManagementNode(ManagementServerHostVO mshost) { - String targetIp = mshost.getServiceIP(); - if("127.0.0.1".equals(targetIp) || "0.0.0.0".equals(targetIp)) { - s_logger.info("ping management node cluster service can not be performed on self"); - return false; - } - - String targetPeer = String.valueOf(mshost.getMsid()); - ClusterService peerService = null; - for(int i = 0; i < 2; i++) { - try { - peerService = getPeerService(targetPeer); - } catch (RemoteException e) { - s_logger.error("cluster service for peer " + targetPeer + " no longer exists"); - } - - if(peerService != null) { - try { - return peerService.ping(getSelfPeerName()); - } catch (RemoteException e) { - s_logger.warn("Exception in performing remote call, ", e); - invalidatePeerService(targetPeer); - } - } else { - s_logger.warn("Remote peer " + mshost.getMsid() + " no longer exists"); - } - } - - return false; + String targetIp = mshost.getServiceIP(); + if("127.0.0.1".equals(targetIp) || "0.0.0.0".equals(targetIp)) { + s_logger.info("ping management node cluster service can not be performed on self"); + return false; + } + + String targetPeer = String.valueOf(mshost.getMsid()); + ClusterService peerService = null; + for(int i = 0; i < 2; i++) { + try { + peerService = getPeerService(targetPeer); + } catch (RemoteException e) { + s_logger.error("cluster service for peer " + targetPeer + " no longer exists"); + } + + if(peerService != null) { + try { + return peerService.ping(getSelfPeerName()); + } catch (RemoteException e) { + s_logger.warn("Exception in performing remote call, ", e); + invalidatePeerService(targetPeer); + } + } else { + s_logger.warn("Remote peer " + mshost.getMsid() + " no longer exists"); + } + } + + return false; } - - + + @Override - public int getHeartbeatThreshold() { - return this.heartbeatThreshold; + public int getHeartbeatThreshold() { + return this.heartbeatThreshold; } - + public int getHeartbeatInterval() { - return this.heartbeatInterval; + return this.heartbeatInterval; } - + public void setHeartbeatThreshold(int threshold) { - heartbeatThreshold = threshold; + heartbeatThreshold = threshold; } - + private void checkConflicts() throws ConfigurationException { - Date cutTime = DateUtil.currentGMTTime(); - List peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold)); - for(ManagementServerHostVO peer : peers) { - String peerIP = peer.getServiceIP().trim(); - if(_clusterNodeIP.equals(peerIP)) { - if("127.0.0.1".equals(_clusterNodeIP)) { - if(pingManagementNode(peer.getMsid())) { - String msg = "Detected another management node with localhost IP is already running, please check your cluster configuration"; - s_logger.error(msg); - throw new ConfigurationException(msg); - } else { - String msg = "Detected another management node with localhost IP is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node"; - s_logger.info(msg); - } - } else { - if(pingManagementNode(peer.getMsid())) { - String msg = "Detected that another management node with the same IP " + peer.getServiceIP() + " is already running, please check your cluster configuration"; - s_logger.error(msg); - throw new ConfigurationException(msg); - } else { - String msg = "Detected that another management node with the same IP " + peer.getServiceIP() + " is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node"; - s_logger.info(msg); - } - } - } - } + Date cutTime = DateUtil.currentGMTTime(); + List peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold)); + for(ManagementServerHostVO peer : peers) { + String peerIP = peer.getServiceIP().trim(); + if(_clusterNodeIP.equals(peerIP)) { + if("127.0.0.1".equals(_clusterNodeIP)) { + if(pingManagementNode(peer.getMsid())) { + String msg = "Detected another management node with localhost IP is already running, please check your cluster configuration"; + s_logger.error(msg); + throw new ConfigurationException(msg); + } else { + String msg = "Detected another management node with localhost IP is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node"; + s_logger.info(msg); + } + } else { + if(pingManagementNode(peer.getMsid())) { + String msg = "Detected that another management node with the same IP " + peer.getServiceIP() + " is already running, please check your cluster configuration"; + s_logger.error(msg); + throw new ConfigurationException(msg); + } else { + String msg = "Detected that another management node with the same IP " + peer.getServiceIP() + " is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node"; + s_logger.info(msg); + } + } + } + } } } diff --git a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java index 61fbac60f0d..68737cd48fc 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java @@ -18,293 +18,311 @@ package com.cloud.cluster; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.net.URLDecoder; - -import org.apache.commons.httpclient.HttpStatus; -import org.apache.http.HttpEntityEnclosingRequest; -import org.apache.http.HttpException; -import org.apache.http.HttpRequest; -import org.apache.http.HttpResponse; -import org.apache.http.entity.BasicHttpEntity; -import org.apache.http.protocol.HttpContext; -import org.apache.http.protocol.HttpRequestHandler; -import org.apache.http.util.EntityUtils; -import org.apache.log4j.Logger; - -import com.cloud.agent.Listener; -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.ChangeAgentAnswer; -import com.cloud.agent.api.ChangeAgentCommand; -import com.cloud.agent.api.Command; -import com.cloud.exception.AgentUnavailableException; -import com.cloud.exception.OperationTimedoutException; -import com.cloud.serializer.GsonHelper; -import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.URLDecoder; + +import org.apache.commons.httpclient.HttpStatus; +import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.entity.BasicHttpEntity; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpRequestHandler; +import org.apache.http.util.EntityUtils; +import org.apache.log4j.Logger; + +import com.cloud.agent.Listener; +import com.cloud.agent.api.Answer; +import com.cloud.agent.api.ChangeAgentAnswer; +import com.cloud.agent.api.ChangeAgentCommand; +import com.cloud.agent.api.Command; +import com.cloud.exception.AgentUnavailableException; +import com.cloud.exception.OperationTimedoutException; +import com.cloud.serializer.GsonHelper; +import com.google.gson.Gson; public class ClusterServiceServletHttpHandler implements HttpRequestHandler { private static final Logger s_logger = Logger.getLogger(ClusterServiceServletHttpHandler.class); - + private final Gson gson; private final ClusterManager manager; public ClusterServiceServletHttpHandler(ClusterManager manager) { - this.manager = manager; + this.manager = manager; - gson = GsonHelper.getBuilder().create(); + gson = GsonHelper.getGson(); } - - @Override - public void handle(HttpRequest request, HttpResponse response, HttpContext context) - throws HttpException, IOException { - - try { - if(s_logger.isTraceEnabled()) - s_logger.trace("Start Handling cluster HTTP request"); - - parseRequest(request); - handleRequest(request, response); - - if(s_logger.isTraceEnabled()) - s_logger.trace("Handle cluster HTTP request done"); - - } catch(Throwable e) { - if(s_logger.isTraceEnabled()) - s_logger.trace("Unexpected exception " + e.toString()); - - writeResponse(response, HttpStatus.SC_INTERNAL_SERVER_ERROR, null); - } - } - - @SuppressWarnings("deprecation") - private void parseRequest(HttpRequest request) throws IOException { - if(request instanceof HttpEntityEnclosingRequest) { - HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest)request; - - String body = EntityUtils.toString(entityRequest.getEntity()); - if(body != null) { - String[] paramArray = body.split("&"); - if(paramArray != null) { - for (String paramEntry : paramArray) { - String[] paramValue = paramEntry.split("="); - if (paramValue.length != 2) - continue; - - String name = URLDecoder.decode(paramValue[0]); - String value = URLDecoder.decode(paramValue[1]); - - if(s_logger.isTraceEnabled()) - s_logger.trace("Parsed request parameter " + name + "=" + value); - request.getParams().setParameter(name, value); - } - } - } - } - } - - private void writeResponse(HttpResponse response, int statusCode, String content) { - if(content == null) - content = ""; - response.setStatusCode(statusCode); + + @Override + public void handle(HttpRequest request, HttpResponse response, HttpContext context) + throws HttpException, IOException { + + try { + if(s_logger.isTraceEnabled()) { + s_logger.trace("Start Handling cluster HTTP request"); + } + + parseRequest(request); + handleRequest(request, response); + + if(s_logger.isTraceEnabled()) { + s_logger.trace("Handle cluster HTTP request done"); + } + + } catch(Throwable e) { + if(s_logger.isTraceEnabled()) { + s_logger.trace("Unexpected exception " + e.toString()); + } + + writeResponse(response, HttpStatus.SC_INTERNAL_SERVER_ERROR, null); + } + } + + @SuppressWarnings("deprecation") + private void parseRequest(HttpRequest request) throws IOException { + if(request instanceof HttpEntityEnclosingRequest) { + HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest)request; + + String body = EntityUtils.toString(entityRequest.getEntity()); + if(body != null) { + String[] paramArray = body.split("&"); + if(paramArray != null) { + for (String paramEntry : paramArray) { + String[] paramValue = paramEntry.split("="); + if (paramValue.length != 2) { + continue; + } + + String name = URLDecoder.decode(paramValue[0]); + String value = URLDecoder.decode(paramValue[1]); + + if(s_logger.isTraceEnabled()) { + s_logger.trace("Parsed request parameter " + name + "=" + value); + } + request.getParams().setParameter(name, value); + } + } + } + } + } + + private void writeResponse(HttpResponse response, int statusCode, String content) { + if(content == null) { + content = ""; + } + response.setStatusCode(statusCode); BasicHttpEntity body = new BasicHttpEntity(); body.setContentType("text/html; charset=UTF-8"); - + byte[] bodyData = content.getBytes(); body.setContent(new ByteArrayInputStream(bodyData)); body.setContentLength(bodyData.length); response.setEntity(body); - } - - protected void handleRequest(HttpRequest req, HttpResponse response) { - String method = (String)req.getParams().getParameter("method"); - - int nMethod = RemoteMethodConstants.METHOD_UNKNOWN; - String responseContent = null; - try { - if(method != null) - nMethod = Integer.parseInt(method); - - switch(nMethod) { - case RemoteMethodConstants.METHOD_EXECUTE : - responseContent = handleExecuteMethodCall(req); - break; - - case RemoteMethodConstants.METHOD_EXECUTE_ASYNC : - responseContent = handleExecuteAsyncMethodCall(req); - break; - - case RemoteMethodConstants.METHOD_ASYNC_RESULT : - responseContent = handleAsyncResultMethodCall(req); - break; - - case RemoteMethodConstants.METHOD_PING : - responseContent = handlePingMethodCall(req); - break; - - case RemoteMethodConstants.METHOD_UNKNOWN : - default : - assert(false); - s_logger.error("unrecognized method " + nMethod); - break; - } - } catch(Throwable e) { - s_logger.error("Unexpected exception when processing cluster service request : ", e); - } - - if(responseContent != null) { - writeResponse(response, HttpStatus.SC_OK, responseContent); - } else { - writeResponse(response, HttpStatus.SC_BAD_REQUEST, null); - } - } - - private String handleExecuteMethodCall(HttpRequest req) { - String agentId = (String)req.getParams().getParameter("agentId"); - String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); - String stopOnError = (String)req.getParams().getParameter("stopOnError"); - - if(s_logger.isDebugEnabled()) - s_logger.debug("|->" + agentId + " " + gsonPackage); - - Command [] cmds = null; - try { - cmds = gson.fromJson(gsonPackage, Command[].class); - } catch(Throwable e) { - assert(false); - s_logger.error("Excection in gson decoding : ", e); - } - - if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted - ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; + } - if (s_logger.isDebugEnabled()) { - s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); - } - boolean result = false; - try { - result = manager.executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent()); - if (s_logger.isDebugEnabled()) { - s_logger.debug("Result is " + result); - } - - } catch (AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - return null; - } - - Answer[] answers = new Answer[1]; - answers[0] = new ChangeAgentAnswer(cmd, result); - return gson.toJson(answers); - } - - try { - long startTick = System.currentTimeMillis(); - if(s_logger.isDebugEnabled()) - s_logger.debug("Send |-> " + agentId + " " + gsonPackage + " to agent manager"); - - Answer[] answers = manager.sendToAgent(Long.parseLong(agentId), cmds, - Integer.parseInt(stopOnError) != 0 ? true : false); - - if(answers != null) { - String jsonReturn = gson.toJson(answers); - - if(s_logger.isDebugEnabled()) - s_logger.debug("Completed |-> " + agentId + " " + gsonPackage + - " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn); - - return jsonReturn; - } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("Completed |-> " + agentId + " " + gsonPackage + - " in " + (System.currentTimeMillis() - startTick) + " ms, return null result"); - } - } catch(AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - } catch (OperationTimedoutException e) { + protected void handleRequest(HttpRequest req, HttpResponse response) { + String method = (String)req.getParams().getParameter("method"); + + int nMethod = RemoteMethodConstants.METHOD_UNKNOWN; + String responseContent = null; + try { + if(method != null) { + nMethod = Integer.parseInt(method); + } + + switch(nMethod) { + case RemoteMethodConstants.METHOD_EXECUTE : + responseContent = handleExecuteMethodCall(req); + break; + + case RemoteMethodConstants.METHOD_EXECUTE_ASYNC : + responseContent = handleExecuteAsyncMethodCall(req); + break; + + case RemoteMethodConstants.METHOD_ASYNC_RESULT : + responseContent = handleAsyncResultMethodCall(req); + break; + + case RemoteMethodConstants.METHOD_PING : + responseContent = handlePingMethodCall(req); + break; + + case RemoteMethodConstants.METHOD_UNKNOWN : + default : + assert(false); + s_logger.error("unrecognized method " + nMethod); + break; + } + } catch(Throwable e) { + s_logger.error("Unexpected exception when processing cluster service request : ", e); + } + + if(responseContent != null) { + writeResponse(response, HttpStatus.SC_OK, responseContent); + } else { + writeResponse(response, HttpStatus.SC_BAD_REQUEST, null); + } + } + + private String handleExecuteMethodCall(HttpRequest req) { + String agentId = (String)req.getParams().getParameter("agentId"); + String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); + String stopOnError = (String)req.getParams().getParameter("stopOnError"); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("|->" + agentId + " " + gsonPackage); + } + + Command [] cmds = null; + try { + cmds = gson.fromJson(gsonPackage, Command[].class); + } catch(Throwable e) { + assert(false); + s_logger.error("Excection in gson decoding : ", e); + } + + if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted + ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); + } + boolean result = false; + try { + result = manager.executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent()); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Result is " + result); + } + + } catch (AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + return null; + } + + Answer[] answers = new Answer[1]; + answers[0] = new ChangeAgentAnswer(cmd, result); + return gson.toJson(answers); + } + + try { + long startTick = System.currentTimeMillis(); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Send |-> " + agentId + " " + gsonPackage + " to agent manager"); + } + + Answer[] answers = manager.sendToAgent(Long.parseLong(agentId), cmds, + Integer.parseInt(stopOnError) != 0 ? true : false); + + if(answers != null) { + String jsonReturn = gson.toJson(answers); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Completed |-> " + agentId + " " + gsonPackage + + " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn); + } + + return jsonReturn; + } else { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Completed |-> " + agentId + " " + gsonPackage + + " in " + (System.currentTimeMillis() - startTick) + " ms, return null result"); + } + } + } catch(AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + } catch (OperationTimedoutException e) { s_logger.warn("Timed Out", e); } - - return null; - } - - private String handleExecuteAsyncMethodCall(HttpRequest req) { - String agentId = (String)req.getParams().getParameter("agentId"); - String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); - String stopOnError = (String)req.getParams().getParameter("stopOnError"); - String callingPeer = (String)req.getParams().getParameter("caller"); - - if(s_logger.isDebugEnabled()) - s_logger.debug("Async " + callingPeer + " |-> " + agentId + " " + gsonPackage); - - Command [] cmds = null; - try { - cmds = gson.fromJson(gsonPackage, Command[].class); - } catch(Throwable e) { - assert(false); - s_logger.error("Excection in gson decoding : ", e); - } - - Listener listener = new ClusterAsyncExectuionListener(manager, callingPeer); - long seq = -1; - try { - long startTick = System.currentTimeMillis(); - if(s_logger.isDebugEnabled()) - s_logger.debug("Send Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " to agent manager"); - - seq = manager.sendToAgent(Long.parseLong(agentId), cmds, - Integer.parseInt(stopOnError) != 0 ? true : false, listener); - - if(s_logger.isDebugEnabled()) - s_logger.debug("Complated Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " in " + - + (System.currentTimeMillis() - startTick) + " ms, returned seq: " + seq); - } catch (AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - seq = -1; - } - - return gson.toJson(seq); - } - - private String handleAsyncResultMethodCall(HttpRequest req) { - String agentId = (String)req.getParams().getParameter("agentId"); - String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); - String seq = (String)req.getParams().getParameter("seq"); - String executingPeer = (String)req.getParams().getParameter("executingPeer"); - - if(s_logger.isDebugEnabled()) - s_logger.debug("Async callback " + executingPeer + "." + agentId + " |-> " + gsonPackage); - Answer[] answers = null; - try { - answers = gson.fromJson(gsonPackage, Answer[].class); - } catch(Throwable e) { - assert(false); - s_logger.error("Excection in gson decoding : ", e); - } - - long startTick = System.currentTimeMillis(); - if(manager.onAsyncResult(executingPeer, Long.parseLong(agentId), Long.parseLong(seq), answers)) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) + - " ms, return recurring=true, let async listener contine on"); - - return "recurring=true"; - } - - if(s_logger.isDebugEnabled()) - s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) + - " ms, return recurring=false, indicate to tear down async listener"); - - return "recurring=false"; - } - - private String handlePingMethodCall(HttpRequest req) { - String callingPeer = (String)req.getParams().getParameter("callingPeer"); - - if(s_logger.isDebugEnabled()) - s_logger.debug("Handle ping request from " + callingPeer); - - return "true"; - } + return null; + } + + private String handleExecuteAsyncMethodCall(HttpRequest req) { + String agentId = (String)req.getParams().getParameter("agentId"); + String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); + String stopOnError = (String)req.getParams().getParameter("stopOnError"); + String callingPeer = (String)req.getParams().getParameter("caller"); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Async " + callingPeer + " |-> " + agentId + " " + gsonPackage); + } + + Command [] cmds = null; + try { + cmds = gson.fromJson(gsonPackage, Command[].class); + } catch(Throwable e) { + assert(false); + s_logger.error("Excection in gson decoding : ", e); + } + + Listener listener = new ClusterAsyncExectuionListener(manager, callingPeer); + long seq = -1; + try { + long startTick = System.currentTimeMillis(); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Send Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " to agent manager"); + } + + seq = manager.sendToAgent(Long.parseLong(agentId), cmds, + Integer.parseInt(stopOnError) != 0 ? true : false, listener); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Complated Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " in " + + + (System.currentTimeMillis() - startTick) + " ms, returned seq: " + seq); + } + } catch (AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + seq = -1; + } + + return gson.toJson(seq); + } + + private String handleAsyncResultMethodCall(HttpRequest req) { + String agentId = (String)req.getParams().getParameter("agentId"); + String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); + String seq = (String)req.getParams().getParameter("seq"); + String executingPeer = (String)req.getParams().getParameter("executingPeer"); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Async callback " + executingPeer + "." + agentId + " |-> " + gsonPackage); + } + + Answer[] answers = null; + try { + answers = gson.fromJson(gsonPackage, Answer[].class); + } catch(Throwable e) { + assert(false); + s_logger.error("Excection in gson decoding : ", e); + } + + long startTick = System.currentTimeMillis(); + if(manager.onAsyncResult(executingPeer, Long.parseLong(agentId), Long.parseLong(seq), answers)) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) + + " ms, return recurring=true, let async listener contine on"); + } + + return "recurring=true"; + } + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) + + " ms, return recurring=false, indicate to tear down async listener"); + } + + return "recurring=false"; + } + + private String handlePingMethodCall(HttpRequest req) { + String callingPeer = (String)req.getParams().getParameter("callingPeer"); + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Handle ping request from " + callingPeer); + } + + return "true"; + } } diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java index 9ad226f13af..9a4bbaed71a 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java @@ -18,157 +18,167 @@ package com.cloud.cluster; -import java.io.IOException; -import java.rmi.RemoteException; - -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.HttpException; -import org.apache.commons.httpclient.HttpStatus; -import org.apache.commons.httpclient.methods.PostMethod; -import org.apache.log4j.Logger; - -import com.cloud.serializer.GsonHelper; -import com.google.gson.Gson; +import java.io.IOException; +import java.rmi.RemoteException; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpException; +import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.log4j.Logger; + +import com.cloud.serializer.GsonHelper; +import com.google.gson.Gson; public class ClusterServiceServletImpl implements ClusterService { - private static final long serialVersionUID = 4574025200012566153L; + private static final long serialVersionUID = 4574025200012566153L; + + private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class); - private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class); - private String serviceUrl; - + private Gson gson; - + public ClusterServiceServletImpl() { - gson = GsonHelper.getBuilder().create(); + gson = GsonHelper.getGson(); } - + public ClusterServiceServletImpl(String serviceUrl) { - this.serviceUrl = serviceUrl; - - gson = GsonHelper.getBuilder().create(); + this.serviceUrl = serviceUrl; + + gson = GsonHelper.getGson(); } - + @Override - public String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException { - if(s_logger.isDebugEnabled()) - s_logger.debug("Post (sync-call) " + gsonPackage + " to " + serviceUrl + " for agent " + agentId + " from " + callingPeer); - + public String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Post (sync-call) " + gsonPackage + " to " + serviceUrl + " for agent " + agentId + " from " + callingPeer); + } + HttpClient client = new HttpClient(); PostMethod method = new PostMethod(serviceUrl); - + method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_EXECUTE)); method.addParameter("agentId", Long.toString(agentId)); method.addParameter("gsonPackage", gsonPackage); method.addParameter("stopOnError", stopOnError ? "1" : "0"); - - return executePostMethod(client, method); + + return executePostMethod(client, method); } - + @Override - public long executeAsync(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException { - - if(s_logger.isDebugEnabled()) - s_logger.debug("Post (Async-call) " + gsonPackage + " to " + serviceUrl + " for agent " + agentId + " from " + callingPeer); - + public long executeAsync(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException { + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Post (Async-call) " + gsonPackage + " to " + serviceUrl + " for agent " + agentId + " from " + callingPeer); + } + HttpClient client = new HttpClient(); PostMethod method = new PostMethod(serviceUrl); - + method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_EXECUTE_ASYNC)); method.addParameter("agentId", Long.toString(agentId)); method.addParameter("gsonPackage", gsonPackage); method.addParameter("stopOnError", stopOnError ? "1" : "0"); method.addParameter("caller", callingPeer); - - String result = executePostMethod(client, method); - if(result == null) { - s_logger.error("Empty return from remote async-execution on " + serviceUrl); - throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl); - } - - try { - return gson.fromJson(result, Long.class); - } catch(Throwable e) { - s_logger.error("Unable to parse executeAsync return : " + result); - throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl); - } + + String result = executePostMethod(client, method); + if(result == null) { + s_logger.error("Empty return from remote async-execution on " + serviceUrl); + throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl); + } + + try { + return gson.fromJson(result, Long.class); + } catch(Throwable e) { + s_logger.error("Unable to parse executeAsync return : " + result); + throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl); + } } - - public boolean onAsyncResult(String executingPeer, long agentId, long seq, String gsonPackage) throws RemoteException { - if(s_logger.isDebugEnabled()) - s_logger.debug("Forward Async-call answer to remote listener, agent: " + agentId - + ", excutingPeer: " + executingPeer - + ", seq: " + seq + ", gsonPackage: " + gsonPackage); - + + @Override + public boolean onAsyncResult(String executingPeer, long agentId, long seq, String gsonPackage) throws RemoteException { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Forward Async-call answer to remote listener, agent: " + agentId + + ", excutingPeer: " + executingPeer + + ", seq: " + seq + ", gsonPackage: " + gsonPackage); + } + HttpClient client = new HttpClient(); PostMethod method = new PostMethod(serviceUrl); - + method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_ASYNC_RESULT)); method.addParameter("agentId", Long.toString(agentId)); method.addParameter("gsonPackage", gsonPackage); method.addParameter("seq", Long.toString(seq)); method.addParameter("executingPeer", executingPeer); - - String result = executePostMethod(client, method); - if(result.contains("recurring=true")) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Remote listener returned recurring=true"); - return true; - } - - if(s_logger.isDebugEnabled()) - s_logger.debug("Remote listener returned recurring=false"); - return false; - } - - public boolean ping(String callingPeer) throws RemoteException { - if(s_logger.isDebugEnabled()) - s_logger.debug("Ping at " + serviceUrl); - + + String result = executePostMethod(client, method); + if(result.contains("recurring=true")) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Remote listener returned recurring=true"); + } + return true; + } + + if(s_logger.isDebugEnabled()) { + s_logger.debug("Remote listener returned recurring=false"); + } + return false; + } + + @Override + public boolean ping(String callingPeer) throws RemoteException { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Ping at " + serviceUrl); + } + HttpClient client = new HttpClient(); PostMethod method = new PostMethod(serviceUrl); - + method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_PING)); method.addParameter("callingPeer", callingPeer); - String returnVal = executePostMethod(client, method); - if("true".equalsIgnoreCase(returnVal)) - return true; - return false; - } - - private String executePostMethod(HttpClient client, PostMethod method) { + String returnVal = executePostMethod(client, method); + if("true".equalsIgnoreCase(returnVal)) { + return true; + } + return false; + } + + private String executePostMethod(HttpClient client, PostMethod method) { int response = 0; String result = null; try { - long startTick = System.currentTimeMillis(); - response = client.executeMethod(method); - if(response == HttpStatus.SC_OK) { - result = method.getResponseBodyAsString(); - if(s_logger.isDebugEnabled()) - s_logger.debug("POST " + serviceUrl + " response :" + result + ", responding time: " - + (System.currentTimeMillis() - startTick) + " ms"); - } else { - s_logger.error("Invalid response code : " + response + ", from : " - + serviceUrl + ", method : " + method.getParameter("method") - + " responding time: " + (System.currentTimeMillis() - startTick)); - } + long startTick = System.currentTimeMillis(); + response = client.executeMethod(method); + if(response == HttpStatus.SC_OK) { + result = method.getResponseBodyAsString(); + if(s_logger.isDebugEnabled()) { + s_logger.debug("POST " + serviceUrl + " response :" + result + ", responding time: " + + (System.currentTimeMillis() - startTick) + " ms"); + } + } else { + s_logger.error("Invalid response code : " + response + ", from : " + + serviceUrl + ", method : " + method.getParameter("method") + + " responding time: " + (System.currentTimeMillis() - startTick)); + } } catch (HttpException e) { - s_logger.error("HttpException from : " + serviceUrl + ", method : " + method.getParameter("method")); + s_logger.error("HttpException from : " + serviceUrl + ", method : " + method.getParameter("method")); } catch (IOException e) { - s_logger.error("IOException from : " + serviceUrl + ", method : " + method.getParameter("method")); + s_logger.error("IOException from : " + serviceUrl + ", method : " + method.getParameter("method")); } catch(Throwable e) { - s_logger.error("Exception from : " + serviceUrl + ", method : " + method.getParameter("method") + ", exception :", e); + s_logger.error("Exception from : " + serviceUrl + ", method : " + method.getParameter("method") + ", exception :", e); } - return result; - } - - // for test purpose only - public static void main(String[] args) { - ClusterServiceServletImpl service = new ClusterServiceServletImpl("http://localhost:9090/clusterservice"); - try { - String result = service.execute("test", 1, "{ p1:v1, p2:v2 }", true); - System.out.println(result); - } catch (RemoteException e) { - } - } + return result; + } + + // for test purpose only + public static void main(String[] args) { + ClusterServiceServletImpl service = new ClusterServiceServletImpl("http://localhost:9090/clusterservice"); + try { + String result = service.execute("test", 1, "{ p1:v1, p2:v2 }", true); + System.out.println(result); + } catch (RemoteException e) { + } + } } diff --git a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java index a8743df3d07..de0f85ac821 100644 --- a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java +++ b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java @@ -30,6 +30,7 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; import com.cloud.api.ApiDispatcher; +import com.cloud.api.ApiGsonHelper; import com.cloud.api.commands.CreateSnapshotCmd; import com.cloud.async.AsyncJobManager; import com.cloud.async.AsyncJobResult; @@ -38,7 +39,6 @@ import com.cloud.async.dao.AsyncJobDao; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.event.EventTypes; import com.cloud.event.EventUtils; -import com.cloud.serializer.GsonHelper; import com.cloud.storage.Snapshot; import com.cloud.storage.SnapshotPolicyVO; import com.cloud.storage.SnapshotScheduleVO; @@ -67,7 +67,7 @@ import com.cloud.utils.db.SearchCriteria; @Local(value={SnapshotScheduler.class}) public class SnapshotSchedulerImpl implements SnapshotScheduler { private static final Logger s_logger = Logger.getLogger(SnapshotSchedulerImpl.class); - + private String _name = null; @Inject protected AsyncJobDao _asyncJobDao; @Inject protected SnapshotDao _snapshotDao; @@ -77,13 +77,13 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { @Inject protected SnapshotManager _snapshotManager; @Inject protected StoragePoolHostDao _poolHostDao; @Inject protected VolumeDao _volsDao; - + private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds private int _snapshotPollInterval; private Timer _testClockTimer; private Date _currentTimestamp; private TestClock _testTimerTask; - + private Date getNextScheduledTime(long policyId, Date currentTimestamp) { SnapshotPolicyVO policy = _snapshotPolicyDao.findById(policyId); Date nextTimestamp = null; @@ -107,7 +107,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { public void poll(Date currentTimestamp) { // We don't maintain the time. The timer task does. _currentTimestamp = currentTimestamp; - + GlobalLock scanLock = GlobalLock.getInternLock("snapshot.poll"); try { if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) { @@ -120,7 +120,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { } finally { scanLock.releaseRef(); } - + scanLock = GlobalLock.getInternLock("snapshot.poll"); try { if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) { @@ -132,9 +132,9 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { } } finally { scanLock.releaseRef(); - } + } } - + private void checkStatusOfCurrentlyExecutingSnapshots() { SearchCriteria sc = _snapshotScheduleDao.createSearchCriteria(); sc.addAnd("asyncJobId", SearchCriteria.Op.NNULL); @@ -182,19 +182,19 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { // and cleanup the previous snapshot // Set the userId to that of system. //_snapshotManager.validateSnapshot(1L, snapshot); - // In all cases, schedule the next snapshot job + // In all cases, schedule the next snapshot job scheduleNextSnapshotJob(snapshotSchedule); } } - + break; case AsyncJobResult.STATUS_IN_PROGRESS: - // There is no way of knowing from here whether + // There is no way of knowing from here whether // 1) Another management server is processing this snapshot job - // 2) The management server has crashed and this snapshot is lying + // 2) The management server has crashed and this snapshot is lying // around in an inconsistent state. // Hopefully, this can be resolved at the backend when the current snapshot gets executed. - // But if it remains in this state, the current snapshot will not get executed. + // But if it remains in this state, the current snapshot will not get executed. // And it will remain in stasis. break; } @@ -205,7 +205,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { protected void scheduleSnapshots() { String displayTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, _currentTimestamp); s_logger.debug("Snapshot scheduler.poll is being called at " + displayTime); - + List snapshotsToBeExecuted = _snapshotScheduleDao.getSchedulesToExecute(_currentTimestamp); s_logger.debug("Got " + snapshotsToBeExecuted.size() + " snapshots to be executed at " + displayTime); @@ -244,12 +244,12 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { params.put("ctxUserId", "1"); params.put("ctxAccountId", "1"); params.put("ctxStartEventId", String.valueOf(eventId)); - + CreateSnapshotCmd cmd = new CreateSnapshotCmd(); ApiDispatcher.getInstance().dispatchCreateCmd(cmd, params); params.put("id", ""+cmd.getEntityId()); params.put("ctxStartEventId", "1"); - + AsyncJobVO job = new AsyncJobVO(); job.setUserId(userId); // Just have SYSTEM own the job for now. Users won't be able to see this job, but @@ -257,7 +257,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { job.setAccountId(1L); job.setCmd(CreateSnapshotCmd.class.getName()); job.setInstanceId(cmd.getEntityId()); - job.setCmdInfo(GsonHelper.getBuilder().create().toJson(params)); + job.setCmdInfo(ApiGsonHelper.getBuilder().create().toJson(params)); long jobId = _asyncMgr.submitAsyncJob(job); @@ -286,7 +286,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { } return scheduleNextSnapshotJob(snapshotPolicy); } - + @Override @DB public Date scheduleNextSnapshotJob(SnapshotPolicyVO policy) { if ( policy == null) { @@ -317,9 +317,9 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { } return nextSnapshotTimestamp; } - - - + + + @Override @DB public boolean removeSchedule(Long volumeId, Long policyId) { // We can only remove schedules which are in the future. Not which are already executed in the past. @@ -341,7 +341,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { _name = name; ComponentLocator locator = ComponentLocator.getCurrentLocator(); - + ConfigurationDao configDao = locator.getDao(ConfigurationDao.class); if (configDao == null) { s_logger.error("Unable to get the configuration dao. " + ConfigurationDao.class.getName()); @@ -357,12 +357,12 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { int daysPerMonth = NumbersUtil.parseInt(configDao.getValue("snapshot.test.days.per.month"), 30); int weeksPerMonth = NumbersUtil.parseInt(configDao.getValue("snapshot.test.weeks.per.month"), 4); int monthsPerYear = NumbersUtil.parseInt(configDao.getValue("snapshot.test.months.per.year"), 12); - + _testTimerTask = new TestClock(this, minutesPerHour, hoursPerDay, daysPerWeek, daysPerMonth, weeksPerMonth, monthsPerYear); } _currentTimestamp = new Date(); s_logger.info("Snapshot Scheduler is configured."); - + return true; } @@ -397,7 +397,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { _testClockTimer = new Timer("SnapshotPollTask"); _testClockTimer.schedule(timerTask, _snapshotPollInterval*1000L, _snapshotPollInterval*1000L); } - + return true; }