mirror of https://github.com/apache/cloudstack.git
Image server on kvm host - with image_server.py http server
This commit is contained in:
parent
3a02433d75
commit
30136c814a
|
|
@ -38,8 +38,6 @@ public interface Backup extends ControlledEntity, InternalIdentity, Identity {
|
|||
|
||||
Long getHostId();
|
||||
|
||||
Integer getNbdPort();
|
||||
|
||||
enum Status {
|
||||
Allocated, Queued, BackingUp, ReadyForTransfer, FinalizingTransfer, BackedUp, Error, Failed, Restoring, Removed, Expunged
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,8 +49,6 @@ public interface ImageTransfer extends ControlledEntity, InternalIdentity {
|
|||
|
||||
long getHostId();
|
||||
|
||||
int getNbdPort();
|
||||
|
||||
String getTransferUrl();
|
||||
|
||||
Phase getPhase();
|
||||
|
|
|
|||
|
|
@ -21,9 +21,8 @@ import com.cloud.agent.api.Command;
|
|||
|
||||
public class CreateImageTransferCommand extends Command {
|
||||
private String transferId;
|
||||
private String hostIpAddress;
|
||||
private String exportName;
|
||||
private int nbdPort;
|
||||
private String socket;
|
||||
private String direction;
|
||||
private String checkpointId;
|
||||
private String file;
|
||||
|
|
@ -32,22 +31,21 @@ public class CreateImageTransferCommand extends Command {
|
|||
public CreateImageTransferCommand() {
|
||||
}
|
||||
|
||||
private CreateImageTransferCommand(String transferId, String hostIpAddress, String direction) {
|
||||
private CreateImageTransferCommand(String transferId, String direction, String socket) {
|
||||
this.transferId = transferId;
|
||||
this.hostIpAddress = hostIpAddress;
|
||||
this.direction = direction;
|
||||
this.socket = socket;
|
||||
}
|
||||
|
||||
public CreateImageTransferCommand(String transferId, String hostIpAddress, String direction, String exportName, int nbdPort, String checkpointId) {
|
||||
this(transferId, hostIpAddress, direction);
|
||||
public CreateImageTransferCommand(String transferId, String direction, String exportName, String socket, String checkpointId) {
|
||||
this(transferId, direction, socket);
|
||||
this.backend = ImageTransfer.Backend.nbd;
|
||||
this.exportName = exportName;
|
||||
this.nbdPort = nbdPort;
|
||||
this.checkpointId = checkpointId;
|
||||
}
|
||||
|
||||
public CreateImageTransferCommand(String transferId, String hostIpAddress, String direction, String file) {
|
||||
this(transferId, hostIpAddress, direction);
|
||||
public CreateImageTransferCommand(String transferId, String direction, String socket, String file) {
|
||||
this(transferId, direction, socket);
|
||||
if (direction == ImageTransfer.Direction.download.toString()) {
|
||||
throw new IllegalArgumentException("File backend is only supported for upload");
|
||||
}
|
||||
|
|
@ -59,8 +57,8 @@ public class CreateImageTransferCommand extends Command {
|
|||
return exportName;
|
||||
}
|
||||
|
||||
public int getNbdPort() {
|
||||
return nbdPort;
|
||||
public String getSocket() {
|
||||
return socket;
|
||||
}
|
||||
|
||||
public String getFile() {
|
||||
|
|
@ -71,10 +69,6 @@ public class CreateImageTransferCommand extends Command {
|
|||
return backend;
|
||||
}
|
||||
|
||||
public String getHostIpAddress() {
|
||||
return hostIpAddress;
|
||||
}
|
||||
|
||||
public String getTransferId() {
|
||||
return transferId;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,30 +21,18 @@ import com.cloud.agent.api.Command;
|
|||
|
||||
public class FinalizeImageTransferCommand extends Command {
|
||||
private String transferId;
|
||||
private String direction;
|
||||
private int nbdPort;
|
||||
|
||||
public FinalizeImageTransferCommand() {
|
||||
}
|
||||
|
||||
public FinalizeImageTransferCommand(String transferId, String direction, int nbdPort) {
|
||||
public FinalizeImageTransferCommand(String transferId) {
|
||||
this.transferId = transferId;
|
||||
this.direction = direction;
|
||||
this.nbdPort = nbdPort;
|
||||
}
|
||||
|
||||
public String getTransferId() {
|
||||
return transferId;
|
||||
}
|
||||
|
||||
public int getNbdPort() {
|
||||
return nbdPort;
|
||||
}
|
||||
|
||||
public String getDirection() {
|
||||
return direction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean executeInSequence() {
|
||||
return true;
|
||||
|
|
|
|||
|
|
@ -26,23 +26,21 @@ public class StartBackupCommand extends Command {
|
|||
private String toCheckpointId;
|
||||
private String fromCheckpointId;
|
||||
private Long fromCheckpointCreateTime;
|
||||
private int nbdPort;
|
||||
private String socket;
|
||||
private Map<String, String> diskPathUuidMap;
|
||||
private String hostIpAddress;
|
||||
private boolean stoppedVM;
|
||||
|
||||
public StartBackupCommand() {
|
||||
}
|
||||
|
||||
public StartBackupCommand(String vmName, String toCheckpointId, String fromCheckpointId, Long fromCheckpointCreateTime,
|
||||
int nbdPort, Map<String, String> diskPathUuidMap, String hostIpAddress, boolean stoppedVM) {
|
||||
String socket, Map<String, String> diskPathUuidMap, boolean stoppedVM) {
|
||||
this.vmName = vmName;
|
||||
this.toCheckpointId = toCheckpointId;
|
||||
this.fromCheckpointId = fromCheckpointId;
|
||||
this.fromCheckpointCreateTime = fromCheckpointCreateTime;
|
||||
this.nbdPort = nbdPort;
|
||||
this.socket = socket;
|
||||
this.diskPathUuidMap = diskPathUuidMap;
|
||||
this.hostIpAddress = hostIpAddress;
|
||||
this.stoppedVM = stoppedVM;
|
||||
}
|
||||
|
||||
|
|
@ -62,8 +60,8 @@ public class StartBackupCommand extends Command {
|
|||
return fromCheckpointCreateTime;
|
||||
}
|
||||
|
||||
public int getNbdPort() {
|
||||
return nbdPort;
|
||||
public String getSocket() {
|
||||
return socket;
|
||||
}
|
||||
|
||||
public Map<String, String> getDiskPathUuidMap() {
|
||||
|
|
@ -74,10 +72,6 @@ public class StartBackupCommand extends Command {
|
|||
return fromCheckpointId != null && !fromCheckpointId.isEmpty();
|
||||
}
|
||||
|
||||
public String getHostIpAddress() {
|
||||
return hostIpAddress;
|
||||
}
|
||||
|
||||
public boolean isStoppedVM() {
|
||||
return stoppedVM;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,27 +24,31 @@ public class StartNBDServerCommand extends Command {
|
|||
private String hostIpAddress;
|
||||
private String exportName;
|
||||
private String volumePath;
|
||||
private int nbdPort;
|
||||
private String socket;
|
||||
private String direction;
|
||||
|
||||
public StartNBDServerCommand() {
|
||||
}
|
||||
|
||||
public StartNBDServerCommand(String transferId, String hostIpAddress, String exportName, String volumePath, int nbdPort, String direction) {
|
||||
protected StartNBDServerCommand(String transferId, String hostIpAddress, String exportName, String volumePath, String direction) {
|
||||
this.transferId = transferId;
|
||||
this.hostIpAddress = hostIpAddress;
|
||||
this.exportName = exportName;
|
||||
this.volumePath = volumePath;
|
||||
this.nbdPort = nbdPort;
|
||||
this.direction = direction;
|
||||
}
|
||||
|
||||
public StartNBDServerCommand(String transferId, String hostIpAddress, String exportName, String volumePath, String socket, String direction) {
|
||||
this(transferId, hostIpAddress, exportName, volumePath, direction);
|
||||
this.socket = socket;
|
||||
}
|
||||
|
||||
public String getExportName() {
|
||||
return exportName;
|
||||
}
|
||||
|
||||
public int getNbdPort() {
|
||||
return nbdPort;
|
||||
public String getSocket() {
|
||||
return socket;
|
||||
}
|
||||
|
||||
public String getHostIpAddress() {
|
||||
|
|
|
|||
|
|
@ -22,25 +22,19 @@ import com.cloud.agent.api.Command;
|
|||
public class StopNBDServerCommand extends Command {
|
||||
private String transferId;
|
||||
private String direction;
|
||||
private int nbdPort;
|
||||
|
||||
public StopNBDServerCommand() {
|
||||
}
|
||||
|
||||
public StopNBDServerCommand(String transferId, String direction, int nbdPort) {
|
||||
public StopNBDServerCommand(String transferId, String direction) {
|
||||
this.transferId = transferId;
|
||||
this.direction = direction;
|
||||
this.nbdPort = nbdPort;
|
||||
}
|
||||
|
||||
public String getTransferId() {
|
||||
return transferId;
|
||||
}
|
||||
|
||||
public int getNbdPort() {
|
||||
return nbdPort;
|
||||
}
|
||||
|
||||
public String getDirection() {
|
||||
return direction;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -115,9 +115,6 @@ public class BackupVO implements Backup {
|
|||
@Column(name = "host_id")
|
||||
private Long hostId;
|
||||
|
||||
@Column(name = "nbd_port")
|
||||
private Integer nbdPort;
|
||||
|
||||
@Transient
|
||||
Map<String, String> details;
|
||||
|
||||
|
|
@ -339,13 +336,4 @@ public class BackupVO implements Backup {
|
|||
public void setHostId(Long hostId) {
|
||||
this.hostId = hostId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getNbdPort() {
|
||||
return nbdPort;
|
||||
}
|
||||
|
||||
public void setNbdPort(Integer nbdPort) {
|
||||
this.nbdPort = nbdPort;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,8 +51,8 @@ public class ImageTransferVO implements ImageTransfer {
|
|||
@Column(name = "host_id")
|
||||
private long hostId;
|
||||
|
||||
@Column(name = "nbd_port")
|
||||
private int nbdPort;
|
||||
@Column(name = "socket")
|
||||
private String socket;
|
||||
|
||||
@Column(name = "file")
|
||||
private String file;
|
||||
|
|
@ -114,10 +114,10 @@ public class ImageTransferVO implements ImageTransfer {
|
|||
this.created = new Date();
|
||||
}
|
||||
|
||||
public ImageTransferVO(String uuid, Long backupId, long diskId, long hostId, int nbdPort, Phase phase, Direction direction, Long accountId, Long domainId, Long dataCenterId) {
|
||||
public ImageTransferVO(String uuid, Long backupId, long diskId, long hostId, String socket, Phase phase, Direction direction, Long accountId, Long domainId, Long dataCenterId) {
|
||||
this(uuid, diskId, hostId, phase, direction, accountId, domainId, dataCenterId);
|
||||
this.backupId = backupId;
|
||||
this.nbdPort = nbdPort;
|
||||
this.socket = socket;
|
||||
this.backend = Backend.nbd;
|
||||
}
|
||||
|
||||
|
|
@ -164,13 +164,8 @@ public class ImageTransferVO implements ImageTransfer {
|
|||
this.hostId = hostId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNbdPort() {
|
||||
return nbdPort;
|
||||
}
|
||||
|
||||
public void setNbdPort(int nbdPort) {
|
||||
this.nbdPort = nbdPort;
|
||||
public void setSocket(String socket) {
|
||||
this.socket = socket;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -27,7 +27,6 @@ import com.cloud.utils.db.GenericDao;
|
|||
public interface ImageTransferDao extends GenericDao<ImageTransferVO, Long> {
|
||||
List<ImageTransferVO> listByBackupId(Long backupId);
|
||||
ImageTransferVO findByUuid(String uuid);
|
||||
ImageTransferVO findByNbdPort(int port);
|
||||
ImageTransferVO findByVolume(Long volumeId);
|
||||
ImageTransferVO findUnfinishedByVolume(Long volumeId);
|
||||
List<ImageTransferVO> listByPhaseAndDirection(ImageTransfer.Phase phase, ImageTransfer.Direction direction);
|
||||
|
|
|
|||
|
|
@ -34,7 +34,6 @@ public class ImageTransferDaoImpl extends GenericDaoBase<ImageTransferVO, Long>
|
|||
|
||||
private SearchBuilder<ImageTransferVO> backupIdSearch;
|
||||
private SearchBuilder<ImageTransferVO> uuidSearch;
|
||||
private SearchBuilder<ImageTransferVO> nbdPortSearch;
|
||||
private SearchBuilder<ImageTransferVO> volumeSearch;
|
||||
private SearchBuilder<ImageTransferVO> volumeUnfinishedSearch;
|
||||
private SearchBuilder<ImageTransferVO> phaseDirectionSearch;
|
||||
|
|
@ -52,10 +51,6 @@ public class ImageTransferDaoImpl extends GenericDaoBase<ImageTransferVO, Long>
|
|||
uuidSearch.and("uuid", uuidSearch.entity().getUuid(), SearchCriteria.Op.EQ);
|
||||
uuidSearch.done();
|
||||
|
||||
nbdPortSearch = createSearchBuilder();
|
||||
nbdPortSearch.and("nbdPort", nbdPortSearch.entity().getNbdPort(), SearchCriteria.Op.EQ);
|
||||
nbdPortSearch.done();
|
||||
|
||||
volumeSearch = createSearchBuilder();
|
||||
volumeSearch.and("volumeId", volumeSearch.entity().getDiskId(), SearchCriteria.Op.EQ);
|
||||
volumeSearch.done();
|
||||
|
|
@ -85,13 +80,6 @@ public class ImageTransferDaoImpl extends GenericDaoBase<ImageTransferVO, Long>
|
|||
return findOneBy(sc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ImageTransferVO findByNbdPort(int port) {
|
||||
SearchCriteria<ImageTransferVO> sc = nbdPortSearch.create();
|
||||
sc.setParameters("nbdPort", port);
|
||||
return findOneBy(sc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ImageTransferVO findByVolume(Long volumeId) {
|
||||
SearchCriteria<ImageTransferVO> sc = volumeSearch.create();
|
||||
|
|
|
|||
|
|
@ -92,7 +92,3 @@ CALL `cloud`.`IDEMPOTENT_ADD_UNIQUE_KEY`('cloud.counter', 'uc_counter__provider_
|
|||
UPDATE `cloud`.`configuration` SET `scope` = 2 WHERE `name` = 'use.https.to.upload';
|
||||
-- Delete the configuration for 'use.https.to.upload' from StoragePool
|
||||
DELETE FROM `cloud`.`storage_pool_details` WHERE `name` = 'use.https.to.upload';
|
||||
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
>>>>>>> 1ec4e52fa6 (Support file backend for cow format: api and server)
|
||||
|
|
|
|||
|
|
@ -123,7 +123,6 @@ CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'from_checkpoint_id', 'VAR
|
|||
CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'to_checkpoint_id', 'VARCHAR(255) DEFAULT NULL COMMENT "New checkpoint id created for this backup session"');
|
||||
CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'checkpoint_create_time', 'BIGINT DEFAULT NULL COMMENT "Checkpoint creation timestamp from libvirt"');
|
||||
CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'host_id', 'BIGINT UNSIGNED DEFAULT NULL COMMENT "Host where backup is running"');
|
||||
CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'nbd_port', 'INT DEFAULT NULL COMMENT "NBD server port for backup"');
|
||||
|
||||
-- Add checkpoint tracking fields to vm_instance table for domain recreation
|
||||
CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.vm_instance', 'active_checkpoint_id', 'VARCHAR(255) DEFAULT NULL COMMENT "Active checkpoint id tracked for incremental backups"');
|
||||
|
|
@ -139,10 +138,10 @@ CREATE TABLE IF NOT EXISTS `cloud`.`image_transfer`(
|
|||
`backup_id` bigint unsigned COMMENT 'Backup ID',
|
||||
`disk_id` bigint unsigned NOT NULL COMMENT 'Disk/Volume ID',
|
||||
`host_id` bigint unsigned NOT NULL COMMENT 'Host ID',
|
||||
`nbd_port` int NOT NULL COMMENT 'NBD port',
|
||||
`transfer_url` varchar(255) COMMENT 'ImageIO transfer URL',
|
||||
`file` varchar(255) COMMENT 'File for the file backend',
|
||||
`phase` varchar(20) NOT NULL COMMENT 'Transfer phase: initializing, transferring, finished, failed',
|
||||
`socket` varchar(255) COMMENT 'Unix socket for nbd backend',
|
||||
`direction` varchar(20) NOT NULL COMMENT 'Direction: upload, download',
|
||||
`backend` varchar(20) NOT NULL COMMENT 'Backend: nbd, file',
|
||||
`progress` int COMMENT 'Transfer progress percentage (0-100)',
|
||||
|
|
|
|||
|
|
@ -395,6 +395,7 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
|
|||
private String heartBeatPath;
|
||||
private String vmActivityCheckPath;
|
||||
private String nasBackupPath;
|
||||
private String imageServerPath;
|
||||
private String securityGroupPath;
|
||||
private String ovsPvlanDhcpHostPath;
|
||||
private String ovsPvlanVmPath;
|
||||
|
|
@ -809,6 +810,10 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
|
|||
return nasBackupPath;
|
||||
}
|
||||
|
||||
public String getImageServerPath() {
|
||||
return imageServerPath;
|
||||
}
|
||||
|
||||
public String getOvsPvlanDhcpHostPath() {
|
||||
return ovsPvlanDhcpHostPath;
|
||||
}
|
||||
|
|
@ -1095,6 +1100,11 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
|
|||
throw new ConfigurationException("Unable to find nasbackup.sh");
|
||||
}
|
||||
|
||||
imageServerPath = Script.findScript(kvmScriptsDir, "image_server.py");
|
||||
if (imageServerPath == null) {
|
||||
throw new ConfigurationException("Unable to find image_server.py");
|
||||
}
|
||||
|
||||
createTmplPath = Script.findScript(storageScriptsDir, "createtmplt.sh");
|
||||
if (createTmplPath == null) {
|
||||
throw new ConfigurationException("Unable to find the createtmplt.sh");
|
||||
|
|
|
|||
|
|
@ -17,8 +17,16 @@
|
|||
|
||||
package com.cloud.hypervisor.kvm.resource.wrapper;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.cloudstack.backup.CreateImageTransferAnswer;
|
||||
import org.apache.cloudstack.backup.CreateImageTransferCommand;
|
||||
import org.apache.cloudstack.backup.ImageTransfer;
|
||||
import org.apache.cloudstack.storage.resource.IpTablesHelper;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
|
|
@ -26,36 +34,128 @@ import com.cloud.agent.api.Answer;
|
|||
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
|
||||
import com.cloud.resource.CommandWrapper;
|
||||
import com.cloud.resource.ResourceWrapper;
|
||||
import com.cloud.utils.StringUtils;
|
||||
import com.cloud.utils.script.Script;
|
||||
import com.google.gson.GsonBuilder;
|
||||
|
||||
@ResourceWrapper(handles = CreateImageTransferCommand.class)
|
||||
public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper<CreateImageTransferCommand, Answer, LibvirtComputingResource> {
|
||||
protected Logger logger = LogManager.getLogger(getClass());
|
||||
|
||||
private CreateImageTransferAnswer handleUpload(CreateImageTransferCommand cmd) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "Image Upload is not handled by KVM agent");
|
||||
}
|
||||
private boolean startImageServerIfNotRunning(int imageServerPort, LibvirtComputingResource resource) {
|
||||
final String imageServerScript = resource.getImageServerPath();
|
||||
String unitName = "cloudstack-image-server";
|
||||
|
||||
private CreateImageTransferAnswer handleDownload(CreateImageTransferCommand cmd) {
|
||||
String exportName = cmd.getExportName();
|
||||
int nbdPort = cmd.getNbdPort();
|
||||
try {
|
||||
String hostIpAddress = cmd.getHostIpAddress();
|
||||
String transferUrl = String.format("nbd://%s:%d/%s", hostIpAddress, nbdPort, exportName);
|
||||
|
||||
return new CreateImageTransferAnswer(cmd, true, "Image transfer created for download",
|
||||
cmd.getTransferId(), transferUrl);
|
||||
|
||||
} catch (Exception e) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "Error creating image transfer: " + e.getMessage());
|
||||
Script checkScript = new Script("/bin/bash", logger);
|
||||
checkScript.add("-c");
|
||||
checkScript.add(String.format("systemctl is-active --quiet %s", unitName));
|
||||
String checkResult = checkScript.execute();
|
||||
if (checkResult == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
String systemdRunCmd = String.format(
|
||||
"systemd-run --unit=%s --property=Restart=no /usr/bin/python3 %s --listen 0.0.0.0 --port %d",
|
||||
unitName, imageServerScript, imageServerPort);
|
||||
|
||||
Script startScript = new Script("/bin/bash", logger);
|
||||
startScript.add("-c");
|
||||
startScript.add(systemdRunCmd);
|
||||
String startResult = startScript.execute();
|
||||
|
||||
if (startResult != null) {
|
||||
logger.error(String.format("Failed to start the Image server: %s", startResult));
|
||||
return false;
|
||||
}
|
||||
|
||||
// Wait with timeout until the service is up
|
||||
int maxWaitSeconds = 10;
|
||||
int pollIntervalMs = 1000;
|
||||
int maxAttempts = (maxWaitSeconds * 1000) / pollIntervalMs;
|
||||
boolean serviceActive = false;
|
||||
|
||||
for (int attempt = 0; attempt < maxAttempts; attempt++) {
|
||||
Script verifyScript = new Script("/bin/bash", logger);
|
||||
verifyScript.add("-c");
|
||||
verifyScript.add(String.format("systemctl is-active --quiet %s", unitName));
|
||||
String verifyResult = verifyScript.execute();
|
||||
if (verifyResult == null) {
|
||||
serviceActive = true;
|
||||
logger.info(String.format("Image server is now active (attempt %d)", attempt + 1));
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pollIntervalMs);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!serviceActive) {
|
||||
logger.error(String.format("Image server failed to start within %d seconds", maxWaitSeconds));
|
||||
return false;
|
||||
}
|
||||
|
||||
String rule = String.format("-p tcp -m state --state NEW -m tcp --dport %d -j ACCEPT", imageServerPort);
|
||||
IpTablesHelper.addConditionally(IpTablesHelper.INPUT_CHAIN, true, rule,
|
||||
String.format("Error in opening up image server port %d", imageServerPort));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Answer execute(CreateImageTransferCommand cmd, LibvirtComputingResource resource) {
|
||||
if (cmd.getDirection().equals("download")) {
|
||||
return handleDownload(cmd);
|
||||
} else {
|
||||
return handleUpload(cmd);
|
||||
final String transferId = cmd.getTransferId();
|
||||
ImageTransfer.Backend backend = cmd.getBackend();
|
||||
|
||||
if (StringUtils.isBlank(transferId)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "transferId is empty.");
|
||||
}
|
||||
|
||||
final Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("backend", backend.toString());
|
||||
|
||||
if (backend == ImageTransfer.Backend.file) {
|
||||
final String filePath = cmd.getFile();
|
||||
if (StringUtils.isBlank(filePath)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "file path is empty for file backend.");
|
||||
}
|
||||
payload.put("file", filePath);
|
||||
} else {
|
||||
String socket = cmd.getSocket();
|
||||
final String exportName = cmd.getExportName();
|
||||
if (StringUtils.isBlank(socket)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "Empty socket.");
|
||||
}
|
||||
if (StringUtils.isBlank(exportName)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "exportName is empty.");
|
||||
}
|
||||
payload.put("socket", "/tmp/imagetransfer/" + socket + ".sock");
|
||||
payload.put("export", exportName);
|
||||
String checkpointId = cmd.getCheckpointId();
|
||||
if (checkpointId != null) {
|
||||
payload.put("export_bitmap", exportName + "-" + checkpointId.substring(0, 4));
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
final String json = new GsonBuilder().create().toJson(payload);
|
||||
File dir = new File("/tmp/imagetransfer");
|
||||
if (!dir.exists()) {
|
||||
dir.mkdirs();
|
||||
}
|
||||
final File transferFile = new File("/tmp/imagetransfer", transferId);
|
||||
FileUtils.writeStringToFile(transferFile, json, "UTF-8");
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to prepare image transfer on KVM host", e);
|
||||
return new CreateImageTransferAnswer(cmd, false, "Failed to prepare image transfer on KVM host: " + e.getMessage());
|
||||
}
|
||||
|
||||
final int imageServerPort = 54323;
|
||||
startImageServerIfNotRunning(imageServerPort, resource);
|
||||
|
||||
final String transferUrl = String.format("http://%s:%d/images/%s", resource.getPrivateIp(), imageServerPort, transferId);
|
||||
return new CreateImageTransferAnswer(cmd, true, "Image transfer prepared on KVM host.", transferId, transferUrl);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,110 @@
|
|||
//Licensed to the Apache Software Foundation (ASF) under one
|
||||
//or more contributor license agreements. See the NOTICE file
|
||||
//distributed with this work for additional information
|
||||
//regarding copyright ownership. The ASF licenses this file
|
||||
//to you under the Apache License, Version 2.0 (the
|
||||
//"License"); you may not use this file except in compliance
|
||||
//the License. You may obtain a copy of the License at
|
||||
//
|
||||
//http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
//Unless required by applicable law or agreed to in writing,
|
||||
//software distributed under the License is distributed on an
|
||||
//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
//KIND, either express or implied. See the License for the
|
||||
//specific language governing permissions and limitations
|
||||
//under the License.
|
||||
|
||||
package com.cloud.hypervisor.kvm.resource.wrapper;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.cloudstack.backup.FinalizeImageTransferCommand;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
|
||||
import com.cloud.resource.CommandWrapper;
|
||||
import com.cloud.resource.ResourceWrapper;
|
||||
import com.cloud.utils.StringUtils;
|
||||
import com.cloud.utils.script.Script;
|
||||
|
||||
@ResourceWrapper(handles = FinalizeImageTransferCommand.class)
|
||||
public class LibvirtFinalizeImageTransferCommandWrapper extends CommandWrapper<FinalizeImageTransferCommand, Answer, LibvirtComputingResource> {
|
||||
protected Logger logger = LogManager.getLogger(getClass());
|
||||
private void resetService(String unitName) {
|
||||
Script resetScript = new Script("/bin/bash", logger);
|
||||
resetScript.add("-c");
|
||||
resetScript.add(String.format("systemctl reset-failed %s || true", unitName));
|
||||
resetScript.execute();
|
||||
}
|
||||
|
||||
private boolean stopImageServer() {
|
||||
String unitName = "cloudstack-image-server";
|
||||
final int imageServerPort = 54323;
|
||||
|
||||
Script checkScript = new Script("/bin/bash", logger);
|
||||
checkScript.add("-c");
|
||||
checkScript.add(String.format("systemctl is-active --quiet %s", unitName));
|
||||
String checkResult = checkScript.execute();
|
||||
if (checkResult != null) {
|
||||
logger.info(String.format("Image server not running, resetting failed state"));
|
||||
resetService(unitName);
|
||||
// Still try to remove firewall rule in case it exists
|
||||
removeFirewallRule(imageServerPort);
|
||||
return true;
|
||||
}
|
||||
|
||||
Script stopScript = new Script("/bin/bash", logger);
|
||||
stopScript.add("-c");
|
||||
stopScript.add(String.format("systemctl stop %s", unitName));
|
||||
stopScript.execute();
|
||||
resetService(unitName);
|
||||
logger.info(String.format("Image server %s stopped", unitName));
|
||||
|
||||
removeFirewallRule(imageServerPort);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void removeFirewallRule(int port) {
|
||||
String rule = String.format("-p tcp -m state --state NEW -m tcp --dport %d -j ACCEPT", port);
|
||||
Script removeScript = new Script("/bin/bash", logger);
|
||||
removeScript.add("-c");
|
||||
removeScript.add(String.format("iptables -D INPUT %s || true", rule));
|
||||
String result = removeScript.execute();
|
||||
if (result != null && !result.isEmpty() && !result.contains("iptables: Bad rule")) {
|
||||
logger.debug(String.format("Firewall rule removal result for port %d: %s", port, result));
|
||||
} else {
|
||||
logger.info(String.format("Firewall rule removed for port %d (or did not exist)", port));
|
||||
}
|
||||
}
|
||||
|
||||
public Answer execute(FinalizeImageTransferCommand cmd, LibvirtComputingResource resource) {
|
||||
final String transferId = cmd.getTransferId();
|
||||
if (StringUtils.isBlank(transferId)) {
|
||||
return new Answer(cmd, false, "transferId is empty.");
|
||||
}
|
||||
|
||||
final File transferFile = new File("/tmp/imagetransfer", transferId);
|
||||
if (transferFile.exists() && !transferFile.delete()) {
|
||||
return new Answer(cmd, false, "Failed to delete transfer config file: " + transferFile.getAbsolutePath());
|
||||
}
|
||||
|
||||
try (Stream<Path> stream = Files.list(Paths.get("/tmp/imagetransfer"))) {
|
||||
if (!stream.findAny().isPresent()) {
|
||||
stopImageServer();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to list /tmp/imagetransfer", e);
|
||||
}
|
||||
|
||||
return new Answer(cmd, true, "Image transfer finalized.");
|
||||
}
|
||||
}
|
||||
|
|
@ -39,7 +39,7 @@ public class LibvirtStartBackupCommandWrapper extends CommandWrapper<StartBackup
|
|||
@Override
|
||||
public Answer execute(StartBackupCommand cmd, LibvirtComputingResource resource) {
|
||||
if (cmd.isStoppedVM()) {
|
||||
return handleStoppedVmBackup(cmd, resource, cmd.getToCheckpointId());
|
||||
return handleStoppedVmBackup(cmd, cmd.getToCheckpointId());
|
||||
}
|
||||
return handleRunningVmBackup(cmd, resource);
|
||||
}
|
||||
|
|
@ -49,7 +49,7 @@ public class LibvirtStartBackupCommandWrapper extends CommandWrapper<StartBackup
|
|||
String toCheckpointId = cmd.getToCheckpointId();
|
||||
String fromCheckpointId = cmd.getFromCheckpointId();
|
||||
Long fromCheckpointCreateTime = cmd.getFromCheckpointCreateTime();
|
||||
int nbdPort = cmd.getNbdPort();
|
||||
String socket = cmd.getSocket();
|
||||
|
||||
try {
|
||||
if (StringUtils.isNotBlank(fromCheckpointId)) {
|
||||
|
|
@ -59,8 +59,13 @@ public class LibvirtStartBackupCommandWrapper extends CommandWrapper<StartBackup
|
|||
}
|
||||
}
|
||||
|
||||
File dir = new File("/tmp/imagetransfer");
|
||||
if (!dir.exists()) {
|
||||
dir.mkdirs();
|
||||
}
|
||||
|
||||
// Create backup XML
|
||||
String backupXml = createBackupXml(cmd, fromCheckpointId, nbdPort, resource);
|
||||
String backupXml = createBackupXml(cmd, fromCheckpointId, socket, resource);
|
||||
String checkpointXml = createCheckpointXml(toCheckpointId);
|
||||
|
||||
// Write XMLs to temp files
|
||||
|
|
@ -141,7 +146,7 @@ public class LibvirtStartBackupCommandWrapper extends CommandWrapper<StartBackup
|
|||
return xml.toString();
|
||||
}
|
||||
|
||||
private String createBackupXml(StartBackupCommand cmd, String fromCheckpointId, int nbdPort, LibvirtComputingResource resource) {
|
||||
private String createBackupXml(StartBackupCommand cmd, String fromCheckpointId, String socket, LibvirtComputingResource resource) {
|
||||
StringBuilder xml = new StringBuilder();
|
||||
xml.append("<domainbackup mode=\"pull\">\n");
|
||||
|
||||
|
|
@ -149,7 +154,8 @@ public class LibvirtStartBackupCommandWrapper extends CommandWrapper<StartBackup
|
|||
xml.append(" <incremental>").append(fromCheckpointId).append("</incremental>\n");
|
||||
}
|
||||
|
||||
xml.append(String.format(" <server transport=\"tcp\" name=\"%s\" port=\"%d\"/>\n", cmd.getHostIpAddress(), nbdPort));
|
||||
xml.append(String.format(" <server transport=\"unix\" socket=\"/tmp/imagetransfer/%s.sock\"/>\n", socket));
|
||||
|
||||
xml.append(" <disks>\n");
|
||||
|
||||
Map<String, String> diskPathUuidMap = cmd.getDiskPathUuidMap();
|
||||
|
|
@ -185,7 +191,7 @@ public class LibvirtStartBackupCommandWrapper extends CommandWrapper<StartBackup
|
|||
"</domaincheckpoint>";
|
||||
}
|
||||
|
||||
private Answer handleStoppedVmBackup(StartBackupCommand cmd, LibvirtComputingResource resource, String toCheckpointId) {
|
||||
private Answer handleStoppedVmBackup(StartBackupCommand cmd, String toCheckpointId) {
|
||||
String vmName = cmd.getVmName();
|
||||
Map<String, String> diskPathUuidMap = cmd.getDiskPathUuidMap();
|
||||
for (Map.Entry<String, String> entry : diskPathUuidMap.entrySet()) {
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package com.cloud.hypervisor.kvm.resource.wrapper;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import org.apache.cloudstack.backup.StartNBDServerAnswer;
|
||||
import org.apache.cloudstack.backup.StartNBDServerCommand;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
|
@ -26,6 +28,7 @@ import com.cloud.agent.api.Answer;
|
|||
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
|
||||
import com.cloud.resource.CommandWrapper;
|
||||
import com.cloud.resource.ResourceWrapper;
|
||||
import com.cloud.utils.StringUtils;
|
||||
import com.cloud.utils.script.Script;
|
||||
|
||||
@ResourceWrapper(handles = StartNBDServerCommand.class)
|
||||
|
|
@ -35,22 +38,25 @@ public class LibvirtStartNBDServerCommandWrapper extends CommandWrapper<StartNBD
|
|||
@Override
|
||||
public Answer execute(StartNBDServerCommand cmd, LibvirtComputingResource resource) {
|
||||
String volumePath = cmd.getVolumePath();
|
||||
int nbdPort = cmd.getNbdPort();
|
||||
String socket = cmd.getSocket();
|
||||
String hostIpAddress = cmd.getHostIpAddress();
|
||||
String exportName = cmd.getExportName();
|
||||
String transferId = cmd.getTransferId();
|
||||
|
||||
if (volumePath == null || volumePath.isEmpty()) {
|
||||
return new StartNBDServerAnswer(cmd, false, "Volume path is required for upload");
|
||||
if (StringUtils.isBlank(volumePath)) {
|
||||
return new StartNBDServerAnswer(cmd, false, "Volume path is required for the nbd server");
|
||||
}
|
||||
if (exportName == null || exportName.isEmpty()) {
|
||||
return new StartNBDServerAnswer(cmd, false, "Export name is required for upload");
|
||||
if (StringUtils.isBlank(exportName)) {
|
||||
return new StartNBDServerAnswer(cmd, false, "Export name is required for the nbd server");
|
||||
}
|
||||
if (hostIpAddress == null || hostIpAddress.isEmpty()) {
|
||||
return new StartNBDServerAnswer(cmd, false, "Host IP address is required for upload");
|
||||
if (StringUtils.isBlank(hostIpAddress)) {
|
||||
return new StartNBDServerAnswer(cmd, false, "Host IP address is required for the nbd server");
|
||||
}
|
||||
if (StringUtils.isBlank(socket)) {
|
||||
return new StartNBDServerAnswer(cmd, false, "Socket is required for the nbd server");
|
||||
}
|
||||
|
||||
String unitName = String.format("qemu-nbd-%d", nbdPort);
|
||||
String unitName = "qemu-nbd-" + transferId.hashCode();
|
||||
|
||||
Script checkScript = new Script("/bin/bash", logger);
|
||||
checkScript.add("-c");
|
||||
|
|
@ -60,17 +66,22 @@ public class LibvirtStartNBDServerCommandWrapper extends CommandWrapper<StartNBD
|
|||
return new StartNBDServerAnswer(cmd, false, "A qemu-nbd service is already running on the port.");
|
||||
}
|
||||
|
||||
File dir = new File("/tmp/imagetransfer");
|
||||
if (!dir.exists()) {
|
||||
dir.mkdirs();
|
||||
}
|
||||
|
||||
String socketName = "/tmp/imagetransfer/" + socket + ".sock";
|
||||
String systemdRunCmd = String.format(
|
||||
"systemd-run --unit=%s --property=Restart=no " +
|
||||
"qemu-nbd --export-name %s --bind %s --port %d --persistent %s %s",
|
||||
"systemd-run --unit=%s --property=Restart=no qemu-nbd --export-name %s --socket %s --persistent %s %s",
|
||||
unitName,
|
||||
exportName,
|
||||
hostIpAddress,
|
||||
nbdPort,
|
||||
socketName,
|
||||
cmd.getDirection().equals("download") ? "--read-only" : "",
|
||||
volumePath
|
||||
);
|
||||
|
||||
|
||||
Script startScript = new Script("/bin/bash", logger);
|
||||
startScript.add("-c");
|
||||
startScript.add(systemdRunCmd);
|
||||
|
|
@ -111,7 +122,7 @@ public class LibvirtStartNBDServerCommandWrapper extends CommandWrapper<StartNBD
|
|||
String.format("qemu-nbd service failed to start within %d seconds", maxWaitSeconds));
|
||||
}
|
||||
|
||||
String transferUrl = String.format("nbd://%s:%d/%s", hostIpAddress, nbdPort, exportName);
|
||||
String transferUrl = String.format("nbd+unix:///%s", cmd.getSocket());
|
||||
return new StartNBDServerAnswer(cmd, true, "qemu-nbd service started for upload",
|
||||
transferId, transferUrl);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,8 +40,7 @@ public class LibvirtStopNBDServerCommandWrapper extends CommandWrapper<StopNBDSe
|
|||
|
||||
private Answer handleUpload(StopNBDServerCommand cmd) {
|
||||
try {
|
||||
int nbdPort = cmd.getNbdPort();
|
||||
String unitName = String.format("qemu-nbd-%d", nbdPort);
|
||||
String unitName = "qemu-nbd-" + cmd.getTransferId().hashCode();
|
||||
|
||||
// Check if the service is running
|
||||
Script checkScript = new Script("/bin/bash", logger);
|
||||
|
|
@ -81,6 +80,5 @@ public class LibvirtStopNBDServerCommandWrapper extends CommandWrapper<StopNBDSe
|
|||
} else {
|
||||
return handleUpload(cmd);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -23,7 +23,6 @@ import java.util.Date;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.UUID;
|
||||
|
|
@ -108,15 +107,8 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
@Inject
|
||||
private PrimaryDataStoreDao primaryDataStoreDao;
|
||||
|
||||
@Inject
|
||||
EndPointSelector _epSelector;
|
||||
|
||||
private Timer imageTransferTimer;
|
||||
|
||||
private static final int NBD_PORT_RANGE_START = 10809;
|
||||
private static final int NBD_PORT_RANGE_END = 10909;
|
||||
private static final boolean DATAPLANE_PROXY_MODE = true;
|
||||
|
||||
private boolean isDummyOffering(Long backupOfferingId) {
|
||||
if (backupOfferingId == null) {
|
||||
throw new CloudRuntimeException("VM not assigned a backup offering");
|
||||
|
|
@ -174,9 +166,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
backup.setToCheckpointId(toCheckpointId);
|
||||
backup.setFromCheckpointId(fromCheckpointId);
|
||||
|
||||
int nbdPort = allocateNbdPort();
|
||||
Long hostId = vm.getHostId() != null ? vm.getHostId() : vm.getLastHostId();
|
||||
backup.setNbdPort(nbdPort);
|
||||
backup.setHostId(hostId);
|
||||
// Will be changed later if incremental was done
|
||||
backup.setType("FULL");
|
||||
|
|
@ -206,9 +196,8 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
backup.getToCheckpointId(),
|
||||
backup.getFromCheckpointId(),
|
||||
vm.getActiveCheckpointCreateTime(),
|
||||
backup.getNbdPort(),
|
||||
backup.getUuid(),
|
||||
diskPathUuidMap,
|
||||
host.getPrivateIpAddress(),
|
||||
vm.getState() == State.Stopped
|
||||
);
|
||||
|
||||
|
|
@ -334,29 +323,26 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
}
|
||||
|
||||
String transferId = UUID.randomUUID().toString();
|
||||
Host host = hostDao.findById(backup.getHostId());
|
||||
|
||||
String socket = backup.getUuid();
|
||||
VMInstanceVO vm = vmInstanceDao.findById(backup.getVmId());
|
||||
if (vm.getState() == State.Stopped) {
|
||||
String volumePath = getVolumePathForFileBasedBackend(volume);
|
||||
startNBDServer(transferId, direction, host, volume.getUuid(), volumePath, backup.getNbdPort());
|
||||
startNBDServer(transferId, direction, backup.getHostId(), volume.getUuid(), volumePath);
|
||||
socket = transferId;
|
||||
}
|
||||
|
||||
CreateImageTransferCommand transferCmd = new CreateImageTransferCommand(
|
||||
transferId,
|
||||
host.getPrivateIpAddress(),
|
||||
direction,
|
||||
volume.getUuid(),
|
||||
backup.getNbdPort(),
|
||||
socket,
|
||||
backup.getFromCheckpointId());
|
||||
|
||||
try {
|
||||
CreateImageTransferAnswer answer;
|
||||
if (dummyOffering) {
|
||||
answer = new CreateImageTransferAnswer(transferCmd, true, "Dummy answer", "image-transfer-id", "nbd://127.0.0.1:10809/vda");
|
||||
} else if (DATAPLANE_PROXY_MODE) {
|
||||
EndPoint ssvm = _epSelector.findSsvm(backup.getZoneId());
|
||||
answer = (CreateImageTransferAnswer) ssvm.sendMessage(transferCmd);
|
||||
} else {
|
||||
answer = (CreateImageTransferAnswer) agentManager.send(backup.getHostId(), transferCmd);
|
||||
}
|
||||
|
|
@ -370,7 +356,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
backupId,
|
||||
volume.getId(),
|
||||
backup.getHostId(),
|
||||
backup.getNbdPort(),
|
||||
socket,
|
||||
ImageTransferVO.Phase.transferring,
|
||||
ImageTransfer.Direction.download,
|
||||
backup.getAccountId(),
|
||||
|
|
@ -398,18 +384,17 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
return hosts.get(0);
|
||||
}
|
||||
|
||||
private void startNBDServer(String transferId, String direction, Host host, String exportName, String volumePath, int nbdPort) {
|
||||
private void startNBDServer(String transferId, String direction, Long hostId, String exportName, String volumePath) {
|
||||
StartNBDServerAnswer nbdServerAnswer;
|
||||
StartNBDServerCommand nbdServerCmd = new StartNBDServerCommand(
|
||||
transferId,
|
||||
host.getPrivateIpAddress(),
|
||||
exportName,
|
||||
volumePath,
|
||||
nbdPort,
|
||||
transferId,
|
||||
direction
|
||||
);
|
||||
try {
|
||||
nbdServerAnswer = (StartNBDServerAnswer) agentManager.send(host.getId(), nbdServerCmd);
|
||||
nbdServerAnswer = (StartNBDServerAnswer) agentManager.send(hostId, nbdServerCmd);
|
||||
} catch (AgentUnavailableException | OperationTimedoutException e) {
|
||||
throw new CloudRuntimeException("Failed to communicate with agent", e);
|
||||
}
|
||||
|
|
@ -451,19 +436,18 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
|
||||
transferCmd = new CreateImageTransferCommand(
|
||||
transferId,
|
||||
host.getPrivateIpAddress(),
|
||||
direction,
|
||||
transferId,
|
||||
volumePath);
|
||||
|
||||
} else {
|
||||
int nbdPort = allocateNbdPort();
|
||||
startNBDServer(transferId, direction, host, volume.getUuid(), volumePath, nbdPort);
|
||||
startNBDServer(transferId, direction, host.getId(), volume.getUuid(), volumePath);
|
||||
imageTransfer = new ImageTransferVO(
|
||||
transferId,
|
||||
null,
|
||||
volume.getId(),
|
||||
host.getId(),
|
||||
nbdPort,
|
||||
transferId,
|
||||
ImageTransferVO.Phase.transferring,
|
||||
ImageTransfer.Direction.upload,
|
||||
volume.getAccountId(),
|
||||
|
|
@ -472,16 +456,17 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
|
||||
transferCmd = new CreateImageTransferCommand(
|
||||
transferId,
|
||||
host.getPrivateIpAddress(),
|
||||
direction,
|
||||
volume.getUuid(),
|
||||
nbdPort,
|
||||
transferId,
|
||||
null);
|
||||
}
|
||||
|
||||
|
||||
EndPoint ssvm = _epSelector.findSsvm(volume.getDataCenterId());
|
||||
CreateImageTransferAnswer transferAnswer = (CreateImageTransferAnswer) ssvm.sendMessage(transferCmd);
|
||||
CreateImageTransferAnswer transferAnswer;
|
||||
try {
|
||||
transferAnswer = (CreateImageTransferAnswer) agentManager.send(imageTransfer.getHostId(), transferCmd);
|
||||
} catch (AgentUnavailableException | OperationTimedoutException e) {
|
||||
throw new CloudRuntimeException("Failed to communicate with agent", e);
|
||||
}
|
||||
|
||||
if (!transferAnswer.getResult()) {
|
||||
if (!backend.equals(ImageTransfer.Backend.file)) {
|
||||
|
|
@ -554,32 +539,27 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
private void finalizeDownloadImageTransfer(ImageTransferVO imageTransfer) {
|
||||
|
||||
String transferId = imageTransfer.getUuid();
|
||||
int nbdPort = imageTransfer.getNbdPort();
|
||||
String direction = imageTransfer.getDirection().toString();
|
||||
FinalizeImageTransferCommand finalizeCmd = new FinalizeImageTransferCommand(transferId, direction, nbdPort);
|
||||
FinalizeImageTransferCommand finalizeCmd = new FinalizeImageTransferCommand(transferId);
|
||||
|
||||
BackupVO backup = backupDao.findById(imageTransfer.getBackupId());
|
||||
boolean dummyOffering = isDummyOffering(backup.getBackupOfferingId());
|
||||
|
||||
Answer answer;
|
||||
try {
|
||||
Answer answer;
|
||||
if (dummyOffering) {
|
||||
answer = new Answer(finalizeCmd, true, "Image transfer finalized.");
|
||||
} else if (DATAPLANE_PROXY_MODE) {
|
||||
EndPoint ssvm = _epSelector.findSsvm(backup.getZoneId());
|
||||
answer = ssvm.sendMessage(finalizeCmd);
|
||||
} else {
|
||||
answer = agentManager.send(backup.getHostId(), finalizeCmd);
|
||||
}
|
||||
|
||||
if (!answer.getResult()) {
|
||||
throw new CloudRuntimeException("Failed to finalize image transfer: " + answer.getDetails());
|
||||
}
|
||||
|
||||
} catch (AgentUnavailableException | OperationTimedoutException e) {
|
||||
throw new CloudRuntimeException("Failed to communicate with agent", e);
|
||||
}
|
||||
|
||||
if (!answer.getResult()) {
|
||||
throw new CloudRuntimeException("Failed to finalize image transfer: " + answer.getDetails());
|
||||
}
|
||||
|
||||
VMInstanceVO vm = vmInstanceDao.findById(backup.getVmId());
|
||||
if (vm.getState() == State.Stopped) {
|
||||
boolean stopNbdServerResult = stopNbdServer(imageTransfer);
|
||||
|
|
@ -591,9 +571,8 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
|
||||
private boolean stopNbdServer(ImageTransferVO imageTransfer) {
|
||||
String transferId = imageTransfer.getUuid();
|
||||
int nbdPort = imageTransfer.getNbdPort();
|
||||
String direction = imageTransfer.getDirection().toString();
|
||||
StopNBDServerCommand stopNbdServerCommand = new StopNBDServerCommand(transferId, direction, nbdPort);
|
||||
StopNBDServerCommand stopNbdServerCommand = new StopNBDServerCommand(transferId, direction);
|
||||
Answer answer;
|
||||
try {
|
||||
answer = agentManager.send(imageTransfer.getHostId(), stopNbdServerCommand);
|
||||
|
|
@ -606,17 +585,19 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
|
||||
private void finalizeUploadImageTransfer(ImageTransferVO imageTransfer) {
|
||||
String transferId = imageTransfer.getUuid();
|
||||
int nbdPort = imageTransfer.getNbdPort();
|
||||
String direction = imageTransfer.getDirection().toString();
|
||||
|
||||
boolean stopNbdServerResult = stopNbdServer(imageTransfer);
|
||||
if (!stopNbdServerResult) {
|
||||
throw new CloudRuntimeException("Failed to stop the nbd server");
|
||||
}
|
||||
|
||||
FinalizeImageTransferCommand finalizeCmd = new FinalizeImageTransferCommand(transferId, direction, nbdPort);
|
||||
EndPoint ssvm = _epSelector.findSsvm(imageTransfer.getDataCenterId());
|
||||
Answer answer = ssvm.sendMessage(finalizeCmd);
|
||||
FinalizeImageTransferCommand finalizeCmd = new FinalizeImageTransferCommand(transferId);
|
||||
Answer answer;
|
||||
try {
|
||||
answer = agentManager.send(imageTransfer.getHostId(), finalizeCmd);
|
||||
} catch (AgentUnavailableException | OperationTimedoutException e) {
|
||||
throw new CloudRuntimeException("Failed to communicate with agent", e);
|
||||
}
|
||||
|
||||
if (!answer.getResult()) {
|
||||
throw new CloudRuntimeException("Failed to finalize image transfer: " + answer.getDetails());
|
||||
|
|
@ -717,19 +698,6 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme
|
|||
return cmdList;
|
||||
}
|
||||
|
||||
private int getRandomNbdPort() {
|
||||
Random random = new Random();
|
||||
return NBD_PORT_RANGE_START + random.nextInt(NBD_PORT_RANGE_END - NBD_PORT_RANGE_START);
|
||||
}
|
||||
|
||||
private int allocateNbdPort() {
|
||||
int port = getRandomNbdPort();
|
||||
while (imageTransferDao.findByNbdPort(port) != null) {
|
||||
port = getRandomNbdPort();
|
||||
}
|
||||
return port;
|
||||
}
|
||||
|
||||
private ImageTransferResponse toImageTransferResponse(ImageTransferVO imageTransferVO) {
|
||||
ImageTransferResponse response = new ImageTransferResponse();
|
||||
response.setId(imageTransferVO.getUuid());
|
||||
|
|
|
|||
|
|
@ -54,10 +54,6 @@ import java.util.stream.Stream;
|
|||
|
||||
import javax.naming.ConfigurationException;
|
||||
|
||||
import org.apache.cloudstack.backup.CreateImageTransferAnswer;
|
||||
import org.apache.cloudstack.backup.CreateImageTransferCommand;
|
||||
import org.apache.cloudstack.backup.FinalizeImageTransferCommand;
|
||||
import org.apache.cloudstack.backup.ImageTransfer;
|
||||
import org.apache.cloudstack.framework.security.keystore.KeystoreManager;
|
||||
import org.apache.cloudstack.storage.NfsMountManagerImpl.PathParser;
|
||||
import org.apache.cloudstack.storage.command.CopyCmdAnswer;
|
||||
|
|
@ -342,10 +338,6 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
return execute((ListDataStoreObjectsCommand)cmd);
|
||||
} else if (cmd instanceof QuerySnapshotZoneCopyCommand) {
|
||||
return execute((QuerySnapshotZoneCopyCommand)cmd);
|
||||
} else if (cmd instanceof CreateImageTransferCommand) {
|
||||
return execute((CreateImageTransferCommand)cmd);
|
||||
} else if (cmd instanceof FinalizeImageTransferCommand) {
|
||||
return execute((FinalizeImageTransferCommand)cmd);
|
||||
} else {
|
||||
return Answer.createUnsupportedCommandAnswer(cmd);
|
||||
}
|
||||
|
|
@ -3716,212 +3708,4 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
return new QuerySnapshotZoneCopyAnswer(cmd, files);
|
||||
}
|
||||
|
||||
private void resetService(String unitName) {
|
||||
Script resetScript = new Script("/bin/bash", logger);
|
||||
resetScript.add("-c");
|
||||
resetScript.add(String.format("systemctl reset-failed %s || true", unitName));
|
||||
resetScript.execute();
|
||||
}
|
||||
|
||||
private boolean stopImageServer() {
|
||||
String unitName = "cloudstack-image-server";
|
||||
final int imageServerPort = 54323;
|
||||
|
||||
Script checkScript = new Script("/bin/bash", logger);
|
||||
checkScript.add("-c");
|
||||
checkScript.add(String.format("systemctl is-active --quiet %s", unitName));
|
||||
String checkResult = checkScript.execute();
|
||||
if (checkResult != null) {
|
||||
logger.info(String.format("Image server not running, resetting failed state"));
|
||||
resetService(unitName);
|
||||
// Still try to remove firewall rule in case it exists
|
||||
if (_inSystemVM) {
|
||||
removeFirewallRule(imageServerPort);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
Script stopScript = new Script("/bin/bash", logger);
|
||||
stopScript.add("-c");
|
||||
stopScript.add(String.format("systemctl stop %s", unitName));
|
||||
stopScript.execute();
|
||||
resetService(unitName);
|
||||
logger.info(String.format("Image server %s stopped", unitName));
|
||||
|
||||
// Close firewall port for image server
|
||||
if (_inSystemVM) {
|
||||
removeFirewallRule(imageServerPort);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void removeFirewallRule(int port) {
|
||||
String rule = String.format("-p tcp -m state --state NEW -m tcp --dport %d -j ACCEPT", port);
|
||||
Script removeScript = new Script("/bin/bash", logger);
|
||||
removeScript.add("-c");
|
||||
removeScript.add(String.format("iptables -D INPUT %s || true", rule));
|
||||
String result = removeScript.execute();
|
||||
if (result != null && !result.isEmpty() && !result.contains("iptables: Bad rule")) {
|
||||
logger.debug(String.format("Firewall rule removal result for port %d: %s", port, result));
|
||||
} else {
|
||||
logger.info(String.format("Firewall rule removed for port %d (or did not exist)", port));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean startImageServerIfNotRunning(int imageServerPort) {
|
||||
final String imageServerScript = "/opt/cloud/bin/image_server.py";
|
||||
String unitName = "cloudstack-image-server";
|
||||
|
||||
Script checkScript = new Script("/bin/bash", logger);
|
||||
checkScript.add("-c");
|
||||
checkScript.add(String.format("systemctl is-active --quiet %s", unitName));
|
||||
String checkResult = checkScript.execute();
|
||||
if (checkResult == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
String systemdRunCmd = String.format(
|
||||
"systemd-run --unit=%s --property=Restart=no /usr/bin/python3 %s --listen 0.0.0.0 --port %d",
|
||||
unitName, imageServerScript, imageServerPort);
|
||||
|
||||
Script startScript = new Script("/bin/bash", logger);
|
||||
startScript.add("-c");
|
||||
startScript.add(systemdRunCmd);
|
||||
String startResult = startScript.execute();
|
||||
|
||||
if (startResult != null) {
|
||||
logger.error(String.format("Failed to start the Image serer: %s", startResult));
|
||||
return false;
|
||||
}
|
||||
|
||||
// Wait with timeout until the service is up
|
||||
int maxWaitSeconds = 10;
|
||||
int pollIntervalMs = 1000;
|
||||
int maxAttempts = (maxWaitSeconds * 1000) / pollIntervalMs;
|
||||
boolean serviceActive = false;
|
||||
|
||||
for (int attempt = 0; attempt < maxAttempts; attempt++) {
|
||||
Script verifyScript = new Script("/bin/bash", logger);
|
||||
verifyScript.add("-c");
|
||||
verifyScript.add(String.format("systemctl is-active --quiet %s", unitName));
|
||||
String verifyResult = verifyScript.execute();
|
||||
if (verifyResult == null) {
|
||||
serviceActive = true;
|
||||
logger.info(String.format("Image server is now active (attempt %d)", attempt + 1));
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pollIntervalMs);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!serviceActive) {
|
||||
logger.error(String.format("Image server failed to start within %d seconds", maxWaitSeconds));
|
||||
return false;
|
||||
}
|
||||
|
||||
// Open firewall port for image server
|
||||
if (_inSystemVM) {
|
||||
String rule = String.format("-p tcp -m state --state NEW -m tcp --dport %d -j ACCEPT", imageServerPort);
|
||||
IpTablesHelper.addConditionally(IpTablesHelper.INPUT_CHAIN, false, rule,
|
||||
String.format("Error in opening up image server port %d", imageServerPort));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
protected Answer execute(CreateImageTransferCommand cmd) {
|
||||
if (!_inSystemVM) {
|
||||
return new CreateImageTransferAnswer(cmd, true, "Not running inside SSVM; skipping image transfer setup.");
|
||||
}
|
||||
|
||||
final String transferId = cmd.getTransferId();
|
||||
final String hostIp = cmd.getHostIpAddress();
|
||||
final ImageTransfer.Backend backend = cmd.getBackend();
|
||||
|
||||
if (StringUtils.isBlank(transferId)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "transferId is empty.");
|
||||
}
|
||||
if (StringUtils.isBlank(hostIp)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "hostIpAddress is empty.");
|
||||
}
|
||||
|
||||
final Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("backend", backend.toString());
|
||||
|
||||
if (backend == ImageTransfer.Backend.file) {
|
||||
final String filePath = cmd.getFile();
|
||||
if (StringUtils.isBlank(filePath)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "file path is empty for file backend.");
|
||||
}
|
||||
payload.put("file", filePath);
|
||||
} else {
|
||||
final String exportName = cmd.getExportName();
|
||||
final int nbdPort = cmd.getNbdPort();
|
||||
if (StringUtils.isBlank(exportName)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "exportName is empty.");
|
||||
}
|
||||
if (nbdPort <= 0) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "Invalid nbdPort: " + nbdPort);
|
||||
}
|
||||
payload.put("host", hostIp);
|
||||
payload.put("port", nbdPort);
|
||||
payload.put("export", exportName);
|
||||
String checkpointId = cmd.getCheckpointId();
|
||||
if (checkpointId != null) {
|
||||
payload.put("export_bitmap", exportName + "-" + checkpointId.substring(0, 4));
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
final String json = new GsonBuilder().create().toJson(payload);
|
||||
File dir = new File("/tmp/imagetransfer");
|
||||
if (!dir.exists()) {
|
||||
dir.mkdirs();
|
||||
}
|
||||
final File transferFile = new File("/tmp/imagetransfer", transferId);
|
||||
FileUtils.writeStringToFile(transferFile, json, "UTF-8");
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to prepare image transfer on SSVM", e);
|
||||
return new CreateImageTransferAnswer(cmd, false, "Failed to prepare image transfer on SSVM: " + e.getMessage());
|
||||
}
|
||||
|
||||
final int imageServerPort = 54323;
|
||||
startImageServerIfNotRunning(imageServerPort);
|
||||
|
||||
final String transferUrl = String.format("http://%s:%d/images/%s", _publicIp, imageServerPort, transferId);
|
||||
return new CreateImageTransferAnswer(cmd, true, "Image transfer prepared on SSVM.", transferId, transferUrl);
|
||||
}
|
||||
|
||||
protected Answer execute(FinalizeImageTransferCommand cmd) {
|
||||
if (!_inSystemVM) {
|
||||
return new Answer(cmd, true, "Not running inside SSVM; skipping image transfer finalization.");
|
||||
}
|
||||
|
||||
final String transferId = cmd.getTransferId();
|
||||
if (StringUtils.isBlank(transferId)) {
|
||||
return new Answer(cmd, false, "transferId is empty.");
|
||||
}
|
||||
|
||||
final File transferFile = new File("/tmp/imagetransfer", transferId);
|
||||
if (transferFile.exists() && !transferFile.delete()) {
|
||||
return new Answer(cmd, false, "Failed to delete transfer config file: " + transferFile.getAbsolutePath());
|
||||
}
|
||||
|
||||
try (Stream<Path> stream = Files.list(Paths.get("/tmp/imagetransfer"))) {
|
||||
if (!stream.findAny().isPresent()) {
|
||||
stopImageServer();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to list /tmp/imagetransfer", e);
|
||||
}
|
||||
|
||||
return new Answer(cmd, true, "Image transfer finalized.");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue