Refactoring the LibvirtComputingResource

- Adding LibvirtMigrateCommandWrapper
  - 1 unit tests added
  - KVM hypervisor plugin with 10.9% coverage
This commit is contained in:
wilderrodrigues 2015-04-23 13:31:58 +02:00
parent d730efb866
commit 28e55462f1
5 changed files with 319 additions and 164 deletions

View File

@ -47,14 +47,11 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -119,8 +116,6 @@ import com.cloud.agent.api.MaintainAnswer;
import com.cloud.agent.api.MaintainCommand;
import com.cloud.agent.api.ManageSnapshotAnswer;
import com.cloud.agent.api.ManageSnapshotCommand;
import com.cloud.agent.api.MigrateAnswer;
import com.cloud.agent.api.MigrateCommand;
import com.cloud.agent.api.ModifySshKeysCommand;
import com.cloud.agent.api.ModifyStoragePoolAnswer;
import com.cloud.agent.api.ModifyStoragePoolCommand;
@ -298,9 +293,6 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
private String _dcId;
private String _pod;
private String _clusterId;
private int _migrateSpeed;
private int _migrateDowntime;
private int _migratePauseAfter;
private long _hvVersion;
private long _kernelVersion;
@ -397,6 +389,22 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
return _storagePoolMgr;
}
public String getPrivateIp() {
return _privateIp;
}
public int getMigrateDowntime() {
return _migrateDowntime;
}
public int getMigratePauseAfter() {
return _migratePauseAfter;
}
public int getMigrateSpeed() {
return _migrateSpeed;
}
private static final class KeyValueInterpreter extends OutputInterpreter {
private final Map<String, String> map = new HashMap<String, String>();
@ -459,6 +467,10 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
protected String _videoHw;
protected int _videoRam;
protected Pair<Integer,Integer> hostOsVersion;
protected int _migrateSpeed;
protected int _migrateDowntime;
protected int _migratePauseAfter;
private final Map <String, String> _pifs = new HashMap<String, String>();
private final Map<String, VmStats> _vmStats = new ConcurrentHashMap<String, VmStats>();
@ -1299,9 +1311,7 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
}
try {
if (cmd instanceof MigrateCommand) {
return execute((MigrateCommand)cmd);
} else if (cmd instanceof PingTestCommand) {
if (cmd instanceof PingTestCommand) {
return execute((PingTestCommand)cmd);
} else if (cmd instanceof CheckVirtualMachineCommand) {
return execute((CheckVirtualMachineCommand)cmd);
@ -3054,159 +3064,6 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
return command.execute();
}
private Answer execute(final MigrateCommand cmd) {
final String vmName = cmd.getVmName();
String result = null;
List<InterfaceDef> ifaces = null;
List<DiskDef> disks = null;
Domain dm = null;
Connect dconn = null;
Domain destDomain = null;
Connect conn = null;
String xmlDesc = null;
try {
conn = LibvirtConnection.getConnectionByVmName(vmName);
ifaces = getInterfaces(conn, vmName);
disks = getDisks(conn, vmName);
dm = conn.domainLookupByName(vmName);
/*
We replace the private IP address with the address of the destination host.
This is because the VNC listens on the private IP address of the hypervisor,
but that address is ofcourse different on the target host.
MigrateCommand.getDestinationIp() returns the private IP address of the target
hypervisor. So it's safe to use.
The Domain.migrate method from libvirt supports passing a different XML
description for the instance to be used on the target host.
This is supported by libvirt-java from version 0.50.0
*/
xmlDesc = dm.getXMLDesc(0).replace(_privateIp, cmd.getDestinationIp());
dconn = new Connect("qemu+tcp://" + cmd.getDestinationIp() + "/system");
//run migration in thread so we can monitor it
s_logger.info("Live migration of instance " + vmName + " initiated");
final ExecutorService executor = Executors.newFixedThreadPool(1);
final Callable<Domain> worker = new MigrateKVMAsync(dm, dconn, xmlDesc, vmName, cmd.getDestinationIp());
final Future<Domain> migrateThread = executor.submit(worker);
executor.shutdown();
long sleeptime = 0;
while (!executor.isTerminated()) {
Thread.sleep(100);
sleeptime += 100;
if (sleeptime == 1000) { // wait 1s before attempting to set downtime on migration, since I don't know of a VIR_DOMAIN_MIGRATING state
if (_migrateDowntime > 0 ) {
try {
final int setDowntime = dm.migrateSetMaxDowntime(_migrateDowntime);
if (setDowntime == 0 ) {
s_logger.debug("Set max downtime for migration of " + vmName + " to " + String.valueOf(_migrateDowntime) + "ms");
}
} catch (final LibvirtException e) {
s_logger.debug("Failed to set max downtime for migration, perhaps migration completed? Error: " + e.getMessage());
}
}
}
if (sleeptime % 1000 == 0) {
s_logger.info("Waiting for migration of " + vmName + " to complete, waited " + sleeptime + "ms");
}
// pause vm if we meet the vm.migrate.pauseafter threshold and not already paused
if (_migratePauseAfter > 0 && sleeptime > _migratePauseAfter && dm.getInfo().state == DomainState.VIR_DOMAIN_RUNNING ) {
s_logger.info("Pausing VM " + vmName + " due to property vm.migrate.pauseafter setting to " + _migratePauseAfter+ "ms to complete migration");
try {
dm.suspend();
} catch (final LibvirtException e) {
// pause could be racy if it attempts to pause right when vm is finished, simply warn
s_logger.info("Failed to pause vm " + vmName + " : " + e.getMessage());
}
}
}
s_logger.info("Migration thread for " + vmName + " is done");
destDomain = migrateThread.get(10, TimeUnit.SECONDS);
if (destDomain != null) {
for (final DiskDef disk : disks) {
cleanupDisk(disk);
}
}
} catch (final LibvirtException e) {
s_logger.debug("Can't migrate domain: " + e.getMessage());
result = e.getMessage();
} catch (final InterruptedException e) {
s_logger.debug("Interrupted while migrating domain: " + e.getMessage());
result = e.getMessage();
} catch (final ExecutionException e) {
s_logger.debug("Failed to execute while migrating domain: " + e.getMessage());
result = e.getMessage();
} catch (final TimeoutException e) {
s_logger.debug("Timed out while migrating domain: " + e.getMessage());
result = e.getMessage();
} finally {
try {
if (dm != null) {
if (dm.isPersistent() == 1) {
dm.undefine();
}
dm.free();
}
if (dconn != null) {
dconn.close();
}
if (destDomain != null) {
destDomain.free();
}
} catch (final LibvirtException e) {
s_logger.trace("Ignoring libvirt error.", e);
}
}
if (result != null) {
} else {
destroyNetworkRulesForVM(conn, vmName);
for (final InterfaceDef iface : ifaces) {
// We don't know which "traffic type" is associated with
// each interface at this point, so inform all vif drivers
for (final VifDriver vifDriver : getAllVifDrivers()) {
vifDriver.unplug(iface);
}
}
}
return new MigrateAnswer(cmd, result == null, result, null);
}
private class MigrateKVMAsync implements Callable<Domain> {
Domain dm = null;
Connect dconn = null;
String dxml = "";
String vmName = "";
String destIp = "";
MigrateKVMAsync(final Domain dm, final Connect dconn, final String dxml, final String vmName, final String destIp) {
this.dm = dm;
this.dconn = dconn;
this.dxml = dxml;
this.vmName = vmName;
this.destIp = destIp;
}
@Override
public Domain call() throws LibvirtException {
// set compression flag for migration if libvirt version supports it
if (dconn.getLibVirVersion() < 1003000) {
return dm.migrate(dconn, 1 << 0, dxml, vmName, "tcp:" + destIp, _migrateSpeed);
} else {
return dm.migrate(dconn, 1 << 0|1 << 11, dxml, vmName, "tcp:" + destIp, _migrateSpeed);
}
}
}
public String networkUsage(final String privateIpAddress, final String option, final String vif) {
final Script getUsage = new Script(_routerProxyPath, s_logger);
getUsage.add("netusage.sh");

View File

@ -0,0 +1,38 @@
package com.cloud.hypervisor.kvm.resource;
import java.util.concurrent.Callable;
import org.libvirt.Connect;
import org.libvirt.Domain;
import org.libvirt.LibvirtException;
public class MigrateKVMAsync implements Callable<Domain> {
private final LibvirtComputingResource libvirtComputingResource;
private Domain dm = null;
private Connect dconn = null;
private String dxml = "";
private String vmName = "";
private String destIp = "";
public MigrateKVMAsync(final LibvirtComputingResource libvirtComputingResource, final Domain dm, final Connect dconn, final String dxml, final String vmName, final String destIp) {
this.libvirtComputingResource = libvirtComputingResource;
this.dm = dm;
this.dconn = dconn;
this.dxml = dxml;
this.vmName = vmName;
this.destIp = destIp;
}
@Override
public Domain call() throws LibvirtException {
// set compression flag for migration if libvirt version supports it
if (dconn.getLibVirVersion() < 1003000) {
return dm.migrate(dconn, 1 << 0, dxml, vmName, "tcp:" + destIp, libvirtComputingResource.getMigrateSpeed());
} else {
return dm.migrate(dconn, 1 << 0|1 << 11, dxml, vmName, "tcp:" + destIp, libvirtComputingResource.getMigrateSpeed());
}
}
}

View File

@ -0,0 +1,183 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.hypervisor.kvm.resource.wrapper;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.log4j.Logger;
import org.libvirt.Connect;
import org.libvirt.Domain;
import org.libvirt.DomainInfo.DomainState;
import org.libvirt.LibvirtException;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.MigrateAnswer;
import com.cloud.agent.api.MigrateCommand;
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.DiskDef;
import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.InterfaceDef;
import com.cloud.hypervisor.kvm.resource.MigrateKVMAsync;
import com.cloud.hypervisor.kvm.resource.VifDriver;
import com.cloud.resource.CommandWrapper;
public final class LibvirtMigrateCommandWrapper extends CommandWrapper<MigrateCommand, Answer, LibvirtComputingResource> {
private static final Logger s_logger = Logger.getLogger(LibvirtMigrateCommandWrapper.class);
@Override
public Answer execute(final MigrateCommand command, final LibvirtComputingResource libvirtComputingResource) {
final String vmName = command.getVmName();
String result = null;
List<InterfaceDef> ifaces = null;
List<DiskDef> disks = null;
Domain dm = null;
Connect dconn = null;
Domain destDomain = null;
Connect conn = null;
String xmlDesc = null;
try {
final LibvirtConnectionWrapper libvirtConnectionWrapper = libvirtComputingResource.getLibvirtConnectionWrapper();
conn = libvirtConnectionWrapper.getConnectionByVmName(vmName);
ifaces = libvirtComputingResource.getInterfaces(conn, vmName);
disks = libvirtComputingResource.getDisks(conn, vmName);
dm = conn.domainLookupByName(vmName);
/*
We replace the private IP address with the address of the destination host.
This is because the VNC listens on the private IP address of the hypervisor,
but that address is ofcourse different on the target host.
MigrateCommand.getDestinationIp() returns the private IP address of the target
hypervisor. So it's safe to use.
The Domain.migrate method from libvirt supports passing a different XML
description for the instance to be used on the target host.
This is supported by libvirt-java from version 0.50.0
*/
xmlDesc = dm.getXMLDesc(0).replace(libvirtComputingResource.getPrivateIp(), command.getDestinationIp());
dconn = new Connect("qemu+tcp://" + command.getDestinationIp() + "/system");
//run migration in thread so we can monitor it
s_logger.info("Live migration of instance " + vmName + " initiated");
final ExecutorService executor = Executors.newFixedThreadPool(1);
final Callable<Domain> worker = new MigrateKVMAsync(libvirtComputingResource, dm, dconn, xmlDesc, vmName, command.getDestinationIp());
final Future<Domain> migrateThread = executor.submit(worker);
executor.shutdown();
long sleeptime = 0;
while (!executor.isTerminated()) {
Thread.sleep(100);
sleeptime += 100;
if (sleeptime == 1000) { // wait 1s before attempting to set downtime on migration, since I don't know of a VIR_DOMAIN_MIGRATING state
final int migrateDowntime = libvirtComputingResource.getMigrateDowntime();
if (migrateDowntime > 0 ) {
try {
final int setDowntime = dm.migrateSetMaxDowntime(migrateDowntime);
if (setDowntime == 0 ) {
s_logger.debug("Set max downtime for migration of " + vmName + " to " + String.valueOf(migrateDowntime) + "ms");
}
} catch (final LibvirtException e) {
s_logger.debug("Failed to set max downtime for migration, perhaps migration completed? Error: " + e.getMessage());
}
}
}
if (sleeptime % 1000 == 0) {
s_logger.info("Waiting for migration of " + vmName + " to complete, waited " + sleeptime + "ms");
}
// pause vm if we meet the vm.migrate.pauseafter threshold and not already paused
final int migratePauseAfter = libvirtComputingResource.getMigratePauseAfter();
if (migratePauseAfter > 0 && sleeptime > migratePauseAfter && dm.getInfo().state == DomainState.VIR_DOMAIN_RUNNING ) {
s_logger.info("Pausing VM " + vmName + " due to property vm.migrate.pauseafter setting to " + migratePauseAfter+ "ms to complete migration");
try {
dm.suspend();
} catch (final LibvirtException e) {
// pause could be racy if it attempts to pause right when vm is finished, simply warn
s_logger.info("Failed to pause vm " + vmName + " : " + e.getMessage());
}
}
}
s_logger.info("Migration thread for " + vmName + " is done");
destDomain = migrateThread.get(10, TimeUnit.SECONDS);
if (destDomain != null) {
for (final DiskDef disk : disks) {
libvirtComputingResource.cleanupDisk(disk);
}
}
} catch (final LibvirtException e) {
s_logger.debug("Can't migrate domain: " + e.getMessage());
result = e.getMessage();
} catch (final InterruptedException e) {
s_logger.debug("Interrupted while migrating domain: " + e.getMessage());
result = e.getMessage();
} catch (final ExecutionException e) {
s_logger.debug("Failed to execute while migrating domain: " + e.getMessage());
result = e.getMessage();
} catch (final TimeoutException e) {
s_logger.debug("Timed out while migrating domain: " + e.getMessage());
result = e.getMessage();
} finally {
try {
if (dm != null) {
if (dm.isPersistent() == 1) {
dm.undefine();
}
dm.free();
}
if (dconn != null) {
dconn.close();
}
if (destDomain != null) {
destDomain.free();
}
} catch (final LibvirtException e) {
s_logger.trace("Ignoring libvirt error.", e);
}
}
if (result != null) {
} else {
libvirtComputingResource.destroyNetworkRulesForVM(conn, vmName);
for (final InterfaceDef iface : ifaces) {
// We don't know which "traffic type" is associated with
// each interface at this point, so inform all vif drivers
final List<VifDriver> allVifDrivers = libvirtComputingResource.getAllVifDrivers();
for (final VifDriver vifDriver : allVifDrivers) {
vifDriver.unplug(iface);
}
}
}
return new MigrateAnswer(command, result == null, result, null);
}
}

View File

@ -26,6 +26,7 @@ import com.cloud.agent.api.Command;
import com.cloud.agent.api.GetHostStatsCommand;
import com.cloud.agent.api.GetVmDiskStatsCommand;
import com.cloud.agent.api.GetVmStatsCommand;
import com.cloud.agent.api.MigrateCommand;
import com.cloud.agent.api.PrepareForMigrationCommand;
import com.cloud.agent.api.RebootCommand;
import com.cloud.agent.api.RebootRouterCommand;
@ -60,6 +61,7 @@ public class LibvirtRequestWrapper extends RequestWrapper {
linbvirtCommands.put(GetHostStatsCommand.class, new LibvirtGetHostStatsCommandWrapper());
linbvirtCommands.put(CheckHealthCommand.class, new LibvirtCheckHealthCommandWrapper());
linbvirtCommands.put(PrepareForMigrationCommand.class, new LibvirtPrepareForMigrationCommandWrapper());
linbvirtCommands.put(MigrateCommand.class, new LibvirtMigrateCommandWrapper());
resources.put(LibvirtComputingResource.class, linbvirtCommands);
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -66,6 +67,7 @@ import com.cloud.agent.api.CheckHealthCommand;
import com.cloud.agent.api.GetHostStatsCommand;
import com.cloud.agent.api.GetVmDiskStatsCommand;
import com.cloud.agent.api.GetVmStatsCommand;
import com.cloud.agent.api.MigrateCommand;
import com.cloud.agent.api.PrepareForMigrationCommand;
import com.cloud.agent.api.RebootCommand;
import com.cloud.agent.api.RebootRouterCommand;
@ -668,4 +670,77 @@ public class LibvirtComputingResourceTest {
verify(vm, times(1)).getDisks();
verify(diskTO, times(1)).getType();
}
@Test(expected = UnsatisfiedLinkError.class)
public void testMigrateCommand() {
// The Connect constructor used inside the LibvirtMigrateCommandWrapper has a call to native methods, which
// makes difficult to test it now.
// Will keep it expecting the UnsatisfiedLinkError and fix later.
final Connect conn = Mockito.mock(Connect.class);
final LibvirtConnectionWrapper libvirtConnectionWrapper = Mockito.mock(LibvirtConnectionWrapper.class);
final String vmName = "Test";
final String destIp = "10.1.1.100";
final boolean isWindows = false;
final VirtualMachineTO vmTO = Mockito.mock(VirtualMachineTO.class);
final boolean executeInSequence = false;
final MigrateCommand command = new MigrateCommand(vmName, destIp, isWindows, vmTO, executeInSequence );
when(libvirtComputingResource.getLibvirtConnectionWrapper()).thenReturn(libvirtConnectionWrapper);
try {
when(libvirtConnectionWrapper.getConnectionByVmName(vmName)).thenReturn(conn);
} catch (final LibvirtException e) {
fail(e.getMessage());
}
final InterfaceDef interfaceDef = Mockito.mock(InterfaceDef.class);
final List<InterfaceDef> ifaces = new ArrayList<InterfaceDef>();
ifaces.add(interfaceDef);
when(libvirtComputingResource.getInterfaces(conn, vmName)).thenReturn(ifaces);
final DiskDef diskDef = Mockito.mock(DiskDef.class);
final List<DiskDef> disks = new ArrayList<DiskDef>();
disks.add(diskDef);
when(libvirtComputingResource.getDisks(conn, vmName)).thenReturn(disks);
final Domain dm = Mockito.mock(Domain.class);
try {
when(conn.domainLookupByName(vmName)).thenReturn(dm);
when(libvirtComputingResource.getPrivateIp()).thenReturn("192.168.1.10");
when(dm.getXMLDesc(0)).thenReturn("host_domain");
when(dm.isPersistent()).thenReturn(1);
doNothing().when(dm).undefine();
} catch (final LibvirtException e) {
fail(e.getMessage());
} catch (final Exception e) {
fail(e.getMessage());
}
final LibvirtRequestWrapper wrapper = LibvirtRequestWrapper.getInstance();
assertNotNull(wrapper);
final Answer answer = wrapper.execute(command, libvirtComputingResource);
assertFalse(answer.getResult());
verify(libvirtComputingResource, times(1)).getLibvirtConnectionWrapper();
try {
verify(libvirtConnectionWrapper, times(1)).getConnectionByVmName(vmName);
} catch (final LibvirtException e) {
fail(e.getMessage());
}
verify(libvirtComputingResource, times(1)).getInterfaces(conn, vmName);
verify(libvirtComputingResource, times(1)).getDisks(conn, vmName);
try {
verify(conn, times(1)).domainLookupByName(vmName);
verify(dm, times(1)).getXMLDesc(0);
} catch (final LibvirtException e) {
fail(e.getMessage());
}
}
}