hostCpuUsedMap, hostMemoryUsedMap;
+
+
+ @Mock
+ private ServiceOfferingDao serviceOfferingDao;
+
+
+ private AutoCloseable closeable;
+
+ @Before
+ public void setUp() throws NoSuchFieldException, IllegalAccessException {
+ closeable = MockitoAnnotations.openMocks(this);
+
+
+ vm1 = Mockito.mock(VirtualMachine.class);
+ vm2 = Mockito.mock(VirtualMachine.class);
+ vm3 = Mockito.mock(VirtualMachine.class); // vm to migrate
+
+ destHost = Mockito.mock(Host.class);
+ hostVmMap = new HashMap<>();
+ hostVmMap.put(1L, Collections.singletonList(vm1));
+ hostVmMap.put(2L, Arrays.asList(vm2, vm3));
+
+ serviceOffering = Mockito.mock(ServiceOfferingVO.class);
+ Mockito.when(vm3.getHostId()).thenReturn(2L);
+
+ Mockito.when(destHost.getId()).thenReturn(1L);
+
+ Mockito.when(serviceOffering.getCpu()).thenReturn(1);
+ Mockito.when(serviceOffering.getSpeed()).thenReturn(1000);
+ Mockito.when(serviceOffering.getRamSize()).thenReturn(512);
+
+ overrideDefaultConfigValue(ClusterDrsImbalanceThreshold, "_defaultValue", "0.5");
+
+ cpuList = Arrays.asList(1L, 2L);
+ memoryList = Arrays.asList(512L, 2048L);
+
+ hostCpuUsedMap = new HashMap<>();
+ hostCpuUsedMap.put(1L, 1000L);
+ hostCpuUsedMap.put(2L, 2000L);
+
+ hostMemoryUsedMap = new HashMap<>();
+ hostMemoryUsedMap.put(1L, 512L * 1024L * 1024L);
+ hostMemoryUsedMap.put(2L, 2048L * 1024L * 1024L);
+ }
+
+ private void overrideDefaultConfigValue(final ConfigKey configKey, final String name,
+ final Object o) throws IllegalAccessException, NoSuchFieldException {
+ Field f = ConfigKey.class.getDeclaredField(name);
+ f.setAccessible(true);
+ f.set(configKey, o);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ closeable.close();
+ }
+
+ /**
+ * needsDrs tests
+ * Scenarios to test for needsDrs
+ *
1. cluster with cpu metric
+ *
2. cluster with memory metric
+ *
3. cluster with "unknown" metric
+ *
+ *
CPU imbalance = 0.333
+ *
Memory imbalance = 0.6
+ */
+
+ /*
+ 1. cluster with cpu metric
+ 0.3333 > 0.5 -> False
+ */
+ @Test
+ public void needsDrsWithCpu() throws ConfigurationException, NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "cpu");
+ assertFalse(balanced.needsDrs(clusterId, cpuList, memoryList));
+ }
+
+ /*
+ 2. cluster with memory metric
+ 0.6 > 0.5 -> True
+ */
+ @Test
+ public void needsDrsWithMemory() throws ConfigurationException, NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "memory");
+ assertTrue(balanced.needsDrs(clusterId, cpuList, memoryList));
+ }
+
+ /* 3. cluster with "unknown" metric */
+ @Test
+ public void needsDrsWithUnknown() throws ConfigurationException, NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "unknown");
+ assertThrows(ConfigurationException.class, () -> balanced.needsDrs(clusterId, cpuList, memoryList));
+ }
+
+ /**
+ * getMetrics tests
+ *
Scenarios to test for getMetrics
+ *
1. cluster with cpu metric
+ *
2. cluster with memory metric
+ *
3. cluster with default metric
+ *
+ *
Pre
+ *
CPU imbalance = 0.333333
+ *
Memory imbalance = 0.6
+ *
+ *
Post
+ *
CPU imbalance = 0.3333
+ *
Memory imbalance = 0.2
+ *
+ *
Cost 512.0
+ *
Benefit (0.6-0.2) * 8192 = 3276.8
+ */
+
+ /*
+ 1. cluster with cpu metric
+ improvement = 0.3333 - 0.3333 = 0.0
+ */
+ @Test
+ public void getMetricsWithCpu() throws NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "cpu");
+ Ternary result = balanced.getMetrics(clusterId, vm3, serviceOffering, destHost,
+ hostCpuUsedMap, hostMemoryUsedMap, false);
+ assertEquals(0.0, result.first(), 0.01);
+ assertEquals(0.0, result.second(), 0.0);
+ assertEquals(1.0, result.third(), 0.0);
+ }
+
+ /*
+ 2. cluster with memory metric
+ improvement = 0.6 - 0.2 = 0.4
+ */
+ @Test
+ public void getMetricsWithMemory() throws NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "memory");
+ Ternary result = balanced.getMetrics(clusterId, vm3, serviceOffering, destHost,
+ hostCpuUsedMap, hostMemoryUsedMap, false);
+ assertEquals(0.4, result.first(), 0.01);
+ assertEquals(0, result.second(), 0.0);
+ assertEquals(1, result.third(), 0.0);
+ }
+
+ /*
+ 3. cluster with default metric
+ improvement = 0.3333 + 0.6 - 0.3333 - 0.2 = 0.4
+ */
+ @Test
+ public void getMetricsWithDefault() throws NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "both");
+ Ternary result = balanced.getMetrics(clusterId, vm3, serviceOffering, destHost,
+ hostCpuUsedMap, hostMemoryUsedMap, false);
+ assertEquals(0.4, result.first(), 0.01);
+ assertEquals(0, result.second(), 0.0);
+ assertEquals(1, result.third(), 0.0);
+ }
+}
diff --git a/plugins/drs/cluster/condensed/pom.xml b/plugins/drs/cluster/condensed/pom.xml
new file mode 100644
index 00000000000..ea8acdc6d1b
--- /dev/null
+++ b/plugins/drs/cluster/condensed/pom.xml
@@ -0,0 +1,33 @@
+
+
+
+
+ 4.0.0
+ Apache CloudStack Plugin - Cluster DRS Algorithm - Condensed
+ cloud-plugin-cluster-drs-condensed
+
+ org.apache.cloudstack
+ cloudstack-plugins
+ 4.19.0.0-SNAPSHOT
+ ../../../pom.xml
+
+
diff --git a/plugins/drs/cluster/condensed/src/main/java/org/apache/cloudstack/cluster/Condensed.java b/plugins/drs/cluster/condensed/src/main/java/org/apache/cloudstack/cluster/Condensed.java
new file mode 100644
index 00000000000..aefd11905ef
--- /dev/null
+++ b/plugins/drs/cluster/condensed/src/main/java/org/apache/cloudstack/cluster/Condensed.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.cluster;
+
+import com.cloud.host.Host;
+import com.cloud.offering.ServiceOffering;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
+import com.cloud.utils.component.AdapterBase;
+import com.cloud.vm.VirtualMachine;
+
+import javax.naming.ConfigurationException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.cloudstack.cluster.ClusterDrsService.ClusterDrsImbalanceThreshold;
+import static org.apache.cloudstack.cluster.ClusterDrsService.ClusterDrsMetric;
+
+public class Condensed extends AdapterBase implements ClusterDrsAlgorithm {
+
+ @Override
+ public String getName() {
+ return "condensed";
+ }
+
+ @Override
+ public boolean needsDrs(long clusterId, List cpuList, List memoryList) throws ConfigurationException {
+ Double cpuImbalance = getClusterImbalance(cpuList);
+ Double memoryImbalance = getClusterImbalance(memoryList);
+ double threshold = getThreshold(clusterId);
+ String metric = ClusterDrsMetric.valueIn(clusterId);
+ switch (metric) {
+ case "cpu":
+ return cpuImbalance < threshold;
+ case "memory":
+ return memoryImbalance < threshold;
+ default:
+ throw new ConfigurationException(
+ String.format("Invalid metric: %s for cluster: %d", metric, clusterId));
+ }
+ }
+
+ private double getThreshold(long clusterId) throws ConfigurationException {
+ return ClusterDrsImbalanceThreshold.valueIn(clusterId);
+ }
+
+ @Override
+ public Ternary getMetrics(long clusterId, VirtualMachine vm,
+ ServiceOffering serviceOffering, Host destHost,
+ Map hostCpuUsedMap, Map hostMemoryUsedMap,
+ Boolean requiresStorageMotion) {
+ Double preCpuImbalance = getClusterImbalance(new ArrayList<>(hostCpuUsedMap.values()));
+ Double preMemoryImbalance = getClusterImbalance(new ArrayList<>(hostMemoryUsedMap.values()));
+
+ Pair imbalancePair = getImbalancePostMigration(serviceOffering, vm, destHost, hostCpuUsedMap,
+ hostMemoryUsedMap);
+ Double postCpuImbalance = imbalancePair.first();
+ Double postMemoryImbalance = imbalancePair.second();
+
+ // This needs more research to determine the cost and benefit of a migration
+ // TODO: Cost should be a factor of the VM size and the host capacity
+ // TODO: Benefit should be a factor of the VM size and the host capacity and the number of VMs on the host
+ double cost = 0;
+ double benefit = 1;
+
+ String metric = ClusterDrsMetric.valueIn(clusterId);
+ double improvement;
+ switch (metric) {
+ case "cpu":
+ improvement = postCpuImbalance - preCpuImbalance;
+ break;
+ case "memory":
+ improvement = postMemoryImbalance - preMemoryImbalance;
+ break;
+ default:
+ improvement = postCpuImbalance + postMemoryImbalance - preCpuImbalance - preMemoryImbalance;
+ }
+ return new Ternary<>(improvement, cost, benefit);
+ }
+}
diff --git a/plugins/drs/cluster/condensed/src/main/resources/META-INF/cloudstack/condensed/module.properties b/plugins/drs/cluster/condensed/src/main/resources/META-INF/cloudstack/condensed/module.properties
new file mode 100644
index 00000000000..0581736b17b
--- /dev/null
+++ b/plugins/drs/cluster/condensed/src/main/resources/META-INF/cloudstack/condensed/module.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+name=condensed
+parent=cluster
diff --git a/plugins/drs/cluster/condensed/src/main/resources/META-INF/cloudstack/condensed/spring-condensed-context.xml b/plugins/drs/cluster/condensed/src/main/resources/META-INF/cloudstack/condensed/spring-condensed-context.xml
new file mode 100644
index 00000000000..dffa7d85da7
--- /dev/null
+++ b/plugins/drs/cluster/condensed/src/main/resources/META-INF/cloudstack/condensed/spring-condensed-context.xml
@@ -0,0 +1,33 @@
+
+
+
+
+
+
+
diff --git a/plugins/drs/cluster/condensed/src/test/java/org/apache/cloudstack/cluster/CondensedTest.java b/plugins/drs/cluster/condensed/src/test/java/org/apache/cloudstack/cluster/CondensedTest.java
new file mode 100644
index 00000000000..d8cf581768a
--- /dev/null
+++ b/plugins/drs/cluster/condensed/src/test/java/org/apache/cloudstack/cluster/CondensedTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.cluster;
+
+import com.cloud.host.Host;
+import com.cloud.service.ServiceOfferingVO;
+import com.cloud.utils.Ternary;
+import com.cloud.vm.VirtualMachine;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.naming.ConfigurationException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.cloudstack.cluster.ClusterDrsService.ClusterDrsImbalanceThreshold;
+import static org.apache.cloudstack.cluster.ClusterDrsService.ClusterDrsMetric;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(MockitoJUnitRunner.class)
+public class CondensedTest {
+
+ @InjectMocks
+ Condensed condensed;
+
+ VirtualMachine vm1, vm2, vm3;
+
+ Host destHost;
+
+ ServiceOfferingVO serviceOffering;
+
+ long clusterId = 1L;
+
+ Map> hostVmMap;
+
+ List cpuList, memoryList;
+
+ Map hostCpuUsedMap, hostMemoryUsedMap;
+
+
+ private AutoCloseable closeable;
+
+ @Before
+ public void setUp() throws NoSuchFieldException, IllegalAccessException {
+ closeable = MockitoAnnotations.openMocks(this);
+
+ vm1 = Mockito.mock(VirtualMachine.class);
+ vm2 = Mockito.mock(VirtualMachine.class);
+ vm3 = Mockito.mock(VirtualMachine.class); // vm to migrate
+
+ destHost = Mockito.mock(Host.class);
+ hostVmMap = new HashMap<>();
+ hostVmMap.put(1L, Collections.singletonList(vm1));
+ hostVmMap.put(2L, Arrays.asList(vm2, vm3));
+
+ serviceOffering = Mockito.mock(ServiceOfferingVO.class);
+ Mockito.when(vm3.getHostId()).thenReturn(2L);
+
+ Mockito.when(destHost.getId()).thenReturn(1L);
+
+ Mockito.when(serviceOffering.getCpu()).thenReturn(1);
+ Mockito.when(serviceOffering.getSpeed()).thenReturn(1000);
+ Mockito.when(serviceOffering.getRamSize()).thenReturn(512);
+
+ overrideDefaultConfigValue(ClusterDrsImbalanceThreshold, "_defaultValue", "0.5");
+
+ cpuList = Arrays.asList(1L, 2L);
+ memoryList = Arrays.asList(512L, 2048L);
+
+ hostCpuUsedMap = new HashMap<>();
+ hostCpuUsedMap.put(1L, 1000L);
+ hostCpuUsedMap.put(2L, 2000L);
+
+ hostMemoryUsedMap = new HashMap<>();
+ hostMemoryUsedMap.put(1L, 512L * 1024L * 1024L);
+ hostMemoryUsedMap.put(2L, 2048L * 1024L * 1024L);
+ }
+
+ private void overrideDefaultConfigValue(final ConfigKey configKey,
+ final String name,
+ final Object o) throws IllegalAccessException, NoSuchFieldException {
+ Field f = ConfigKey.class.getDeclaredField(name);
+ f.setAccessible(true);
+ f.set(configKey, o);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ closeable.close();
+ }
+
+ /**
+ * needsDrs tests
+ *
Scenarios to test for needsDrs
+ *
1. cluster with cpu metric
+ *
2. cluster with memory metric
+ *
3. cluster with "unknown" metric
+ *
+ *
CPU imbalance = 0.333
+ *
Memory imbalance = 0.6
+ */
+
+ /*
+ 1. cluster with cpu metric
+ 0.3333 < 0.5 -> True
+ */
+ @Test
+ public void needsDrsWithCpu() throws ConfigurationException, NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "cpu");
+ assertTrue(condensed.needsDrs(clusterId, cpuList, memoryList));
+ }
+
+ /*
+ 2. cluster with memory metric
+ 0.6 < 0.5 -> False
+ */
+ @Test
+ public void needsDrsWithMemory() throws ConfigurationException, NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "memory");
+ assertFalse(condensed.needsDrs(clusterId, cpuList, memoryList));
+ }
+
+ /* 3. cluster with "unknown" metric */
+ @Test
+ public void needsDrsWithUnknown() throws ConfigurationException, NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "unknown");
+ assertThrows(ConfigurationException.class, () -> condensed.needsDrs(clusterId, cpuList, memoryList));
+ }
+
+ /**
+ * getMetrics tests
+ *
Scenarios to test for getMetrics
+ *
1. cluster with cpu metric
+ *
2. cluster with memory metric
+ *
3. cluster with default metric
+ *
+ *
Pre
+ *
CPU imbalance = 0.333333
+ *
Memory imbalance = 0.6
+ *
+ *
Post
+ *
CPU imbalance = 0.3333
+ *
Memory imbalance = 0.2
+ *
+ *
Cost 512.0
+ *
Benefit (0.2-0.6) * 8192 = -3276.8
+ */
+
+ /*
+ 1. cluster with cpu metric
+ improvement = 0.3333 - 0.3333 = 0.0
+ */
+ @Test
+ public void getMetricsWithCpu() throws NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "cpu");
+ Ternary result = condensed.getMetrics(clusterId, vm3, serviceOffering, destHost,
+ hostCpuUsedMap, hostMemoryUsedMap, false);
+ assertEquals(0.0, result.first(), 0.0);
+ assertEquals(0, result.second(), 0.0);
+ assertEquals(1, result.third(), 0.0);
+ }
+
+ /*
+ 2. cluster with memory metric
+ improvement = 0.2 - 0.6 = -0.4
+ */
+ @Test
+ public void getMetricsWithMemory() throws NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "memory");
+ Ternary result = condensed.getMetrics(clusterId, vm3, serviceOffering, destHost,
+ hostCpuUsedMap, hostMemoryUsedMap, false);
+ assertEquals(-0.4, result.first(), 0.01);
+ assertEquals(0, result.second(), 0.0);
+ assertEquals(1, result.third(), 0.0);
+ }
+
+ /*
+ 3. cluster with default metric
+ improvement = 0.3333 + 0.2 - 0.3333 - 0.6 = -0.4
+ */
+ @Test
+ public void getMetricsWithDefault() throws NoSuchFieldException, IllegalAccessException {
+ overrideDefaultConfigValue(ClusterDrsMetric, "_defaultValue", "both");
+ Ternary result = condensed.getMetrics(clusterId, vm3, serviceOffering, destHost,
+ hostCpuUsedMap, hostMemoryUsedMap, false);
+ assertEquals(-0.4, result.first(), 0.0001);
+ assertEquals(0, result.second(), 0.0);
+ assertEquals(1, result.third(), 0.0);
+ }
+}
diff --git a/plugins/pom.xml b/plugins/pom.xml
index d0661c01a2c..af131f08669 100755
--- a/plugins/pom.xml
+++ b/plugins/pom.xml
@@ -73,6 +73,9 @@
deployment-planners/user-concentrated-pod
deployment-planners/user-dispersing
+ drs/cluster/balanced
+ drs/cluster/condensed
+
event-bus/inmemory
event-bus/kafka
event-bus/rabbitmq
diff --git a/server/src/main/java/com/cloud/api/ApiDBUtils.java b/server/src/main/java/com/cloud/api/ApiDBUtils.java
index d30e8b82920..ddfd0671820 100644
--- a/server/src/main/java/com/cloud/api/ApiDBUtils.java
+++ b/server/src/main/java/com/cloud/api/ApiDBUtils.java
@@ -1864,6 +1864,10 @@ public class ApiDBUtils {
return s_userVmJoinDao.newUserVmView(userVms);
}
+ public static List newUserVmView(VirtualMachine... vms) {
+ return s_userVmJoinDao.newUserVmView(vms);
+ }
+
public static SecurityGroupResponse newSecurityGroupResponse(SecurityGroupJoinVO vsg, Account caller) {
return s_securityGroupJoinDao.newSecurityGroupResponse(vsg, caller);
}
diff --git a/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDao.java b/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDao.java
index 652f51bcb3d..6356addcb4f 100644
--- a/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDao.java
+++ b/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDao.java
@@ -19,6 +19,7 @@ package com.cloud.api.query.dao;
import java.util.List;
import java.util.Set;
+import com.cloud.vm.VirtualMachine;
import org.apache.cloudstack.api.ApiConstants.VMDetails;
import org.apache.cloudstack.api.ResponseObject.ResponseView;
import org.apache.cloudstack.api.response.UserVmResponse;
@@ -37,6 +38,8 @@ public interface UserVmJoinDao extends GenericDao {
List newUserVmView(UserVm... userVms);
+ List newUserVmView(VirtualMachine... vms);
+
List searchByIds(Long... ids);
List listActiveByIsoId(Long isoId);
diff --git a/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDaoImpl.java b/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDaoImpl.java
index b97ae779abc..2914df28ce2 100644
--- a/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDaoImpl.java
+++ b/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDaoImpl.java
@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
import javax.inject.Inject;
+import com.cloud.vm.VirtualMachine;
import org.apache.cloudstack.affinity.AffinityGroupResponse;
import org.apache.cloudstack.annotation.AnnotationService;
import org.apache.cloudstack.annotation.dao.AnnotationDao;
@@ -619,4 +620,18 @@ public class UserVmJoinDaoImpl extends GenericDaoBaseWithTagInformation newUserVmView(VirtualMachine... vms) {
+
+ Hashtable userVmDataHash = new Hashtable<>();
+ for (VirtualMachine vm : vms) {
+ if (!userVmDataHash.containsKey(vm.getId())) {
+ userVmDataHash.put(vm.getId(), vm);
+ }
+ }
+
+ Set vmIdSet = userVmDataHash.keySet();
+ return searchByIds(vmIdSet.toArray(new Long[vmIdSet.size()]));
+ }
+
}
diff --git a/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java b/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java
index 18cebff87fe..acf57a788a0 100644
--- a/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java
+++ b/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java
@@ -89,6 +89,7 @@ import org.apache.cloudstack.api.command.admin.zone.CreateZoneCmd;
import org.apache.cloudstack.api.command.admin.zone.DeleteZoneCmd;
import org.apache.cloudstack.api.command.admin.zone.UpdateZoneCmd;
import org.apache.cloudstack.api.command.user.network.ListNetworkOfferingsCmd;
+import org.apache.cloudstack.cluster.ClusterDrsService;
import org.apache.cloudstack.config.ApiServiceConfiguration;
import org.apache.cloudstack.config.Configuration;
import org.apache.cloudstack.context.CallContext;
@@ -571,6 +572,7 @@ public class ConfigurationManagerImpl extends ManagerBase implements Configurati
weightBasedParametersForValidation.add(Config.AgentLoadThreshold.key());
weightBasedParametersForValidation.add(Config.VmUserDispersionWeight.key());
weightBasedParametersForValidation.add(CapacityManager.SecondaryStorageCapacityThreshold.key());
+ weightBasedParametersForValidation.add(ClusterDrsService.ClusterDrsImbalanceThreshold.key());
}
diff --git a/server/src/main/java/com/cloud/server/ManagementServerImpl.java b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
index 02d958a1518..50824d11d58 100644
--- a/server/src/main/java/com/cloud/server/ManagementServerImpl.java
+++ b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
@@ -1330,7 +1330,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
return new Pair, Integer>(result.first(), result.second());
}
- protected Pair> filterUefiHostsForMigration(List allHosts, List filteredHosts, VMInstanceVO vm) {
+ protected Pair> filterUefiHostsForMigration(List allHosts, List filteredHosts, VirtualMachine vm) {
UserVmDetailVO userVmDetailVO = _UserVmDetailsDao.findDetail(vm.getId(), ApiConstants.BootType.UEFI.toString());
if (userVmDetailVO != null &&
(ApiConstants.BootMode.LEGACY.toString().equalsIgnoreCase(userVmDetailVO.getValue()) ||
@@ -1350,9 +1350,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
return new Pair<>(true, filteredHosts);
}
- @Override
- public Ternary, Integer>, List extends Host>, Map> listHostsForMigrationOfVM(final Long vmId, final Long startIndex, final Long pageSize,
- final String keyword) {
+ private void validateVmForHostMigration(VirtualMachine vm) {
final Account caller = getCaller();
if (!_accountMgr.isRootAdmin(caller.getId())) {
if (s_logger.isDebugEnabled()) {
@@ -1361,10 +1359,8 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
throw new PermissionDeniedException("No permission to migrate VM, Only Root Admin can migrate a VM!");
}
- final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
if (vm == null) {
- final InvalidParameterValueException ex = new InvalidParameterValueException("Unable to find the VM with given id");
- throw ex;
+ throw new InvalidParameterValueException("Unable to find the VM with given id");
}
if (vm.getState() != State.Running) {
@@ -1376,13 +1372,6 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
throw ex;
}
- if (_serviceOfferingDetailsDao.findDetail(vm.getServiceOfferingId(), GPU.Keys.pciDevice.toString()) != null) {
- s_logger.info(" Live Migration of GPU enabled VM : " + vm.getInstanceName() + " is not supported");
- // Return empty list.
- return new Ternary<>(new Pair<>(new ArrayList(), new Integer(0)),
- new ArrayList<>(), new HashMap<>());
- }
-
if (!LIVE_MIGRATION_SUPPORTING_HYPERVISORS.contains(vm.getHypervisorType())) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(vm + " is not XenServer/VMware/KVM/Ovm/Hyperv/Ovm3, cannot migrate this VM.");
@@ -1393,6 +1382,27 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
if (VirtualMachine.Type.User.equals(vm.getType()) && HypervisorType.LXC.equals(vm.getHypervisorType())) {
throw new InvalidParameterValueException("Unsupported Hypervisor Type for User VM migration, we support XenServer/VMware/KVM/Ovm/Hyperv/Ovm3 only");
}
+ }
+
+ @Override
+ public Ternary, Integer>, List extends Host>, Map> listHostsForMigrationOfVM(final Long vmId, final Long startIndex, final Long pageSize,
+ final String keyword) {
+ final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
+ return listHostsForMigrationOfVM(vm, startIndex, pageSize, keyword, Collections.emptyList());
+ }
+
+ @Override
+ public Ternary, Integer>, List extends Host>, Map> listHostsForMigrationOfVM(final VirtualMachine vm, final Long startIndex, final Long pageSize,
+ final String keyword, List vmList) {
+
+ validateVmForHostMigration(vm);
+
+ if (_serviceOfferingDetailsDao.findDetail(vm.getServiceOfferingId(), GPU.Keys.pciDevice.toString()) != null) {
+ s_logger.info(" Live Migration of GPU enabled VM : " + vm.getInstanceName() + " is not supported");
+ // Return empty list.
+ return new Ternary<>(new Pair<>(new ArrayList<>(), 0),
+ new ArrayList<>(), new HashMap<>());
+ }
final long srcHostId = vm.getHostId();
final Host srcHost = _hostDao.findById(srcHostId);
@@ -1531,7 +1541,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
if (vmGroupCount > 0) {
for (final AffinityGroupProcessor processor : _affinityProcessors) {
- processor.process(vmProfile, plan, excludes);
+ processor.process(vmProfile, plan, excludes, vmList);
}
}
diff --git a/server/src/main/java/org/apache/cloudstack/cluster/ClusterDrsServiceImpl.java b/server/src/main/java/org/apache/cloudstack/cluster/ClusterDrsServiceImpl.java
new file mode 100644
index 00000000000..3ebb97ae4f0
--- /dev/null
+++ b/server/src/main/java/org/apache/cloudstack/cluster/ClusterDrsServiceImpl.java
@@ -0,0 +1,849 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.cluster;
+
+import com.cloud.api.ApiGsonHelper;
+import com.cloud.api.query.dao.HostJoinDao;
+import com.cloud.api.query.vo.HostJoinVO;
+import com.cloud.dc.ClusterVO;
+import com.cloud.dc.dao.ClusterDao;
+import com.cloud.domain.Domain;
+import com.cloud.event.ActionEventUtils;
+import com.cloud.event.EventTypes;
+import com.cloud.event.EventVO;
+import com.cloud.event.dao.EventDao;
+import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.host.Host;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.offering.ServiceOffering;
+import com.cloud.org.Cluster;
+import com.cloud.server.ManagementServer;
+import com.cloud.service.dao.ServiceOfferingDao;
+import com.cloud.user.Account;
+import com.cloud.user.User;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
+import com.cloud.utils.component.ComponentContext;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.component.PluggableService;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.VMInstanceVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.VmDetailConstants;
+import com.cloud.vm.dao.VMInstanceDao;
+import org.apache.cloudstack.api.ApiCommandResourceType;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.command.admin.cluster.ExecuteClusterDrsPlanCmd;
+import org.apache.cloudstack.api.command.admin.cluster.GenerateClusterDrsPlanCmd;
+import org.apache.cloudstack.api.command.admin.cluster.ListClusterDrsPlanCmd;
+import org.apache.cloudstack.api.command.admin.vm.MigrateVMCmd;
+import org.apache.cloudstack.api.response.ClusterDrsPlanMigrationResponse;
+import org.apache.cloudstack.api.response.ClusterDrsPlanResponse;
+import org.apache.cloudstack.api.response.ListResponse;
+import org.apache.cloudstack.cluster.dao.ClusterDrsPlanDao;
+import org.apache.cloudstack.cluster.dao.ClusterDrsPlanMigrationDao;
+import org.apache.cloudstack.context.CallContext;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.jobs.JobInfo;
+import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.log4j.Logger;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
+
+import static com.cloud.org.Grouping.AllocationState.Disabled;
+
+public class ClusterDrsServiceImpl extends ManagerBase implements ClusterDrsService, PluggableService {
+
+ private static final Logger logger = Logger.getLogger(ClusterDrsServiceImpl.class);
+
+ private static final String CLUSTER_LOCK_STR = "drs.plan.cluster.%s";
+
+ AsyncJobDispatcher asyncJobDispatcher;
+
+ @Inject
+ AsyncJobManager asyncJobManager;
+
+ @Inject
+ ClusterDao clusterDao;
+
+ @Inject
+ HostDao hostDao;
+
+ @Inject
+ EventDao eventDao;
+
+ @Inject
+ HostJoinDao hostJoinDao;
+
+ @Inject
+ VMInstanceDao vmInstanceDao;
+
+ @Inject
+ ClusterDrsPlanDao drsPlanDao;
+
+ @Inject
+ ClusterDrsPlanMigrationDao drsPlanMigrationDao;
+
+ @Inject
+ ServiceOfferingDao serviceOfferingDao;
+
+ @Inject
+ ManagementServer managementServer;
+
+ List drsAlgorithms = new ArrayList<>();
+
+ Map drsAlgorithmMap = new HashMap<>();
+
+ public AsyncJobDispatcher getAsyncJobDispatcher() {
+ return asyncJobDispatcher;
+ }
+
+ public void setAsyncJobDispatcher(final AsyncJobDispatcher dispatcher) {
+ asyncJobDispatcher = dispatcher;
+ }
+
+ public void setDrsAlgorithms(final List drsAlgorithms) {
+ this.drsAlgorithms = drsAlgorithms;
+ }
+
    @Override
    public boolean start() {
        // Build the name -> algorithm lookup used by getDrsAlgorithm().
        drsAlgorithmMap.clear();
        for (final ClusterDrsAlgorithm algorithm : drsAlgorithms) {
            drsAlgorithmMap.put(algorithm.getName(), algorithm);
        }

        // Periodic poll: first run after 5s, then once a minute. Exceptions are
        // swallowed and logged so a failing cycle does not cancel the timer.
        final TimerTask schedulerPollTask = new ManagedContextTimerTask() {
            @Override
            protected void runInContext() {
                try {
                    poll(new Date());
                } catch (final Exception e) {
                    logger.error("Error while running DRS", e);
                }
            }
        };
        // NOTE(review): the thread name "VMSchedulerPollTask" looks copy-pasted from
        // the VM scheduler — confirm whether it should be a DRS-specific name.
        Timer vmSchedulerTimer = new Timer("VMSchedulerPollTask");
        vmSchedulerTimer.schedule(schedulerPollTask, 5000L, 60 * 1000L);
        return true;
    }
+
    @Override
    public void poll(Date timestamp) {
        // Round to the minute so log output and scheduling comparisons are stable.
        Date currentTimestamp = DateUtils.round(timestamp, Calendar.MINUTE);
        String displayTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, currentTimestamp);
        logger.debug(String.format("ClusterDRS.poll is being called at %s", displayTime));

        // Serialize the poll cycle across management servers with a global lock; if
        // the lock cannot be acquired within 30s another server is polling — skip.
        GlobalLock lock = GlobalLock.getInternLock("clusterDRS.poll");
        try {
            if (lock.lock(30)) {
                try {
                    updateOldPlanMigrations();
                    // Executing processPlans() twice to update the migration status of plans which
                    // are completed and
                    // if required generate new plans.
                    processPlans();
                    generateDrsPlanForAllClusters();
                    processPlans();
                } finally {
                    lock.unlock();
                }
            }
        } finally {
            lock.releaseRef();
        }
        // Cleanup runs under its own lock so a long poll cycle on one server does not
        // block expired-plan cleanup on another.
        GlobalLock cleanupLock = GlobalLock.getInternLock("clusterDRS.cleanup");
        try {
            if (cleanupLock.lock(30)) {
                try {
                    cleanUpOldDrsPlans();
                } finally {
                    cleanupLock.unlock();
                }
            }
        } finally {
            cleanupLock.releaseRef();
        }
    }
+
+ /**
+ * Fetches the plans which are in progress and updates their migration status.
+ */
+ void updateOldPlanMigrations() {
+ List plans = drsPlanDao.listByStatus(ClusterDrsPlan.Status.IN_PROGRESS);
+ for (ClusterDrsPlanVO plan : plans) {
+ try {
+ updateDrsPlanMigrations(plan);
+ } catch (Exception e) {
+ logger.error(String.format("Unable to update DRS plan details [id=%d]", plan.getId()), e);
+ }
+ }
+ }
+
+ /**
+ * Updates the job status of the plan details for the given plan.
+ *
+ * @param plan
+ * the plan to update
+ */
+ void updateDrsPlanMigrations(ClusterDrsPlanVO plan) {
+ List migrations = drsPlanMigrationDao.listPlanMigrationsInProgress(plan.getId());
+ if (migrations == null || migrations.isEmpty()) {
+ plan.setStatus(ClusterDrsPlan.Status.COMPLETED);
+ drsPlanDao.update(plan.getId(), plan);
+ ActionEventUtils.onCompletedActionEvent(User.UID_SYSTEM, Account.ACCOUNT_ID_SYSTEM, EventVO.LEVEL_INFO,
+ EventTypes.EVENT_CLUSTER_DRS, true,
+ String.format("DRS execution task completed for cluster [id=%s]", plan.getClusterId()),
+ plan.getClusterId(), ApiCommandResourceType.Cluster.toString(), plan.getEventId());
+ return;
+ }
+
+ for (ClusterDrsPlanMigrationVO migration : migrations) {
+ try {
+ AsyncJobVO job = asyncJobManager.getAsyncJob(migration.getJobId());
+ if (job == null) {
+ logger.warn(String.format("Unable to find async job [id=%d] for DRS plan migration [id=%d]",
+ migration.getJobId(), migration.getId()));
+ migration.setStatus(JobInfo.Status.FAILED);
+ drsPlanMigrationDao.update(migration.getId(), migration);
+ continue;
+ }
+ if (job.getStatus() != JobInfo.Status.IN_PROGRESS) {
+ migration.setStatus(job.getStatus());
+ drsPlanMigrationDao.update(migration.getId(), migration);
+ }
+ } catch (Exception e) {
+ logger.error(String.format("Unable to update DRS plan migration [id=%d]", migration.getId()), e);
+ }
+ }
+ }
+
+ /**
+ * Generates DRS for all clusters that meet the criteria for automated DRS.
+ */
+ void generateDrsPlanForAllClusters() {
+ List clusterList = clusterDao.listAll();
+
+ for (ClusterVO cluster : clusterList) {
+ if (cluster.getAllocationState() == Disabled || ClusterDrsEnabled.valueIn(
+ cluster.getId()).equals(Boolean.FALSE)) {
+ continue;
+ }
+
+ ClusterDrsPlanVO lastPlan = drsPlanDao.listLatestPlanForClusterId(cluster.getId());
+
+ // If the last plan is ready or in progress or was executed within the last interval, skip this cluster.
+ // This is to avoid generating plans for clusters which are already being processed and to avoid
+ // generating plans for clusters which have been processed recently.This doesn't consider the type
+ // (manual or automated) of the last plan.
+ if (lastPlan != null && (lastPlan.getStatus() == ClusterDrsPlan.Status.READY ||
+ lastPlan.getStatus() == ClusterDrsPlan.Status.IN_PROGRESS ||
+ (lastPlan.getStatus() == ClusterDrsPlan.Status.COMPLETED &&
+ lastPlan.getCreated().compareTo(DateUtils.addMinutes(new Date(), -1 * ClusterDrsInterval.valueIn(cluster.getId()))) > 0)
+ )) {
+ continue;
+ }
+
+ long eventId = ActionEventUtils.onStartedActionEvent(User.UID_SYSTEM, Account.ACCOUNT_ID_SYSTEM,
+ EventTypes.EVENT_CLUSTER_DRS,
+ String.format("Generating DRS plan for cluster %s", cluster.getUuid()), cluster.getId(),
+ ApiCommandResourceType.Cluster.toString(), true, 0);
+ GlobalLock clusterLock = GlobalLock.getInternLock(String.format(CLUSTER_LOCK_STR, cluster.getId()));
+ try {
+ if (clusterLock.lock(30)) {
+ try {
+ List> plan = getDrsPlan(cluster,
+ ClusterDrsMaxMigrations.valueIn(cluster.getId()));
+ savePlan(cluster.getId(), plan, eventId, ClusterDrsPlan.Type.AUTOMATED,
+ ClusterDrsPlan.Status.READY);
+ logger.info(String.format("Generated DRS plan for cluster %s [id=%s]", cluster.getName(),
+ cluster.getUuid()));
+ } catch (Exception e) {
+ logger.error(
+ String.format("Unable to generate DRS plans for cluster %s [id=%s]", cluster.getName(),
+ cluster.getUuid()),
+ e);
+ } finally {
+ clusterLock.unlock();
+ }
+ }
+ } finally {
+ clusterLock.releaseRef();
+ }
+ }
+ }
+
+ /**
+ * Generate DRS plan for the given cluster with the specified iteration percentage.
+ *
+ * @param cluster
+ * The cluster to generate DRS for.
+ * @param maxIterations
+ * The percentage of VMs to consider for migration
+ * during each iteration. Value between 0 and 1.
+ *
+ * @return List of Ternary object containing VM to be migrated, source host and
+ * destination host.
+ *
+ * @throws ConfigurationException
+ * If there is an error in the DRS configuration.
+ */
+ List> getDrsPlan(Cluster cluster,
+ int maxIterations) throws ConfigurationException {
+ List> migrationPlan = new ArrayList<>();
+
+ if (cluster.getAllocationState() == Disabled || maxIterations <= 0) {
+ return Collections.emptyList();
+ }
+ ClusterDrsAlgorithm algorithm = getDrsAlgorithm(ClusterDrsAlgorithm.valueIn(cluster.getId()));
+ List hostList = hostDao.findByClusterId(cluster.getId());
+ List vmList = new ArrayList<>(vmInstanceDao.listByClusterId(cluster.getId()));
+
+ int iteration = 0;
+
+ Map hostMap = hostList.stream().collect(Collectors.toMap(HostVO::getId, host -> host));
+
+ Map> hostVmMap = getHostVmMap(hostList, vmList);
+ Map> originalHostIdVmIdMap = new HashMap<>();
+ for (HostVO host : hostList) {
+ originalHostIdVmIdMap.put(host.getId(), new ArrayList<>());
+ for (VirtualMachine vm : hostVmMap.get(host.getId())) {
+ originalHostIdVmIdMap.get(host.getId()).add(vm.getId());
+ }
+ }
+
+ List hostJoinList = hostJoinDao.searchByIds(
+ hostList.stream().map(HostVO::getId).toArray(Long[]::new));
+
+ Map hostCpuMap = hostJoinList.stream().collect(Collectors.toMap(HostJoinVO::getId,
+ hostJoin -> hostJoin.getCpuUsedCapacity() + hostJoin.getCpuReservedCapacity()));
+ Map hostMemoryMap = hostJoinList.stream().collect(Collectors.toMap(HostJoinVO::getId,
+ hostJoin -> hostJoin.getMemUsedCapacity() + hostJoin.getMemReservedCapacity()));
+
+ Map vmIdServiceOfferingMap = new HashMap<>();
+
+ for (VirtualMachine vm : vmList) {
+ vmIdServiceOfferingMap.put(vm.getId(),
+ serviceOfferingDao.findByIdIncludingRemoved(vm.getId(), vm.getServiceOfferingId()));
+ }
+
+ while (iteration < maxIterations && algorithm.needsDrs(cluster.getId(), new ArrayList<>(hostCpuMap.values()),
+ new ArrayList<>(hostMemoryMap.values()))) {
+ Pair bestMigration = getBestMigration(cluster, algorithm, vmList,
+ vmIdServiceOfferingMap, hostCpuMap, hostMemoryMap);
+ VirtualMachine vm = bestMigration.first();
+ Host destHost = bestMigration.second();
+ if (destHost == null || vm == null || originalHostIdVmIdMap.get(destHost.getId()).contains(vm.getId())) {
+ logger.debug("VM migrating to it's original host or no host found for migration");
+ break;
+ }
+
+ ServiceOffering serviceOffering = vmIdServiceOfferingMap.get(vm.getId());
+ migrationPlan.add(new Ternary<>(vm, hostMap.get(vm.getHostId()), hostMap.get(destHost.getId())));
+
+ hostVmMap.get(vm.getHostId()).remove(vm);
+ hostVmMap.get(destHost.getId()).add(vm);
+ hostVmMap.get(vm.getHostId()).remove(vm);
+ hostVmMap.get(destHost.getId()).add(vm);
+
+ long vmCpu = (long) serviceOffering.getCpu() * serviceOffering.getSpeed();
+ long vmMemory = serviceOffering.getRamSize() * 1024L * 1024L;
+
+ hostCpuMap.put(vm.getHostId(), hostCpuMap.get(vm.getHostId()) - vmCpu);
+ hostCpuMap.put(destHost.getId(), hostCpuMap.get(destHost.getId()) + vmCpu);
+ hostMemoryMap.put(vm.getHostId(), hostMemoryMap.get(vm.getHostId()) - vmMemory);
+ hostMemoryMap.put(destHost.getId(), hostMemoryMap.get(destHost.getId()) + vmMemory);
+ vm.setHostId(destHost.getId());
+ iteration++;
+ }
+ return migrationPlan;
+ }
+
+ private ClusterDrsAlgorithm getDrsAlgorithm(String algoName) {
+ if (drsAlgorithmMap.containsKey(algoName)) {
+ return drsAlgorithmMap.get(algoName);
+ }
+ throw new CloudRuntimeException("Invalid algorithm configured!");
+ }
+
+ Map> getHostVmMap(List hostList, List vmList) {
+ Map> hostVmMap = new HashMap<>();
+ for (HostVO host : hostList) {
+ hostVmMap.put(host.getId(), new ArrayList<>());
+ }
+ for (VirtualMachine vm : vmList) {
+ hostVmMap.get(vm.getHostId()).add(vm);
+ }
+ return hostVmMap;
+ }
+
+ /**
+ * Returns the best migration for a given cluster using the specified DRS
+ * algorithm.
+ *
+ * @param cluster
+ * the cluster to perform DRS on
+ * @param algorithm
+ * the DRS algorithm to use
+ * @param vmList
+ * the list of virtual machines to consider for
+ * migration
+ * @param vmIdServiceOfferingMap
+ * a map of virtual machine IDs to their
+ * corresponding service offerings
+ * @param hostCpuCapacityMap
+ * a map of host IDs to their corresponding CPU
+ * capacity
+ * @param hostMemoryCapacityMap
+ * a map of host IDs to their corresponding memory
+ * capacity
+ *
+ * @return a pair of the virtual machine and host that represent the best
+ * migration, or null if no migration is
+ * possible
+ */
+ Pair getBestMigration(Cluster cluster, ClusterDrsAlgorithm algorithm,
+ List vmList,
+ Map vmIdServiceOfferingMap,
+ Map hostCpuCapacityMap,
+ Map hostMemoryCapacityMap) {
+ double improvement = 0;
+ Pair bestMigration = new Pair<>(null, null);
+
+ for (VirtualMachine vm : vmList) {
+ if (vm.getType().isUsedBySystem() || vm.getState() != VirtualMachine.State.Running ||
+ (MapUtils.isNotEmpty(vm.getDetails()) &&
+ vm.getDetails().get(VmDetailConstants.SKIP_DRS).equalsIgnoreCase("true"))
+ ) {
+ continue;
+ }
+ Ternary, Integer>, List extends Host>, Map> hostsForMigrationOfVM = managementServer
+ .listHostsForMigrationOfVM(
+ vm, 0L, 500L, null, vmList);
+ List extends Host> compatibleDestinationHosts = hostsForMigrationOfVM.first().first();
+ List extends Host> suitableDestinationHosts = hostsForMigrationOfVM.second();
+
+ Map requiresStorageMotion = hostsForMigrationOfVM.third();
+
+ for (Host destHost : compatibleDestinationHosts) {
+ if (!suitableDestinationHosts.contains(destHost)) {
+ continue;
+ }
+ Ternary metrics = algorithm.getMetrics(cluster.getId(), vm,
+ vmIdServiceOfferingMap.get(vm.getId()), destHost, hostCpuCapacityMap, hostMemoryCapacityMap,
+ requiresStorageMotion.get(destHost));
+
+ Double currentImprovement = metrics.first();
+ Double cost = metrics.second();
+ Double benefit = metrics.third();
+ if (benefit > cost && (currentImprovement > improvement)) {
+ bestMigration = new Pair<>(vm, destHost);
+ improvement = currentImprovement;
+ }
+ }
+ }
+ return bestMigration;
+ }
+
+
+ /**
+ * Saves a DRS plan for a given cluster and returns the saved plan along with the list of migrations to be executed.
+ *
+ * @param clusterId
+ * the ID of the cluster for which the DRS plan is being saved
+ * @param plan
+ * the list of virtual machine migrations to be executed as part of the DRS plan
+ * @param eventId
+ * the ID of the event that triggered the DRS plan
+ * @param type
+ * the type of the DRS plan
+ *
+ * @return a pair of the saved DRS plan and the list of migrations to be executed
+ */
+ Pair> savePlan(Long clusterId,
+ List> plan,
+ Long eventId, ClusterDrsPlan.Type type,
+ ClusterDrsPlan.Status status) {
+ return Transaction.execute(
+ (TransactionCallback>>) txStatus -> {
+ ClusterDrsPlanVO drsPlan = drsPlanDao.persist(
+ new ClusterDrsPlanVO(clusterId, eventId, type, status));
+ List planMigrations = new ArrayList<>();
+ for (Ternary migration : plan) {
+ VirtualMachine vm = migration.first();
+ Host srcHost = migration.second();
+ Host destHost = migration.third();
+ planMigrations.add(drsPlanMigrationDao.persist(
+ new ClusterDrsPlanMigrationVO(drsPlan.getId(), vm.getId(), srcHost.getId(),
+ destHost.getId())));
+ }
+ return new Pair<>(drsPlan, planMigrations);
+ });
+ }
+
+ /**
+ * Processes all DRS plans that are in the READY status.
+ */
+ void processPlans() {
+ List plans = drsPlanDao.listByStatus(ClusterDrsPlan.Status.READY);
+ for (ClusterDrsPlanVO plan : plans) {
+ try {
+ executeDrsPlan(plan);
+ } catch (Exception e) {
+ logger.error(String.format("Unable to execute DRS plan [id=%d]", plan.getId()), e);
+ }
+ }
+ }
+
+ /**
+ * Executes the DRS plan by migrating virtual machines to their destination hosts.
+ * If there are no migrations to be executed, the plan is marked as completed.
+ *
+ * @param plan
+ * the DRS plan to be executed
+ */
+ void executeDrsPlan(ClusterDrsPlanVO plan) {
+ List planMigrations = drsPlanMigrationDao.listPlanMigrationsToExecute(plan.getId());
+ if (planMigrations == null || planMigrations.isEmpty()) {
+ plan.setStatus(ClusterDrsPlan.Status.COMPLETED);
+ drsPlanDao.update(plan.getId(), plan);
+ ActionEventUtils.onCompletedActionEvent(User.UID_SYSTEM, Account.ACCOUNT_ID_SYSTEM, EventVO.LEVEL_INFO,
+ EventTypes.EVENT_CLUSTER_DRS, true,
+ String.format("DRS execution task completed for cluster [id=%s]", plan.getClusterId()),
+ plan.getClusterId(), ApiCommandResourceType.Cluster.toString(), plan.getEventId());
+ return;
+ }
+
+ plan.setStatus(ClusterDrsPlan.Status.IN_PROGRESS);
+ drsPlanDao.update(plan.getId(), plan);
+
+ for (ClusterDrsPlanMigrationVO migration : planMigrations) {
+ try {
+ VirtualMachine vm = vmInstanceDao.findById(migration.getVmId());
+ Host host = hostDao.findById(migration.getDestHostId());
+ if (vm == null || host == null) {
+ throw new CloudRuntimeException(String.format("vm %s or host %s is not found", migration.getVmId(),
+ migration.getDestHostId()));
+ }
+
+ logger.debug(
+ String.format("Executing DRS plan %s for vm %s to host %s", plan.getId(), vm.getInstanceName(),
+ host.getName()));
+ long jobId = createMigrateVMAsyncJob(vm, host, plan.getEventId());
+ AsyncJobVO job = asyncJobManager.getAsyncJob(jobId);
+ migration.setJobId(jobId);
+ migration.setStatus(job.getStatus());
+ drsPlanMigrationDao.update(migration.getId(), migration);
+ } catch (Exception e) {
+ logger.warn(String.format("Unable to execute DRS plan %s due to %s", plan.getUuid(), e.getMessage()));
+ migration.setStatus(JobInfo.Status.FAILED);
+ drsPlanMigrationDao.update(migration.getId(), migration);
+ }
+ }
+ }
+
+ /**
+ * Creates an asynchronous job to migrate a virtual machine to a specified host.
+ *
+ * @param vm
+ * the virtual machine to be migrated
+ * @param host
+ * the destination host for the virtual machine
+ * @param eventId
+ * the ID of the event that triggered the migration
+ *
+ * @return the ID of the created asynchronous job
+ */
+ long createMigrateVMAsyncJob(VirtualMachine vm, Host host, long eventId) {
+ final Map params = new HashMap<>();
+ params.put("ctxUserId", String.valueOf(User.UID_SYSTEM));
+ params.put("ctxAccountId", String.valueOf(Account.ACCOUNT_ID_SYSTEM));
+ params.put(ApiConstants.CTX_START_EVENT_ID, String.valueOf(eventId));
+ params.put(ApiConstants.HOST_ID, String.valueOf(host.getId()));
+ params.put(ApiConstants.VIRTUAL_MACHINE_ID, String.valueOf(vm.getId()));
+
+ final MigrateVMCmd cmd = new MigrateVMCmd();
+ ComponentContext.inject(cmd);
+
+ AsyncJobVO job = new AsyncJobVO("", User.UID_SYSTEM, Account.ACCOUNT_ID_SYSTEM, MigrateVMCmd.class.getName(),
+ ApiGsonHelper.getBuilder().create().toJson(params), vm.getId(),
+ ApiCommandResourceType.VirtualMachine.toString(), null);
+ job.setDispatcher(asyncJobDispatcher.getName());
+
+ return asyncJobManager.submitAsyncJob(job);
+ }
+
+ /**
+ * Removes old DRS migrations records that have expired based on the configured interval.
+ */
+ void cleanUpOldDrsPlans() {
+ Date date = DateUtils.addDays(new Date(), -1 * ClusterDrsPlanExpireInterval.value());
+ int rowsRemoved = drsPlanDao.expungeBeforeDate(date);
+ logger.debug(String.format("Removed %d old drs migration plans", rowsRemoved));
+ }
+
+ @Override
+ public String getConfigComponentName() {
+ return ClusterDrsService.class.getSimpleName();
+ }
+
+ @Override
+ public ConfigKey>[] getConfigKeys() {
+ return new ConfigKey>[]{ClusterDrsPlanExpireInterval, ClusterDrsEnabled, ClusterDrsInterval,
+ ClusterDrsMaxMigrations, ClusterDrsAlgorithm, ClusterDrsImbalanceThreshold, ClusterDrsMetric};
+ }
+
+ @Override
+ public List> getCommands() {
+ List> cmdList = new ArrayList<>();
+ cmdList.add(ListClusterDrsPlanCmd.class);
+ cmdList.add(GenerateClusterDrsPlanCmd.class);
+ cmdList.add(ExecuteClusterDrsPlanCmd.class);
+ return cmdList;
+ }
+
+ /**
+ * Generates a DRS plan for the given cluster and returns a list of migration responses.
+ *
+ * @param cmd
+ * the command containing the cluster ID and number of migrations for the DRS plan
+ *
+ * @return a list response of migration responses for the generated DRS plan
+ *
+ * @throws InvalidParameterValueException
+ * if the cluster is not found, is disabled, or is not a cloud stack managed cluster, or if the number of
+ * migrations is invalid
+ * @throws CloudRuntimeException
+ * if there is an error scheduling the DRS plan
+ */
+ @Override
+ public ClusterDrsPlanResponse generateDrsPlan(GenerateClusterDrsPlanCmd cmd) {
+ Cluster cluster = clusterDao.findById(cmd.getId());
+ if (cluster == null) {
+ throw new InvalidParameterValueException("Unable to find the cluster by id=" + cmd.getId());
+ }
+ if (cluster.getAllocationState() == Disabled) {
+ throw new InvalidParameterValueException(
+ String.format("Unable to execute DRS on the cluster %s as it is disabled", cluster.getName()));
+ }
+ if (cmd.getMaxMigrations() <= 0) {
+ throw new InvalidParameterValueException(
+ String.format("Unable to execute DRS on the cluster %s as the number of migrations [%s] is invalid",
+ cluster.getName(), cmd.getMaxMigrations()));
+ }
+
+ try {
+ List> plan = getDrsPlan(cluster, cmd.getMaxMigrations());
+ long eventId = ActionEventUtils.onActionEvent(User.UID_SYSTEM, Account.ACCOUNT_ID_SYSTEM,
+ Domain.ROOT_DOMAIN,
+ EventTypes.EVENT_CLUSTER_DRS_GENERATE,
+ String.format("Generating DRS plan for cluster %s", cluster.getUuid()), cluster.getId(),
+ ApiCommandResourceType.Cluster.toString());
+ List migrations;
+ ClusterDrsPlanVO drsPlan = new ClusterDrsPlanVO(
+ cluster.getId(), eventId, ClusterDrsPlan.Type.MANUAL, ClusterDrsPlan.Status.UNDER_REVIEW);
+ migrations = new ArrayList<>();
+ for (Ternary migration : plan) {
+ VirtualMachine vm = migration.first();
+ Host srcHost = migration.second();
+ Host destHost = migration.third();
+ migrations.add(new ClusterDrsPlanMigrationVO(0L, vm.getId(), srcHost.getId(), destHost.getId()));
+ }
+
+ CallContext.current().setEventResourceType(ApiCommandResourceType.Cluster);
+ CallContext.current().setEventResourceId(cluster.getId());
+
+ String eventUuid = null;
+ EventVO event = eventDao.findById(drsPlan.getEventId());
+ if (event != null) {
+ eventUuid = event.getUuid();
+ }
+
+ return new ClusterDrsPlanResponse(
+ cluster.getUuid(), drsPlan, eventUuid, getResponseObjectForMigrations(migrations));
+ } catch (ConfigurationException e) {
+ throw new CloudRuntimeException("Unable to schedule DRS", e);
+ }
+ }
+
+ /**
+ * Returns a list of ClusterDrsPlanMigrationResponse objects for the given list of ClusterDrsPlanMigrationVO
+ * objects.
+ *
+ * @param migrations
+ * the list of ClusterDrsPlanMigrationVO objects
+ *
+ * @return a list of ClusterDrsPlanMigrationResponse objects
+ */
+ List getResponseObjectForMigrations(List migrations) {
+ if (migrations == null) {
+ return Collections.emptyList();
+ }
+ List responses = new ArrayList<>();
+
+ for (ClusterDrsPlanMigrationVO migration : migrations) {
+ VMInstanceVO vm = vmInstanceDao.findByIdIncludingRemoved(migration.getVmId());
+ HostVO srcHost = hostDao.findByIdIncludingRemoved(migration.getSrcHostId());
+ HostVO destHost = hostDao.findByIdIncludingRemoved(migration.getDestHostId());
+ responses.add(new ClusterDrsPlanMigrationResponse(
+ vm.getUuid(), vm.getInstanceName(),
+ srcHost.getUuid(), srcHost.getName(),
+ destHost.getUuid(), destHost.getName(),
+ migration.getJobId(), migration.getStatus()));
+ }
+
+ return responses;
+ }
+
+ @Override
+ public ClusterDrsPlanResponse executeDrsPlan(ExecuteClusterDrsPlanCmd cmd) {
+
+ Map vmToHostMap = cmd.getVmToHostMap();
+ Long clusterId = cmd.getId();
+
+ if (vmToHostMap.isEmpty()) {
+ throw new InvalidParameterValueException("migrateto can not be empty.");
+ }
+
+ Cluster cluster = clusterDao.findById(clusterId);
+
+ if (cluster == null) {
+ throw new InvalidParameterValueException("cluster not found");
+ }
+
+ return executeDrsPlan(cluster, vmToHostMap);
+
+ }
+
+ private ClusterDrsPlanResponse executeDrsPlan(Cluster cluster, Map vmToHostMap) {
+ // To ensure that no other plan is generated for this cluster, we take a lock
+ GlobalLock clusterLock = GlobalLock.getInternLock(String.format(CLUSTER_LOCK_STR, cluster.getId()));
+ ClusterDrsPlanVO drsPlan = null;
+ List migrations = null;
+ try {
+ if (clusterLock.lock(5)) {
+ try {
+ List readyPlans = drsPlanDao.listByClusterIdAndStatus(cluster.getId(),
+ ClusterDrsPlan.Status.READY);
+ if (readyPlans != null && !readyPlans.isEmpty()) {
+ throw new InvalidParameterValueException(
+ String.format(
+ "Unable to execute DRS plan as there is already a plan [id=%s] in READY state",
+ readyPlans.get(0).getUuid()));
+ }
+ List inProgressPlans = drsPlanDao.listByClusterIdAndStatus(cluster.getId(),
+ ClusterDrsPlan.Status.IN_PROGRESS);
+
+ if (inProgressPlans != null && !inProgressPlans.isEmpty()) {
+ throw new InvalidParameterValueException(
+ String.format("Unable to execute DRS plan as there is already a plan [id=%s] in In " +
+ "Progress",
+ inProgressPlans.get(0).getUuid()));
+ }
+
+ List> plan = new ArrayList<>();
+ for (Map.Entry entry : vmToHostMap.entrySet()) {
+ VirtualMachine vm = entry.getKey();
+ Host destHost = entry.getValue();
+ Host srcHost = hostDao.findById(vm.getHostId());
+ plan.add(new Ternary<>(vm, srcHost, destHost));
+ }
+
+ Pair> pair = savePlan(cluster.getId(), plan,
+ CallContext.current().getStartEventId(), ClusterDrsPlan.Type.MANUAL,
+ ClusterDrsPlan.Status.READY);
+ drsPlan = pair.first();
+ migrations = pair.second();
+
+ executeDrsPlan(drsPlan);
+ } finally {
+ clusterLock.unlock();
+ }
+ }
+ } finally {
+ clusterLock.releaseRef();
+ }
+
+ String eventId = null;
+ if (drsPlan != null) {
+ EventVO event = eventDao.findById(drsPlan.getEventId());
+ eventId = event.getUuid();
+ }
+
+ return new ClusterDrsPlanResponse(
+ cluster.getUuid(), drsPlan, eventId, getResponseObjectForMigrations(migrations));
+ }
+
+ @Override
+ public ListResponse listDrsPlan(ListClusterDrsPlanCmd cmd) {
+ Long clusterId = cmd.getClusterId();
+ Long planId = cmd.getId();
+
+ if (planId != null && clusterId != null) {
+ throw new InvalidParameterValueException("Only one of clusterId or planId can be specified");
+ }
+
+ ClusterVO cluster = clusterDao.findById(clusterId);
+ if (clusterId != null && cluster == null) {
+ throw new InvalidParameterValueException("Unable to find the cluster by id=" + clusterId);
+ }
+
+ Pair, Integer> result = drsPlanDao.searchAndCount(clusterId, planId, cmd.getStartIndex(),
+ cmd.getPageSizeVal());
+
+ ListResponse response = new ListResponse<>();
+ List responseList = new ArrayList<>();
+
+ for (ClusterDrsPlan plan : result.first()) {
+ if (cluster == null || plan.getClusterId() != cluster.getId()) {
+ cluster = clusterDao.findById(plan.getClusterId());
+ }
+ List migrations = drsPlanMigrationDao.listByPlanId(plan.getId());
+ EventVO event = eventDao.findById(plan.getEventId());
+
+ responseList.add(new ClusterDrsPlanResponse(
+ cluster.getUuid(), plan, event.getUuid(), getResponseObjectForMigrations(migrations)));
+ }
+
+ response.setResponses(responseList, result.second());
+ return response;
+ }
+}
diff --git a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
index ee676aabbfa..b976ed6a329 100644
--- a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
+++ b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
@@ -342,6 +342,10 @@
value="#{affinityProcessorsRegistry.registered}" />
+
diff --git a/server/src/test/java/org/apache/cloudstack/cluster/ClusterDrsServiceImplTest.java b/server/src/test/java/org/apache/cloudstack/cluster/ClusterDrsServiceImplTest.java
new file mode 100644
index 00000000000..e82b39a47ec
--- /dev/null
+++ b/server/src/test/java/org/apache/cloudstack/cluster/ClusterDrsServiceImplTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.cluster;
+
+import com.cloud.api.query.dao.HostJoinDao;
+import com.cloud.api.query.vo.HostJoinVO;
+import com.cloud.dc.ClusterVO;
+import com.cloud.dc.dao.ClusterDao;
+import com.cloud.event.ActionEventUtils;
+import com.cloud.event.EventVO;
+import com.cloud.event.dao.EventDao;
+import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.host.Host;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.offering.ServiceOffering;
+import com.cloud.org.Cluster;
+import com.cloud.org.Grouping;
+import com.cloud.server.ManagementServer;
+import com.cloud.service.ServiceOfferingVO;
+import com.cloud.service.dao.ServiceOfferingDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.VMInstanceVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.VMInstanceDao;
+import org.apache.cloudstack.api.command.admin.cluster.GenerateClusterDrsPlanCmd;
+import org.apache.cloudstack.api.response.ClusterDrsPlanMigrationResponse;
+import org.apache.cloudstack.api.response.ClusterDrsPlanResponse;
+import org.apache.cloudstack.cluster.dao.ClusterDrsPlanDao;
+import org.apache.cloudstack.cluster.dao.ClusterDrsPlanMigrationDao;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import javax.naming.ConfigurationException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ClusterDrsServiceImplTest {
+
+ @Mock
+ ClusterDrsAlgorithm condensedAlgorithm;
+
+ @Mock
+ ManagementServer managementServer;
+
+ @Mock
+ ClusterDrsAlgorithm balancedAlgorithm;
+
+ @Mock
+ GenerateClusterDrsPlanCmd cmd;
+
+ AutoCloseable closeable;
+
+ @Mock
+ private ClusterDao clusterDao;
+
+ @Mock
+ private ClusterDrsPlanDao drsPlanDao;
+
+ @Mock
+ private ClusterDrsPlanMigrationDao drsPlanMigrationDao;
+
+ @Mock
+ private EventDao eventDao;
+
+ @Mock
+ private HostDao hostDao;
+
+ @Mock
+ private HostJoinDao hostJoinDao;
+
+ @Mock
+ private ServiceOfferingDao serviceOfferingDao;
+
+ @Mock
+ private VMInstanceDao vmInstanceDao;
+
+ @Spy
+ @InjectMocks
+ private ClusterDrsServiceImpl clusterDrsService = new ClusterDrsServiceImpl();
+
+ private MockedStatic globalLockMocked;
+
+ @Before
+ public void setUp() throws NoSuchFieldException, IllegalAccessException {
+ closeable = MockitoAnnotations.openMocks(this);
+
+ HashMap drsAlgorithmMap = new HashMap<>();
+ drsAlgorithmMap.put("balanced", balancedAlgorithm);
+ drsAlgorithmMap.put("condensed", condensedAlgorithm);
+
+ clusterDrsService.setDrsAlgorithms(List.of(new ClusterDrsAlgorithm[]{balancedAlgorithm, condensedAlgorithm}));
+ ReflectionTestUtils.setField(clusterDrsService, "drsAlgorithmMap", drsAlgorithmMap);
+ Field f = ConfigKey.class.getDeclaredField("_defaultValue");
+ f.setAccessible(true);
+ f.set(clusterDrsService.ClusterDrsAlgorithm, "balanced");
+ Mockito.when(cmd.getId()).thenReturn(1L);
+
+ globalLockMocked = Mockito.mockStatic(GlobalLock.class);
+ GlobalLock lock = Mockito.mock(GlobalLock.class);
+ Mockito.when(GlobalLock.getInternLock("cluster.drs.1")).thenReturn(lock);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ globalLockMocked.close();
+ closeable.close();
+ }
+
+ @Test
+ public void testGetCommands() {
+ assertFalse(clusterDrsService.getCommands().isEmpty());
+ }
+
+ @Test
+ public void testGetDrsPlan() throws ConfigurationException {
+ ClusterVO cluster = Mockito.mock(ClusterVO.class);
+ Mockito.when(cluster.getId()).thenReturn(1L);
+ Mockito.when(cluster.getAllocationState()).thenReturn(Grouping.AllocationState.Enabled);
+
+ HostVO host1 = Mockito.mock(HostVO.class);
+ Mockito.when(host1.getId()).thenReturn(1L);
+
+ HostVO host2 = Mockito.mock(HostVO.class);
+ Mockito.when(host2.getId()).thenReturn(2L);
+
+ VMInstanceVO vm1 = Mockito.mock(VMInstanceVO.class);
+ Mockito.when(vm1.getId()).thenReturn(1L);
+ Mockito.when(vm1.getHostId()).thenReturn(1L);
+
+ VMInstanceVO vm2 = Mockito.mock(VMInstanceVO.class);
+ Mockito.when(vm2.getHostId()).thenReturn(2L);
+
+ List hostList = new ArrayList<>();
+ hostList.add(host1);
+ hostList.add(host2);
+
+ HostJoinVO hostJoin1 = Mockito.mock(HostJoinVO.class);
+ Mockito.when(hostJoin1.getId()).thenReturn(1L);
+ Mockito.when(hostJoin1.getCpuUsedCapacity()).thenReturn(1000L);
+ Mockito.when(hostJoin1.getCpuReservedCapacity()).thenReturn(0L);
+ Mockito.when(hostJoin1.getMemUsedCapacity()).thenReturn(1024L);
+ Mockito.when(hostJoin1.getMemReservedCapacity()).thenReturn(512L);
+
+ HostJoinVO hostJoin2 = Mockito.mock(HostJoinVO.class);
+ Mockito.when(hostJoin2.getId()).thenReturn(2L);
+ Mockito.when(hostJoin2.getCpuUsedCapacity()).thenReturn(1000L);
+ Mockito.when(hostJoin2.getCpuReservedCapacity()).thenReturn(0L);
+ Mockito.when(hostJoin2.getMemUsedCapacity()).thenReturn(1024L);
+ Mockito.when(hostJoin2.getMemReservedCapacity()).thenReturn(512L);
+
+ List vmList = new ArrayList<>();
+ vmList.add(vm1);
+ vmList.add(vm2);
+
+ ServiceOfferingVO serviceOffering = Mockito.mock(ServiceOfferingVO.class);
+ Mockito.when(serviceOffering.getCpu()).thenReturn(1);
+ Mockito.when(serviceOffering.getRamSize()).thenReturn(1024);
+ Mockito.when(serviceOffering.getSpeed()).thenReturn(1000);
+
+ Mockito.when(hostDao.findByClusterId(1L)).thenReturn(hostList);
+ Mockito.when(vmInstanceDao.listByClusterId(1L)).thenReturn(vmList);
+ Mockito.when(balancedAlgorithm.needsDrs(Mockito.anyLong(), Mockito.anyList(), Mockito.anyList())).thenReturn(
+ true, false);
+ Mockito.when(
+ clusterDrsService.getBestMigration(Mockito.any(Cluster.class), Mockito.any(ClusterDrsAlgorithm.class),
+ Mockito.anyList(), Mockito.anyMap(), Mockito.anyMap(), Mockito.anyMap())).thenReturn(
+ new Pair<>(vm1, host2));
+ Mockito.when(serviceOfferingDao.findByIdIncludingRemoved(Mockito.anyLong(), Mockito.anyLong())).thenReturn(
+ serviceOffering);
+ Mockito.when(hostJoinDao.searchByIds(host1.getId(), host2.getId())).thenReturn(List.of(hostJoin1, hostJoin2));
+
+ List> iterations = clusterDrsService.getDrsPlan(cluster, 5);
+
+ Mockito.verify(hostDao, Mockito.times(1)).findByClusterId(1L);
+ Mockito.verify(vmInstanceDao, Mockito.times(1)).listByClusterId(1L);
+ Mockito.verify(balancedAlgorithm, Mockito.times(2)).needsDrs(Mockito.anyLong(), Mockito.anyList(),
+ Mockito.anyList());
+
+ assertEquals(1, iterations.size());
+ }
+
+ @Test(expected = InvalidParameterValueException.class)
+ public void testGenerateDrsPlanClusterNotFound() {
+ Mockito.when(clusterDao.findById(1L)).thenReturn(null);
+ clusterDrsService.generateDrsPlan(cmd);
+ }
+
+ @Test(expected = InvalidParameterValueException.class)
+ public void testGenerateDrsPlanClusterDisabled() {
+ ClusterVO cluster = Mockito.mock(ClusterVO.class);
+ Mockito.when(cluster.getName()).thenReturn("testCluster");
+ Mockito.when(cluster.getAllocationState()).thenReturn(Grouping.AllocationState.Disabled);
+
+ Mockito.when(clusterDao.findById(1L)).thenReturn(cluster);
+
+ clusterDrsService.generateDrsPlan(cmd);
+ }
+
+ @Test(expected = InvalidParameterValueException.class)
+ public void testGenerateDrsPlanClusterNotCloudManaged() {
+
+ ClusterVO cluster = Mockito.mock(ClusterVO.class);
+ Mockito.when(cluster.getName()).thenReturn("testCluster");
+ Mockito.when(cluster.getAllocationState()).thenReturn(Grouping.AllocationState.Enabled);
+
+ Mockito.when(clusterDao.findById(1L)).thenReturn(cluster);
+
+ clusterDrsService.generateDrsPlan(cmd);
+ }
+
+ @Test(expected = InvalidParameterValueException.class)
+ public void testGenerateDrsPlanInvalidIterations() {
+ ClusterVO cluster = Mockito.mock(ClusterVO.class);
+ Mockito.when(cluster.getName()).thenReturn("testCluster");
+ Mockito.when(cluster.getAllocationState()).thenReturn(Grouping.AllocationState.Enabled);
+
+ Mockito.when(clusterDao.findById(1L)).thenReturn(cluster);
+ Mockito.when(cmd.getMaxMigrations()).thenReturn(0);
+
+ clusterDrsService.generateDrsPlan(cmd);
+ }
+
+ @Test(expected = CloudRuntimeException.class)
+ public void testGenerateDrsPlanConfigurationException() throws ConfigurationException {
+ ClusterVO cluster = Mockito.mock(ClusterVO.class);
+ Mockito.when(cluster.getId()).thenReturn(1L);
+ Mockito.when(cluster.getAllocationState()).thenReturn(Grouping.AllocationState.Enabled);
+ Mockito.when(clusterDao.findById(1L)).thenReturn(cluster);
+ Mockito.when(clusterDrsService.getDrsPlan(cluster, 5)).thenThrow(new ConfigurationException("test"));
+ Mockito.when(cmd.getMaxMigrations()).thenReturn(1);
+
+ clusterDrsService.generateDrsPlan(cmd);
+ }
+
+ @Test
+ public void testGenerateDrsPlan() throws ConfigurationException {
+ ClusterVO cluster = Mockito.mock(ClusterVO.class);
+ Mockito.when(cluster.getId()).thenReturn(1L);
+ Mockito.when(cluster.getAllocationState()).thenReturn(Grouping.AllocationState.Enabled);
+
+ VirtualMachine vm = Mockito.mock(VirtualMachine.class);
+ Mockito.when(vm.getId()).thenReturn(1L);
+
+ Host srcHost = Mockito.mock(Host.class);
+ Mockito.when(srcHost.getId()).thenReturn(1L);
+
+ Host destHost = Mockito.mock(Host.class);
+ Mockito.when(destHost.getId()).thenReturn(2L);
+
+ Mockito.when(clusterDao.findById(1L)).thenReturn(cluster);
+ Mockito.when(eventDao.findById(Mockito.anyLong())).thenReturn(Mockito.mock(EventVO.class));
+ Mockito.when(cmd.getMaxMigrations()).thenReturn(2);
+ Mockito.doReturn(List.of(new Ternary<>(vm, srcHost,
+ destHost))).when(clusterDrsService).getDrsPlan(Mockito.any(Cluster.class), Mockito.anyInt());
+
+ ClusterDrsPlanMigrationResponse migrationResponse = Mockito.mock(ClusterDrsPlanMigrationResponse.class);
+
+ Mockito.when(clusterDrsService.getResponseObjectForMigrations(Mockito.anyList())).thenReturn(
+ List.of(migrationResponse));
+
+ try(MockedStatic ignored = Mockito.mockStatic(ActionEventUtils.class)) {
+ Mockito.when(ActionEventUtils.onActionEvent(Mockito.anyLong(), Mockito.anyLong(),
+ Mockito.anyLong(),
+ Mockito.anyString(), Mockito.anyString(),
+ Mockito.anyLong(), Mockito.anyString())).thenReturn(1L);
+
+ ClusterDrsPlanResponse response = clusterDrsService.generateDrsPlan(
+ cmd);
+
+ assertEquals(1L, response.getMigrationPlans().size());
+ assertEquals(migrationResponse, response.getMigrationPlans().get(0));
+ }
+ }
+
+ @Test
+ public void testPoll() {
+ Mockito.doNothing().when(clusterDrsService).updateOldPlanMigrations();
+ Mockito.doNothing().when(clusterDrsService).processPlans();
+ Mockito.doNothing().when(clusterDrsService).generateDrsPlanForAllClusters();
+ Mockito.doNothing().when(clusterDrsService).cleanUpOldDrsPlans();
+
+ GlobalLock lock = Mockito.mock(GlobalLock.class);
+ Mockito.when(lock.lock(Mockito.anyInt())).thenReturn(true);
+
+ Mockito.when(GlobalLock.getInternLock(Mockito.anyString())).thenReturn(lock);
+
+ clusterDrsService.poll(new Date());
+
+ Mockito.verify(clusterDrsService, Mockito.times(1)).updateOldPlanMigrations();
+ Mockito.verify(clusterDrsService, Mockito.times(2)).processPlans();
+ Mockito.verify(clusterDrsService, Mockito.times(1)).generateDrsPlanForAllClusters();
+ }
+
+ @Test
+ public void testUpdateOldPlanMigrations() {
+ ClusterDrsPlanVO drsPlan1 = Mockito.mock(ClusterDrsPlanVO.class);
+ ClusterDrsPlanVO drsPlan2 = Mockito.mock(ClusterDrsPlanVO.class);
+
+ Mockito.when(drsPlanDao.listByStatus(ClusterDrsPlan.Status.IN_PROGRESS)).thenReturn(
+ List.of(drsPlan1, drsPlan2));
+
+ Mockito.doNothing().when(clusterDrsService).updateDrsPlanMigrations(drsPlan1);
+ Mockito.doNothing().when(clusterDrsService).updateDrsPlanMigrations(drsPlan2);
+
+ clusterDrsService.updateOldPlanMigrations();
+
+ Mockito.verify(clusterDrsService, Mockito.times(2)).updateDrsPlanMigrations(
+ Mockito.any(ClusterDrsPlanVO.class));
+ }
+
+ @Test
+ public void testGetBestMigration() {
+ ClusterVO cluster = Mockito.mock(ClusterVO.class);
+ Mockito.when(cluster.getId()).thenReturn(1L);
+
+ HostVO destHost = Mockito.mock(HostVO.class);
+
+ HostVO host = Mockito.mock(HostVO.class);
+ Mockito.when(host.getId()).thenReturn(2L);
+
+ VMInstanceVO vm1 = Mockito.mock(VMInstanceVO.class);
+ Mockito.when(vm1.getId()).thenReturn(1L);
+ Mockito.when(vm1.getType()).thenReturn(VirtualMachine.Type.User);
+ Mockito.when(vm1.getState()).thenReturn(VirtualMachine.State.Running);
+ Mockito.when(vm1.getDetails()).thenReturn(Collections.emptyMap());
+
+ VMInstanceVO vm2 = Mockito.mock(VMInstanceVO.class);
+ Mockito.when(vm2.getId()).thenReturn(2L);
+ Mockito.when(vm2.getType()).thenReturn(VirtualMachine.Type.User);
+ Mockito.when(vm2.getState()).thenReturn(VirtualMachine.State.Running);
+ Mockito.when(vm2.getDetails()).thenReturn(Collections.emptyMap());
+
+ List vmList = new ArrayList<>();
+ vmList.add(vm1);
+ vmList.add(vm2);
+
+ Map> hostVmMap = new HashMap<>();
+ hostVmMap.put(host.getId(), new ArrayList<>());
+ hostVmMap.get(host.getId()).add(vm1);
+ hostVmMap.get(host.getId()).add(vm2);
+
+ Map vmIdServiceOfferingMap = new HashMap<>();
+
+ ServiceOffering serviceOffering = Mockito.mock(ServiceOffering.class);
+ for (VirtualMachine vm : vmList) {
+ vmIdServiceOfferingMap.put(vm.getId(), serviceOffering);
+ }
+
+ Mockito.when(managementServer.listHostsForMigrationOfVM(vm1, 0L, 500L, null, vmList)).thenReturn(
+ new Ternary, Integer>, List extends Host>, Map>(
+ new Pair<>(List.of(destHost), 1), List.of(destHost), Map.of(destHost,
+ false)));
+ Mockito.when(managementServer.listHostsForMigrationOfVM(vm2, 0L, 500L, null, vmList)).thenReturn(
+ new Ternary, Integer>, List extends Host>, Map>(
+ new Pair<>(List.of(destHost), 1), List.of(destHost), Map.of(destHost,
+ false)));
+ Mockito.when(balancedAlgorithm.getMetrics(cluster.getId(), vm1, serviceOffering, destHost, new HashMap<>(),
+ new HashMap<>(), false)).thenReturn(new Ternary<>(1.0, 0.5, 1.5));
+
+ Mockito.when(balancedAlgorithm.getMetrics(cluster.getId(), vm2, serviceOffering, destHost, new HashMap<>(),
+ new HashMap<>(), false)).thenReturn(new Ternary<>(1.0, 2.5, 1.5));
+
+ Pair bestMigration = clusterDrsService.getBestMigration(cluster, balancedAlgorithm,
+ vmList, vmIdServiceOfferingMap, new HashMap<>(), new HashMap<>());
+
+ assertEquals(destHost, bestMigration.second());
+ assertEquals(vm1, bestMigration.first());
+ }
+
+ @Test
+ public void testSavePlan() {
+ Mockito.when(drsPlanDao.persist(Mockito.any(ClusterDrsPlanVO.class))).thenReturn(
+ Mockito.mock(ClusterDrsPlanVO.class));
+ Mockito.when(drsPlanMigrationDao.persist(Mockito.any(ClusterDrsPlanMigrationVO.class))).thenReturn(
+ Mockito.mock(ClusterDrsPlanMigrationVO.class));
+
+ clusterDrsService.savePlan(1L,
+ List.of(new Ternary<>(Mockito.mock(VirtualMachine.class), Mockito.mock(Host.class),
+ Mockito.mock(Host.class)),
+ new Ternary<>(Mockito.mock(VirtualMachine.class), Mockito.mock(Host.class),
+ Mockito.mock(Host.class))), 1L, ClusterDrsPlan.Type.AUTOMATED,
+ ClusterDrsPlan.Status.READY);
+
+ Mockito.verify(drsPlanDao, Mockito.times(1)).persist(Mockito.any(ClusterDrsPlanVO.class));
+ Mockito.verify(drsPlanMigrationDao, Mockito.times(2)).persist(Mockito.any(ClusterDrsPlanMigrationVO.class));
+ }
+
+ @Test
+ public void testProcessPlans() {
+ Mockito.when(drsPlanDao.listByStatus(ClusterDrsPlan.Status.READY)).thenReturn(
+ List.of(Mockito.mock(ClusterDrsPlanVO.class), Mockito.mock(ClusterDrsPlanVO.class)));
+
+ Mockito.doNothing().when(clusterDrsService).executeDrsPlan(Mockito.any(ClusterDrsPlanVO.class));
+
+ clusterDrsService.processPlans();
+
+ Mockito.verify(clusterDrsService, Mockito.times(2)).executeDrsPlan(Mockito.any(ClusterDrsPlanVO.class));
+ }
+}
diff --git a/test/integration/smoke/test_cluster_drs.py b/test/integration/smoke/test_cluster_drs.py
new file mode 100644
index 00000000000..4db6654aa73
--- /dev/null
+++ b/test/integration/smoke/test_cluster_drs.py
@@ -0,0 +1,267 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests DRS on a cluster
+"""
+
+import logging
+import time
+
+from marvin.cloudstackTestCase import cloudstackTestCase
+from marvin.lib.base import (Cluster, Configurations, Host, Network, NetworkOffering, ServiceOffering, VirtualMachine,
+ Zone)
+from marvin.lib.common import (get_domain, get_zone, get_template)
+from marvin.lib.utils import wait_until
+from marvin import jsonHelper
+from nose.plugins.attrib import attr
+
+
+class TestClusterDRS(cloudstackTestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.testClient = super(TestClusterDRS, cls).getClsTestClient()
+ cls.apiclient = cls.testClient.getApiClient()
+ cls.services = cls.testClient.getParsedTestDataConfig()
+
+ zone = get_zone(cls.apiclient, cls.testClient.getZoneForTests())
+ cls.zone = Zone(zone.__dict__)
+ cls.template = get_template(cls.apiclient, cls.zone.id)
+ cls._cleanup = []
+
+ cls.logger = logging.getLogger("TestClusterDRS")
+ cls.stream_handler = logging.StreamHandler()
+ cls.logger.setLevel(logging.DEBUG)
+ cls.logger.addHandler(cls.stream_handler)
+
+ cls.skipTests = False
+ clusters = Cluster.list(cls.apiclient, zoneid=cls.zone.id, allocationstate='Enabled')
+
+ if not clusters or not isinstance(clusters, list) or len(clusters) < 1:
+ cls.logger.debug("This test requires at least 1 (Up and Enabled) cluster in the zone")
+ cls.skipTests = True
+ return
+
+ for cluster in clusters:
+ cls.hosts = Host.list(cls.apiclient, zoneid=cls.zone.id, clusterid=cluster.id, state='Up',
+ resourcestate='Enabled')
+ if not cls.hosts or not isinstance(cls.hosts, list) or len(cls.hosts) < 2:
+ cls.logger.debug("This test requires at least two (Up and Enabled) hosts in the zone")
+ cls.skipTests = True
+ return
+ else:
+ cls.cluster = Cluster(jsonHelper.jsonDump.dump(cluster))
+ break
+
+ cls.domain = get_domain(cls.apiclient)
+
+ # 1. Create large service offering
+ cls.service_offering = ServiceOffering.create(cls.apiclient, cls.services["service_offerings"]["large"])
+ cls._cleanup.append(cls.service_offering)
+
+ # 2. Create a network
+ cls.services["network"]["name"] = "Test Network"
+ cls.network_offering = NetworkOffering.create(
+ cls.apiclient,
+ cls.services["l2-network_offering"]
+ )
+ cls._cleanup.append(cls.network_offering)
+ NetworkOffering.update(
+ cls.network_offering,
+ cls.apiclient,
+ id=cls.network_offering.id,
+ state="enabled"
+ )
+
+ cls.network = Network.create(
+ cls.apiclient,
+ cls.services["l2-network"],
+ networkofferingid=cls.network_offering.id,
+ zoneid=cls.zone.id,
+ accountid="admin",
+ domainid=cls.domain.id,
+ )
+ cls._cleanup.append(cls.network)
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClusterDRS, cls).tearDownClass()
+
+ def setUp(self):
+ if self.skipTests:
+ self.skipTest("This test requires at least two (Up and Enabled) hosts in the zone")
+ self.apiclient = self.testClient.getApiClient()
+ self.cleanup = []
+
+ def tearDown(self):
+ super(TestClusterDRS, self).tearDown()
+
+ @classmethod
+ def get_vm_host_id(cls, vm_id):
+ list_vms = VirtualMachine.list(cls.apiclient, id=vm_id)
+ vm = list_vms[0]
+ return vm.hostid
+
+ def wait_for_vm_start(self, vm):
+ """ Wait until vm is Running """
+ def check_vm_state():
+ vms = VirtualMachine.list(
+ self.apiclient,
+ id=vm.id,
+ listall=True
+ )
+ if isinstance(vms, list):
+ if vms[0].state == 'Running':
+ return True, vms[0].state
+ return False, vms[0].state
+
+ res = wait_until(10, 30, check_vm_state)
+ if not res:
+ raise Exception("Failed to wait for VM %s (%s) to be Running" % (vm.name, vm.id))
+ return res
+
+ def wait_for_plan_completion(self, plan):
+ """ Wait until plan is completed """
+ def check_plan_status():
+ plans = self.cluster.listDrsPlans(self.apiclient, id=plan.id)
+ if isinstance(plans, list):
+ if plans[0].status == 'COMPLETED':
+ return True, plans[0].status
+ return False, plans[0].status
+
+ res = wait_until(10, 30, check_plan_status)
+ if not res:
+ raise Exception("Failed to wait for completion of plan %s" % (plan.id))
+ return res
+
+ def get_migrations(self):
+ """ Generate migrations, retrying until at least one is produced. Host stats can take a
+ moment to update, so the first few plan generations may come back empty. """
+ def generate_migrations():
+ drs_plan = self.cluster.generateDrsPlan(self.apiclient, migrations=4)
+ if len(drs_plan["migrations"]) > 0:
+ return True, drs_plan["migrations"]
+ return False, drs_plan["migrations"]
+
+ res, migrations = wait_until(10, 30, generate_migrations)
+ if not res:
+ raise Exception("Failed to generate drs migrations")
+ return migrations
+
+ @attr(tags=["advanced"], required_hardware="false")
+ def test_01_condensed_drs_algorithm(self):
+ """ Verify DRS algorithm - condensed"""
+ # 1. Deploy vm-1 on host 1
+ # 2. Deploy vm-2 on host 2
+ # 3. Execute DRS to move all VMs on the same host
+ self.logger.debug("=== Running test_01_condensed_drs_algorithm ===")
+
+ # 1. Deploy vm-1 on host 1
+ self.services["virtual_machine"]["name"] = "virtual-machine-1"
+ self.services["virtual_machine"]["displayname"] = "virtual-machine-1"
+ self.virtual_machine_1 = VirtualMachine.create(self.apiclient, self.services["virtual_machine"],
+ serviceofferingid=self.service_offering.id,
+ templateid=self.template.id, zoneid=self.zone.id,
+ networkids=self.network.id, hostid=self.hosts[0].id)
+ self.cleanup.append(self.virtual_machine_1)
+ vm_1_host_id = self.get_vm_host_id(self.virtual_machine_1.id)
+
+ # 2. Deploy vm-2 on host 2
+ self.services["virtual_machine"]["name"] = "virtual-machine-2"
+ self.services["virtual_machine"]["displayname"] = "virtual-machine-2"
+ self.virtual_machine_2 = VirtualMachine.create(self.apiclient, self.services["virtual_machine"],
+ serviceofferingid=self.service_offering.id,
+ templateid=self.template.id, zoneid=self.zone.id,
+ networkids=self.network.id, hostid=self.hosts[1].id)
+ vm_2_host_id = self.get_vm_host_id(self.virtual_machine_2.id)
+ self.cleanup.append(self.virtual_machine_2)
+
+ self.assertNotEqual(vm_1_host_id, vm_2_host_id, msg="Both VMs should be on different hosts")
+ self.wait_for_vm_start(self.virtual_machine_1)
+ self.wait_for_vm_start(self.virtual_machine_2)
+
+ # 3. Generate & execute DRS to move all VMs on the same host
+ Configurations.update(self.apiclient, "drs.algorithm", "condensed", clusterid=self.cluster.id)
+ Configurations.update(self.apiclient, "drs.imbalance", "1.0", clusterid=self.cluster.id)
+
+ migrations = self.get_migrations()
+ vm_to_dest_host_map = {
+ migration["virtualmachineid"]: migration["destinationhostid"] for migration in migrations
+ }
+
+ self.assertEqual(len(vm_to_dest_host_map), 1, msg="DRS plan should have 1 migrations")
+
+ executed_plan = self.cluster.executeDrsPlan(self.apiclient, vm_to_dest_host_map)
+ self.wait_for_plan_completion(executed_plan)
+
+ vm_1_host_id = self.get_vm_host_id(self.virtual_machine_1.id)
+ vm_2_host_id = self.get_vm_host_id(self.virtual_machine_2.id)
+
+ self.assertEqual(vm_1_host_id, vm_2_host_id, msg="Both VMs should be on the same host")
+
+ @attr(tags=["advanced"], required_hardware="false")
+ def test_02_balanced_drs_algorithm(self):
+ """ Verify DRS algorithm - balanced"""
+
+ # 1. Deploy vm-1 on host 1
+ # 2. Deploy vm-2 on host 1 (same host as vm-1)
+ # 3. Execute DRS to move all VMs on different hosts
+
+ self.logger.debug("=== Running test_02_balanced_drs_algorithm ===")
+ # 1. Deploy vm-1 on host 1
+ self.services["virtual_machine"]["name"] = "virtual-machine-1"
+ self.services["virtual_machine"]["displayname"] = "virtual-machine-1"
+ self.virtual_machine_1 = VirtualMachine.create(self.apiclient, self.services["virtual_machine"],
+ serviceofferingid=self.service_offering.id,
+ templateid=self.template.id, zoneid=self.zone.id,
+ networkids=self.network.id, hostid=self.hosts[0].id)
+ self.cleanup.append(self.virtual_machine_1)
+ vm_1_host_id = self.get_vm_host_id(self.virtual_machine_1.id)
+
+ # 2. Deploy vm-2 on host 1
+ self.services["virtual_machine"]["name"] = "virtual-machine-2"
+ self.services["virtual_machine"]["displayname"] = "virtual-machine-2"
+ self.virtual_machine_2 = VirtualMachine.create(self.apiclient, self.services["virtual_machine"],
+ serviceofferingid=self.service_offering.id,
+ templateid=self.template.id, zoneid=self.zone.id,
+ networkids=self.network.id, hostid=self.hosts[0].id)
+ vm_2_host_id = self.get_vm_host_id(self.virtual_machine_2.id)
+ self.cleanup.append(self.virtual_machine_2)
+
+ self.assertEqual(vm_1_host_id, vm_2_host_id, msg="Both VMs should be on same hosts")
+ self.wait_for_vm_start(self.virtual_machine_1)
+ self.wait_for_vm_start(self.virtual_machine_2)
+
+ # 3. Execute DRS to move all VMs on different hosts
+ Configurations.update(self.apiclient, "drs.algorithm", "balanced", clusterid=self.cluster.id)
+ Configurations.update(self.apiclient, "drs.imbalance", "1.0", clusterid=self.cluster.id)
+
+ migrations = self.get_migrations()
+ vm_to_dest_host_map = {
+ migration["virtualmachineid"]: migration["destinationhostid"] for migration in migrations
+ }
+
+ self.assertEqual(len(vm_to_dest_host_map), 1, msg="DRS plan should have 1 migrations")
+
+ executed_plan = self.cluster.executeDrsPlan(self.apiclient, vm_to_dest_host_map)
+ self.wait_for_plan_completion(executed_plan)
+
+ vm_1_host_id = self.get_vm_host_id(self.virtual_machine_1.id)
+ vm_2_host_id = self.get_vm_host_id(self.virtual_machine_2.id)
+
+ self.assertNotEqual(vm_1_host_id, vm_2_host_id, msg="Both VMs should be on different hosts")
diff --git a/tools/marvin/marvin/lib/base.py b/tools/marvin/marvin/lib/base.py
index 7f234e0f208..d57f1a7e552 100755
--- a/tools/marvin/marvin/lib/base.py
+++ b/tools/marvin/marvin/lib/base.py
@@ -3131,6 +3131,35 @@ class Cluster:
[setattr(cmd, k, v) for k, v in list(kwargs.items())]
return (apiclient.updateCluster(cmd))
+ def listDrsPlans(cls, apiclient, **kwargs):
+ """List drs plans for cluster"""
+
+ cmd = listClusterDrsPlan.listClusterDrsPlanCmd()
+ [setattr(cmd, k, v) for k, v in list(kwargs.items())]
+ return apiclient.listClusterDrsPlan(cmd)
+
+ def generateDrsPlan(cls, apiclient, migrations=None):
+ """Generate a drs plan for cluster"""
+
+ cmd = generateClusterDrsPlan.generateClusterDrsPlanCmd()
+ cmd.id = cls.id
+ cmd.migrations = migrations
+ return apiclient.generateClusterDrsPlan(cmd)
+
+ def executeDrsPlan(cls, apiclient, migrateto=None):
+ """Execute drs plan on cluster"""
+
+ cmd = executeClusterDrsPlan.executeClusterDrsPlanCmd()
+ cmd.id = cls.id
+ if migrateto:
+ cmd.migrateto = []
+ for vm, host in list(migrateto.items()):
+ cmd.migrateto.append({
+ 'vm': vm,
+ 'host': host
+ })
+ return apiclient.executeClusterDrsPlan(cmd)
+
class Host:
"""Manage Host life cycle"""
diff --git a/ui/public/locales/en.json b/ui/public/locales/en.json
index 85f6627a103..a10ed69320a 100644
--- a/ui/public/locales/en.json
+++ b/ui/public/locales/en.json
@@ -668,6 +668,7 @@
"label.destaddressgroupuuid": "Destination Address Group",
"label.destcidr": "Destination CIDR",
"label.destendport": "Destination End Port",
+"label.desthost": "Destination host",
"label.destination": "Destination",
"label.destinationphysicalnetworkid": "Destination physical network ID",
"label.destinationtype": "Destination Type",
@@ -757,6 +758,10 @@
"label.download.state": "Download state",
"label.dpd": "Dead peer detection",
"label.driver": "Driver",
+"label.drs": "DRS",
+"label.drs.plan": "DRS Plan",
+"label.drs.generate.plan": "Generate DRS plan",
+"label.drs.no.plan.generated": "No DRS plan has been generated as the cluster is not imbalanced according to the threshold set",
"label.duration": "Duration (in sec)",
"label.duration.custom": "Custom",
"label.duration.1hour": "1 hour",
@@ -837,6 +842,7 @@
"label.every": "Every",
"label.example": "Example",
"label.example.plugin": "ExamplePlugin",
+"label.execute": "Execute",
"label.expunge": "Expunge",
"label.expungevmgraceperiod": "Expunge VM grace period (in sec)",
"label.expunged": "Expunged",
@@ -1219,6 +1225,7 @@
"label.matchall": "Match all",
"label.max.primary.storage": "Max. primary (GiB)",
"label.max.secondary.storage": "Max. secondary (GiB)",
+"label.max.migrations": "Max. migrations",
"label.maxcpu": "Max. CPU cores",
"label.maxcpunumber": "Max CPU cores",
"label.maxdatavolumeslimit": "Max data volumes limit",
@@ -1840,6 +1847,7 @@
"label.softwareversion": "Software version",
"label.source.based": "SourceBased",
"label.sourcecidr": "Source CIDR",
+"label.sourcehost": "Source host",
"label.sourceipaddress": "Source IP address",
"label.sourceipaddressnetworkid": "Network ID of source IP address",
"label.sourcenat": "Source NAT",
@@ -1935,6 +1943,7 @@
"label.submit": "Submit",
"label.succeeded": "Succeeded",
"label.success": "Success",
+"label.success.migrations": "Successful migrations",
"label.success.set": "Successfully set",
"label.success.updated": "Successfully updated",
"label.suitability": "Suitability",
@@ -2572,6 +2581,8 @@
"message.desc.registered.user.data": "Registered a User Data.",
"message.desc.zone": "A zone is the largest organizational unit in CloudStack, and it typically corresponds to a single datacenter. Zones provide physical isolation and redundancy. A zone consists of one or more pods (each of which contains hosts and primary storage servers) and a secondary storage server which is shared by all pods in the zone.",
"message.desc.zone.edge": "A zone is the largest organizational unit in CloudStack, and it typically corresponds to a single datacenter. Zones provide physical isolation and redundancy. An edge zone consists of one or more hosts (each of which provides local storage as primary storage servers). Only shared and L2 networks can be deployed in such zones and functionalities that require secondary storages are not supported.",
+"message.drs.plan.description": "The maximum number of live migrations allowed for DRS. Configure DRS under the settings tab before generating a plan or to enable automatic DRS for the cluster.",
+"message.drs.plan.executed": "DRS plan executed successfully.",
"message.zone.edge.local.storage": "Local storage will be used by default for user VMs and virtual routers",
"message.detach.disk": "Are you sure you want to detach this disk?",
"message.detach.iso.confirm": "Please confirm that you want to detach the ISO from this virtual instance.",
diff --git a/ui/src/components/view/DedicateData.vue b/ui/src/components/view/DedicateData.vue
index 0ff01ac34d5..d99b080e00d 100644
--- a/ui/src/components/view/DedicateData.vue
+++ b/ui/src/components/view/DedicateData.vue
@@ -24,7 +24,7 @@
{{ $t('label.domainid') }}
- {{ dedicatedDomainId }}
+ {{ dedicatedDomainId }}
{{ $t('label.account') }}
diff --git a/ui/src/config/section/infra/clusters.js b/ui/src/config/section/infra/clusters.js
index ca6dde7ec9b..8fc4ebd54a9 100644
--- a/ui/src/config/section/infra/clusters.js
+++ b/ui/src/config/section/infra/clusters.js
@@ -54,9 +54,18 @@ export default {
}, {
name: 'settings',
component: shallowRef(defineAsyncComponent(() => import('@/components/view/SettingsTab.vue')))
+ }, {
+ name: 'drs',
+ component: shallowRef(defineAsyncComponent(() => import('@/views/infra/ClusterDRSTab.vue')))
}, {
name: 'comments',
component: shallowRef(defineAsyncComponent(() => import('@/components/view/AnnotationsTab.vue')))
+ },
+ {
+ name: 'events',
+ resourceType: 'Cluster',
+ component: shallowRef(defineAsyncComponent(() => import('@/components/view/EventsTab.vue'))),
+ show: () => { return 'listEvents' in store.getters.apis }
}],
actions: [
{
@@ -113,6 +122,16 @@ export default {
defaultArgs: { managedstate: 'Unmanaged' },
show: (record) => { return record.managedstate === 'Managed' }
},
+ {
+ api: 'executeDRS',
+ icon: 'gold-outlined',
+ label: 'label.action.drs.cluster',
+ message: 'message.action.drs.cluster',
+ dataView: true,
+ defaultArgs: { iterations: null },
+ args: ['iterations'],
+ show: (record) => { return record.managedstate === 'Managed' }
+ },
{
api: 'enableOutOfBandManagementForCluster',
icon: 'plus-circle-outlined',
diff --git a/ui/src/views/infra/ClusterDRSTab.vue b/ui/src/views/infra/ClusterDRSTab.vue
new file mode 100644
index 00000000000..4ff255bf718
--- /dev/null
+++ b/ui/src/views/infra/ClusterDRSTab.vue
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+
+ {{ $t('message.drs.plan.description') }}
+
+
+ {{ $t('label.algorithm') }}: {{ algorithm }}
+
+
+
+
+
+
+
+
+
+ {{ $t('label.drs.generate.plan') }}
+
+
+
+
+
+
+
+
+
+
+ {{ record.virtualmachinename }}
+
+
+
+
+ {{ record.sourcehostname }}
+
+
+
+
+ {{ record.destinationhostname }}
+
+
+
+ {{ text }}
+
+
+
+
+
+
+
+ {{ text.migrations.filter(m => m.jobstatus === 'SUCCEEDED').length }} / {{ text.migrations.length }}
+
+
+
+ {{ $toLocaleDate(text) }}
+
+
+
+ {{ $t('label.events') }}
+
+
+
+ {{ text }}
+
+
+
+
+
+
+
+
+
+ {{ record.virtualmachinename }}
+
+
+
+
+ {{ record.sourcehostname }}
+
+
+
+
+ {{ record.destinationhostname }}
+
+
+
+ {{ text }}
+
+
+
+
+ {{ $t('label.drs.no.plan.generated') }}
+
+
+
+
+
+
+