diff --git a/engine/schema/src/com/cloud/host/dao/HostDaoImpl.java b/engine/schema/src/com/cloud/host/dao/HostDaoImpl.java index 810b973e296..a84527e2f88 100755 --- a/engine/schema/src/com/cloud/host/dao/HostDaoImpl.java +++ b/engine/schema/src/com/cloud/host/dao/HostDaoImpl.java @@ -21,6 +21,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TimeZone; @@ -116,6 +117,7 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao protected SearchBuilder HostsForReconnectSearch; protected GenericSearchBuilder ClustersOwnedByMSSearch; + protected GenericSearchBuilder ClustersForHostsNotOwnedByAnyMSSearch; protected GenericSearchBuilder AllClustersSearch; protected SearchBuilder HostsInClusterSearch; @@ -264,7 +266,7 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao UnmanagedDirectConnectSearch.and("server", UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NULL); UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(), SearchCriteria.Op.LTEQ); UnmanagedDirectConnectSearch.and("resourceStates", UnmanagedDirectConnectSearch.entity().getResourceState(), SearchCriteria.Op.NIN); - UnmanagedDirectConnectSearch.and("cluster", UnmanagedDirectConnectSearch.entity().getClusterId(), SearchCriteria.Op.EQ); + UnmanagedDirectConnectSearch.and("clusterIn", UnmanagedDirectConnectSearch.entity().getClusterId(), SearchCriteria.Op.IN); /* * UnmanagedDirectConnectSearch.op(SearchCriteria.Op.OR, "managementServerId", * UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); @@ -353,6 +355,13 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao ClustersOwnedByMSSearch.and("server", ClustersOwnedByMSSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); ClustersOwnedByMSSearch.done(); + ClustersForHostsNotOwnedByAnyMSSearch = createSearchBuilder(Long.class); + ClustersForHostsNotOwnedByAnyMSSearch.select(null, Func.DISTINCT, ClustersForHostsNotOwnedByAnyMSSearch.entity().getClusterId()); + ClustersForHostsNotOwnedByAnyMSSearch.and("resource", ClustersForHostsNotOwnedByAnyMSSearch.entity().getResource(), SearchCriteria.Op.NNULL); + ClustersForHostsNotOwnedByAnyMSSearch.and("cluster", ClustersForHostsNotOwnedByAnyMSSearch.entity().getClusterId(), SearchCriteria.Op.NNULL); + ClustersForHostsNotOwnedByAnyMSSearch.and("server", ClustersForHostsNotOwnedByAnyMSSearch.entity().getManagementServerId(), SearchCriteria.Op.NULL); + ClustersForHostsNotOwnedByAnyMSSearch.done(); + AllClustersSearch = _clusterDao.createSearchBuilder(Long.class); AllClustersSearch.select(null, Func.NATIVE, AllClustersSearch.entity().getId()); AllClustersSearch.and("managed", AllClustersSearch.entity().getManagedState(), SearchCriteria.Op.EQ); @@ -409,10 +418,17 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao sc.setParameters("lastPinged", lastPingSecondsAfter); sc.setParameters("status", Status.Disconnected, Status.Down, Status.Alert); + StringBuilder sb = new StringBuilder(); List hosts = lockRows(sc, null, true); // exclusive lock for (HostVO host : hosts) { host.setManagementServerId(null); update(host.getId(), host); + sb.append(host.getId()); + sb.append(" "); + } + + if (s_logger.isTraceEnabled()) { + s_logger.trace("Following hosts got reset: " + sb.toString()); } } @@ -427,6 +443,16 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao return clusters; } + /* + * Returns clusters based on the list of hosts not owned by any MS + */ + private List findClustersForHostsNotOwnedByAnyManagementServer() { + SearchCriteria sc = ClustersForHostsNotOwnedByAnyMSSearch.create(); + + List clusters = customSearch(sc, null); + return clusters; + } + /* * Returns a list of all cluster Ids */ @@ -459,55 +485,100 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao public List findAndUpdateDirectAgentToLoad(long lastPingSecondsAfter, Long limit, long managementServerId) { Transaction txn = Transaction.currentTxn(); + txn.start(); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Resetting hosts suitable for reconnect"); + } // reset hosts that are suitable candidates for reconnect - txn.start(); resetHosts(managementServerId, lastPingSecondsAfter); - txn.commit(); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Completed resetting hosts suitable for reconnect"); + } - List clusters = findClustersOwnedByManagementServer(managementServerId); - List allClusters = listAllClusters(); - - SearchCriteria sc = UnmanagedDirectConnectSearch.create(); - sc.setParameters("lastPinged", lastPingSecondsAfter); - sc.setJoinParameters("ClusterManagedSearch", "managed", Managed.ManagedState.Managed); List assignedHosts = new ArrayList(); - List remainingClusters = new ArrayList(); - // handle clusters already owned by @managementServerId - txn.start(); - for (Long clusterId : allClusters) { - if (clusters.contains(clusterId)) { // host belongs to clusters owned by @managementServerId - sc.setParameters("cluster", clusterId); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Acquiring hosts for clusters already owned by this management server"); + } + List clusters = findClustersOwnedByManagementServer(managementServerId); + if (clusters.size() > 0) { + // handle clusters already owned by @managementServerId + SearchCriteria sc = UnmanagedDirectConnectSearch.create(); + sc.setParameters("lastPinged", lastPingSecondsAfter); + sc.setJoinParameters("ClusterManagedSearch", "managed", Managed.ManagedState.Managed); + sc.setParameters("clusterIn", clusters.toArray()); + List unmanagedHosts = lockRows(sc, new Filter(HostVO.class, "clusterId", true, 0L, limit), true); // host belongs to clusters owned by @managementServerId + StringBuilder sb = new StringBuilder(); + for (HostVO host : unmanagedHosts) { + host.setManagementServerId(managementServerId); + update(host.getId(), host); + assignedHosts.add(host); + sb.append(host.getId()); + sb.append(" "); + } + if (s_logger.isTraceEnabled()) { + s_logger.trace("Following hosts got acquired for clusters already owned: " + sb.toString()); + } + } + if (s_logger.isDebugEnabled()) { + s_logger.debug("Completed acquiring hosts for clusters already owned by this management server"); + } + + if (assignedHosts.size() < limit) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Acquiring hosts for clusters not owned by any management server"); + } + // for remaining hosts not owned by any MS check if they can be owned (by owning full cluster) + clusters = findClustersForHostsNotOwnedByAnyManagementServer(); + List updatedClusters = clusters; + if (clusters.size() > limit) { + updatedClusters = clusters.subList(0, limit.intValue()); + } + if (updatedClusters.size() > 0) { + SearchCriteria sc = UnmanagedDirectConnectSearch.create(); + sc.setParameters("lastPinged", lastPingSecondsAfter); + sc.setJoinParameters("ClusterManagedSearch", "managed", Managed.ManagedState.Managed); + sc.setParameters("clusterIn", updatedClusters.toArray()); List unmanagedHosts = lockRows(sc, null, true); + + // group hosts based on cluster + Map> hostMap = new HashMap>(); for (HostVO host : unmanagedHosts) { - host.setManagementServerId(managementServerId); - update(host.getId(), host); - assignedHosts.add(host); + if (hostMap.get(host.getClusterId()) == null) { + hostMap.put(host.getClusterId(), new ArrayList()); + } + hostMap.get(host.getClusterId()).add(host); } - } else { - remainingClusters.add(clusterId); + + StringBuilder sb = new StringBuilder(); + for (Long clusterId : hostMap.keySet()) { + if (canOwnCluster(clusterId)) { // cluster is not owned by any other MS, so @managementServerId can own it + List hostList = hostMap.get(clusterId); + for (HostVO host : hostList) { + host.setManagementServerId(managementServerId); + update(host.getId(), host); + assignedHosts.add(host); + sb.append(host.getId()); + sb.append(" "); + } + } + if (assignedHosts.size() > limit) { + break; + } + } + if (s_logger.isTraceEnabled()) { + s_logger.trace("Following hosts got acquired from newly owned clusters: " + sb.toString()); + } + } + if (s_logger.isDebugEnabled()) { + s_logger.debug("Completed acquiring hosts for clusters not owned by any management server"); } } txn.commit(); - // for remaining clusters check if they can be owned - for (Long clusterId : remainingClusters) { - txn.start(); - sc.setParameters("cluster", clusterId); - List unmanagedHosts = lockRows(sc, null, true); - if (canOwnCluster(clusterId)) { // cluster is not owned by any other MS, so @managementServerId can own it - for (HostVO host : unmanagedHosts) { - host.setManagementServerId(managementServerId); - update(host.getId(), host); - assignedHosts.add(host); - } - } - txn.commit(); - } - return assignedHosts; } - + @Override @DB public List findAndUpdateApplianceToLoad(long lastPingSecondsAfter, long managementServerId) { Transaction txn = Transaction.currentTxn(); diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 19f0102d173..4fdb3c6c83b 100755 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -99,6 +99,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds public long _loadSize = 100; + protected int _directAgentScanInterval = 90; // 90 seconds protected Set _agentToTransferIds = new HashSet(); @Inject @@ -134,6 +135,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust String value = params.get(Config.DirectAgentLoadSize.key()); _loadSize = NumbersUtil.parseInt(value, 16); + value = params.get(Config.DirectAgentScanInterval.key()); + _directAgentScanInterval = NumbersUtil.parseInt(value, 90); // defaulted to 90 seconds + ClusteredAgentAttache.initialize(this); _clusterMgr.registerListener(this); @@ -146,7 +150,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (!super.start()) { return false; } - _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, SCAN_INTERVAL); + _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, _directAgentScanInterval * 1000); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Scheduled direct agent scan task to run at an interval of " + _directAgentScanInterval + " seconds"); + } // schedule transfer scan executor - if agent LB is enabled if (_clusterMgr.isAgentRebalanceEnabled()) { diff --git a/server/src/com/cloud/configuration/Config.java b/server/src/com/cloud/configuration/Config.java index 1a2c6208fb3..d3ed7182f80 100755 --- a/server/src/com/cloud/configuration/Config.java +++ b/server/src/com/cloud/configuration/Config.java @@ -356,6 +356,7 @@ public enum Config { ResourceCountCheckInterval("Advanced", ManagementServer.class, Long.class, "resourcecount.check.interval", "0", "Time (in seconds) to wait before retrying resource count check task. Default is 0 which is to never run the task", "Seconds"), DirectAgentLoadSize("Advanced", ManagementServer.class, Integer.class, "direct.agent.load.size", "16", "The number of direct agents to load each time", null), + DirectAgentScanInterval("Advanced", ManagementServer.class, Integer.class, "direct.agent.scan.interval", "90", "Time interval (in seconds) to run the direct agent scan task", null), //disabling lb as cluster sync does not work with distributed cluster AgentLbEnable("Advanced", ManagementServer.class, Boolean.class, "agent.lb.enabled", "false", "If agent load balancing enabled in cluster setup", null), diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql index 69b17ee3860..5edf733c0df 100644 --- a/setup/db/db/schema-410to420.sql +++ b/setup/db/db/schema-410to420.sql @@ -2152,6 +2152,8 @@ INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'manag INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'management-server', 'execute.in.sequence.hypervisor.commands', 'false', 'If set to true, StartCommand, StopCommand, CopyVolumeCommand, CreateCommand will be synchronized on the agent side. If set to false, these commands become asynchronous. Default value is false.'); INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'management-server', 'execute.in.sequence.network.element.commands', 'false', 'If set to true, DhcpEntryCommand, SavePasswordCommand, UserDataCommand, VmDataCommand will be synchronized on the agent side. If set to false, these commands become asynchronous. Default value is false.'); +INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'management-server', 'direct.agent.scan.interval', 90, 'Time interval (in seconds) to run the direct agent scan task.'); + ALTER TABLE `cloud`.`vm_template` ADD COLUMN `dynamically_scalable` tinyint(1) unsigned NOT NULL DEFAULT 0 COMMENT 'true if template contains XS/VMWare tools inorder to support dynamic scaling of VM cpu/memory'; UPDATE `cloud`.`vm_template` SET dynamically_scalable = 1 WHERE name = "CentOS 5.6(64-bit) no GUI (XenServer)" AND type = "BUILTIN";