diff --git a/api/src/main/java/org/apache/cloudstack/backup/Backup.java b/api/src/main/java/org/apache/cloudstack/backup/Backup.java index bc464beeb6d..42afc7f196c 100644 --- a/api/src/main/java/org/apache/cloudstack/backup/Backup.java +++ b/api/src/main/java/org/apache/cloudstack/backup/Backup.java @@ -38,8 +38,6 @@ public interface Backup extends ControlledEntity, InternalIdentity, Identity { Long getHostId(); - Integer getNbdPort(); - enum Status { Allocated, Queued, BackingUp, ReadyForTransfer, FinalizingTransfer, BackedUp, Error, Failed, Restoring, Removed, Expunged } diff --git a/api/src/main/java/org/apache/cloudstack/backup/ImageTransfer.java b/api/src/main/java/org/apache/cloudstack/backup/ImageTransfer.java index cf09749bcfc..f7fe1e9c2bb 100644 --- a/api/src/main/java/org/apache/cloudstack/backup/ImageTransfer.java +++ b/api/src/main/java/org/apache/cloudstack/backup/ImageTransfer.java @@ -49,8 +49,6 @@ public interface ImageTransfer extends ControlledEntity, InternalIdentity { long getHostId(); - int getNbdPort(); - String getTransferUrl(); Phase getPhase(); diff --git a/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java b/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java index 4fb8743b625..3e042bf4249 100644 --- a/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java +++ b/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java @@ -21,9 +21,8 @@ import com.cloud.agent.api.Command; public class CreateImageTransferCommand extends Command { private String transferId; - private String hostIpAddress; private String exportName; - private int nbdPort; + private String socket; private String direction; private String checkpointId; private String file; @@ -32,22 +31,21 @@ public class CreateImageTransferCommand extends Command { public CreateImageTransferCommand() { } - private CreateImageTransferCommand(String transferId, String hostIpAddress, String 
direction) { + private CreateImageTransferCommand(String transferId, String direction, String socket) { this.transferId = transferId; - this.hostIpAddress = hostIpAddress; this.direction = direction; + this.socket = socket; } - public CreateImageTransferCommand(String transferId, String hostIpAddress, String direction, String exportName, int nbdPort, String checkpointId) { - this(transferId, hostIpAddress, direction); + public CreateImageTransferCommand(String transferId, String direction, String exportName, String socket, String checkpointId) { + this(transferId, direction, socket); this.backend = ImageTransfer.Backend.nbd; this.exportName = exportName; - this.nbdPort = nbdPort; this.checkpointId = checkpointId; } - public CreateImageTransferCommand(String transferId, String hostIpAddress, String direction, String file) { - this(transferId, hostIpAddress, direction); + public CreateImageTransferCommand(String transferId, String direction, String socket, String file) { + this(transferId, direction, socket); if (ImageTransfer.Direction.download.toString().equals(direction)) { throw new IllegalArgumentException("File backend is only supported for upload"); } @@ -59,8 +57,8 @@ public class CreateImageTransferCommand extends Command { return exportName; } - public int getNbdPort() { - return nbdPort; + public String getSocket() { + return socket; } public String getFile() { @@ -71,10 +69,6 @@ public class CreateImageTransferCommand extends Command { return backend; } - public String getHostIpAddress() { - return hostIpAddress; - } - public String getTransferId() { return transferId; } diff --git a/core/src/main/java/org/apache/cloudstack/backup/FinalizeImageTransferCommand.java b/core/src/main/java/org/apache/cloudstack/backup/FinalizeImageTransferCommand.java index f1a0285ef6e..84d9b1ff818 100644 --- a/core/src/main/java/org/apache/cloudstack/backup/FinalizeImageTransferCommand.java +++ b/core/src/main/java/org/apache/cloudstack/backup/FinalizeImageTransferCommand.java @@
-21,30 +21,18 @@ import com.cloud.agent.api.Command; public class FinalizeImageTransferCommand extends Command { private String transferId; - private String direction; - private int nbdPort; public FinalizeImageTransferCommand() { } - public FinalizeImageTransferCommand(String transferId, String direction, int nbdPort) { + public FinalizeImageTransferCommand(String transferId) { this.transferId = transferId; - this.direction = direction; - this.nbdPort = nbdPort; } public String getTransferId() { return transferId; } - public int getNbdPort() { - return nbdPort; - } - - public String getDirection() { - return direction; - } - @Override public boolean executeInSequence() { return true; diff --git a/core/src/main/java/org/apache/cloudstack/backup/StartBackupCommand.java b/core/src/main/java/org/apache/cloudstack/backup/StartBackupCommand.java index b43c4661843..0fc7d4e26b3 100644 --- a/core/src/main/java/org/apache/cloudstack/backup/StartBackupCommand.java +++ b/core/src/main/java/org/apache/cloudstack/backup/StartBackupCommand.java @@ -26,23 +26,21 @@ public class StartBackupCommand extends Command { private String toCheckpointId; private String fromCheckpointId; private Long fromCheckpointCreateTime; - private int nbdPort; + private String socket; private Map diskPathUuidMap; - private String hostIpAddress; private boolean stoppedVM; public StartBackupCommand() { } public StartBackupCommand(String vmName, String toCheckpointId, String fromCheckpointId, Long fromCheckpointCreateTime, - int nbdPort, Map diskPathUuidMap, String hostIpAddress, boolean stoppedVM) { + String socket, Map diskPathUuidMap, boolean stoppedVM) { this.vmName = vmName; this.toCheckpointId = toCheckpointId; this.fromCheckpointId = fromCheckpointId; this.fromCheckpointCreateTime = fromCheckpointCreateTime; - this.nbdPort = nbdPort; + this.socket = socket; this.diskPathUuidMap = diskPathUuidMap; - this.hostIpAddress = hostIpAddress; this.stoppedVM = stoppedVM; } @@ -62,8 +60,8 @@ public class 
StartBackupCommand extends Command { return fromCheckpointCreateTime; } - public int getNbdPort() { - return nbdPort; + public String getSocket() { + return socket; } public Map getDiskPathUuidMap() { @@ -74,10 +72,6 @@ public class StartBackupCommand extends Command { return fromCheckpointId != null && !fromCheckpointId.isEmpty(); } - public String getHostIpAddress() { - return hostIpAddress; - } - public boolean isStoppedVM() { return stoppedVM; } diff --git a/core/src/main/java/org/apache/cloudstack/backup/StartNBDServerCommand.java b/core/src/main/java/org/apache/cloudstack/backup/StartNBDServerCommand.java index 887937ffb4c..b0e452df33c 100644 --- a/core/src/main/java/org/apache/cloudstack/backup/StartNBDServerCommand.java +++ b/core/src/main/java/org/apache/cloudstack/backup/StartNBDServerCommand.java @@ -24,27 +24,31 @@ public class StartNBDServerCommand extends Command { private String hostIpAddress; private String exportName; private String volumePath; - private int nbdPort; + private String socket; private String direction; public StartNBDServerCommand() { } - public StartNBDServerCommand(String transferId, String hostIpAddress, String exportName, String volumePath, int nbdPort, String direction) { + protected StartNBDServerCommand(String transferId, String hostIpAddress, String exportName, String volumePath, String direction) { this.transferId = transferId; this.hostIpAddress = hostIpAddress; this.exportName = exportName; this.volumePath = volumePath; - this.nbdPort = nbdPort; this.direction = direction; } + public StartNBDServerCommand(String transferId, String hostIpAddress, String exportName, String volumePath, String socket, String direction) { + this(transferId, hostIpAddress, exportName, volumePath, direction); + this.socket = socket; + } + public String getExportName() { return exportName; } - public int getNbdPort() { - return nbdPort; + public String getSocket() { + return socket; } public String getHostIpAddress() { diff --git 
a/core/src/main/java/org/apache/cloudstack/backup/StopNBDServerCommand.java b/core/src/main/java/org/apache/cloudstack/backup/StopNBDServerCommand.java index 4f2b6401480..d75168a22eb 100644 --- a/core/src/main/java/org/apache/cloudstack/backup/StopNBDServerCommand.java +++ b/core/src/main/java/org/apache/cloudstack/backup/StopNBDServerCommand.java @@ -22,25 +22,19 @@ import com.cloud.agent.api.Command; public class StopNBDServerCommand extends Command { private String transferId; private String direction; - private int nbdPort; public StopNBDServerCommand() { } - public StopNBDServerCommand(String transferId, String direction, int nbdPort) { + public StopNBDServerCommand(String transferId, String direction) { this.transferId = transferId; this.direction = direction; - this.nbdPort = nbdPort; } public String getTransferId() { return transferId; } - public int getNbdPort() { - return nbdPort; - } - public String getDirection() { return direction; } diff --git a/engine/schema/src/main/java/org/apache/cloudstack/backup/BackupVO.java b/engine/schema/src/main/java/org/apache/cloudstack/backup/BackupVO.java index 4705cd0159b..d589f9e6bef 100644 --- a/engine/schema/src/main/java/org/apache/cloudstack/backup/BackupVO.java +++ b/engine/schema/src/main/java/org/apache/cloudstack/backup/BackupVO.java @@ -115,9 +115,6 @@ public class BackupVO implements Backup { @Column(name = "host_id") private Long hostId; - @Column(name = "nbd_port") - private Integer nbdPort; - @Transient Map details; @@ -339,13 +336,4 @@ public class BackupVO implements Backup { public void setHostId(Long hostId) { this.hostId = hostId; } - - @Override - public Integer getNbdPort() { - return nbdPort; - } - - public void setNbdPort(Integer nbdPort) { - this.nbdPort = nbdPort; - } } diff --git a/engine/schema/src/main/java/org/apache/cloudstack/backup/ImageTransferVO.java b/engine/schema/src/main/java/org/apache/cloudstack/backup/ImageTransferVO.java index 6562ba74a77..c391eae2e86 100644 --- 
a/engine/schema/src/main/java/org/apache/cloudstack/backup/ImageTransferVO.java +++ b/engine/schema/src/main/java/org/apache/cloudstack/backup/ImageTransferVO.java @@ -51,8 +51,8 @@ public class ImageTransferVO implements ImageTransfer { @Column(name = "host_id") private long hostId; - @Column(name = "nbd_port") - private int nbdPort; + @Column(name = "socket") + private String socket; @Column(name = "file") private String file; @@ -114,10 +114,10 @@ public class ImageTransferVO implements ImageTransfer { this.created = new Date(); } - public ImageTransferVO(String uuid, Long backupId, long diskId, long hostId, int nbdPort, Phase phase, Direction direction, Long accountId, Long domainId, Long dataCenterId) { + public ImageTransferVO(String uuid, Long backupId, long diskId, long hostId, String socket, Phase phase, Direction direction, Long accountId, Long domainId, Long dataCenterId) { this(uuid, diskId, hostId, phase, direction, accountId, domainId, dataCenterId); this.backupId = backupId; - this.nbdPort = nbdPort; + this.socket = socket; this.backend = Backend.nbd; } @@ -164,13 +164,8 @@ public class ImageTransferVO implements ImageTransfer { this.hostId = hostId; } - @Override - public int getNbdPort() { - return nbdPort; - } - - public void setNbdPort(int nbdPort) { - this.nbdPort = nbdPort; + public void setSocket(String socket) { + this.socket = socket; } @Override diff --git a/engine/schema/src/main/java/org/apache/cloudstack/backup/dao/ImageTransferDao.java b/engine/schema/src/main/java/org/apache/cloudstack/backup/dao/ImageTransferDao.java index e8c30d27ee7..e71dffb22d5 100644 --- a/engine/schema/src/main/java/org/apache/cloudstack/backup/dao/ImageTransferDao.java +++ b/engine/schema/src/main/java/org/apache/cloudstack/backup/dao/ImageTransferDao.java @@ -27,7 +27,6 @@ import com.cloud.utils.db.GenericDao; public interface ImageTransferDao extends GenericDao { List listByBackupId(Long backupId); ImageTransferVO findByUuid(String uuid); - ImageTransferVO 
findByNbdPort(int port); ImageTransferVO findByVolume(Long volumeId); ImageTransferVO findUnfinishedByVolume(Long volumeId); List listByPhaseAndDirection(ImageTransfer.Phase phase, ImageTransfer.Direction direction); diff --git a/engine/schema/src/main/java/org/apache/cloudstack/backup/dao/ImageTransferDaoImpl.java b/engine/schema/src/main/java/org/apache/cloudstack/backup/dao/ImageTransferDaoImpl.java index 7e311d2a00f..95741fa054d 100644 --- a/engine/schema/src/main/java/org/apache/cloudstack/backup/dao/ImageTransferDaoImpl.java +++ b/engine/schema/src/main/java/org/apache/cloudstack/backup/dao/ImageTransferDaoImpl.java @@ -34,7 +34,6 @@ public class ImageTransferDaoImpl extends GenericDaoBase private SearchBuilder backupIdSearch; private SearchBuilder uuidSearch; - private SearchBuilder nbdPortSearch; private SearchBuilder volumeSearch; private SearchBuilder volumeUnfinishedSearch; private SearchBuilder phaseDirectionSearch; @@ -52,10 +51,6 @@ public class ImageTransferDaoImpl extends GenericDaoBase uuidSearch.and("uuid", uuidSearch.entity().getUuid(), SearchCriteria.Op.EQ); uuidSearch.done(); - nbdPortSearch = createSearchBuilder(); - nbdPortSearch.and("nbdPort", nbdPortSearch.entity().getNbdPort(), SearchCriteria.Op.EQ); - nbdPortSearch.done(); - volumeSearch = createSearchBuilder(); volumeSearch.and("volumeId", volumeSearch.entity().getDiskId(), SearchCriteria.Op.EQ); volumeSearch.done(); @@ -85,13 +80,6 @@ public class ImageTransferDaoImpl extends GenericDaoBase return findOneBy(sc); } - @Override - public ImageTransferVO findByNbdPort(int port) { - SearchCriteria sc = nbdPortSearch.create(); - sc.setParameters("nbdPort", port); - return findOneBy(sc); - } - @Override public ImageTransferVO findByVolume(Long volumeId) { SearchCriteria sc = volumeSearch.create(); diff --git a/engine/schema/src/main/resources/META-INF/db/schema-42100to42200.sql b/engine/schema/src/main/resources/META-INF/db/schema-42100to42200.sql index 1e265421387..044f7475324 100644 --- 
a/engine/schema/src/main/resources/META-INF/db/schema-42100to42200.sql +++ b/engine/schema/src/main/resources/META-INF/db/schema-42100to42200.sql @@ -92,7 +92,3 @@ CALL `cloud`.`IDEMPOTENT_ADD_UNIQUE_KEY`('cloud.counter', 'uc_counter__provider_ UPDATE `cloud`.`configuration` SET `scope` = 2 WHERE `name` = 'use.https.to.upload'; -- Delete the configuration for 'use.https.to.upload' from StoragePool DELETE FROM `cloud`.`storage_pool_details` WHERE `name` = 'use.https.to.upload'; - -<<<<<<< HEAD -======= ->>>>>>> 1ec4e52fa6 (Support file backend for cow format: api and server) diff --git a/engine/schema/src/main/resources/META-INF/db/schema-42210to42300.sql b/engine/schema/src/main/resources/META-INF/db/schema-42210to42300.sql index f81e2904841..b0063bff53e 100644 --- a/engine/schema/src/main/resources/META-INF/db/schema-42210to42300.sql +++ b/engine/schema/src/main/resources/META-INF/db/schema-42210to42300.sql @@ -123,7 +123,6 @@ CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'from_checkpoint_id', 'VAR CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'to_checkpoint_id', 'VARCHAR(255) DEFAULT NULL COMMENT "New checkpoint id created for this backup session"'); CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'checkpoint_create_time', 'BIGINT DEFAULT NULL COMMENT "Checkpoint creation timestamp from libvirt"'); CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'host_id', 'BIGINT UNSIGNED DEFAULT NULL COMMENT "Host where backup is running"'); -CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.backups', 'nbd_port', 'INT DEFAULT NULL COMMENT "NBD server port for backup"'); -- Add checkpoint tracking fields to vm_instance table for domain recreation CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.vm_instance', 'active_checkpoint_id', 'VARCHAR(255) DEFAULT NULL COMMENT "Active checkpoint id tracked for incremental backups"'); @@ -139,10 +138,10 @@ CREATE TABLE IF NOT EXISTS `cloud`.`image_transfer`( `backup_id` bigint unsigned COMMENT 'Backup ID', `disk_id` bigint 
unsigned NOT NULL COMMENT 'Disk/Volume ID', `host_id` bigint unsigned NOT NULL COMMENT 'Host ID', - `nbd_port` int NOT NULL COMMENT 'NBD port', `transfer_url` varchar(255) COMMENT 'ImageIO transfer URL', `file` varchar(255) COMMENT 'File for the file backend', `phase` varchar(20) NOT NULL COMMENT 'Transfer phase: initializing, transferring, finished, failed', + `socket` varchar(255) COMMENT 'Unix socket for nbd backend', `direction` varchar(20) NOT NULL COMMENT 'Direction: upload, download', `backend` varchar(20) NOT NULL COMMENT 'Backend: nbd, file', `progress` int COMMENT 'Transfer progress percentage (0-100)', diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java index dc137376f7c..dfba9ad1115 100644 --- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java @@ -395,6 +395,7 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv private String heartBeatPath; private String vmActivityCheckPath; private String nasBackupPath; + private String imageServerPath; private String securityGroupPath; private String ovsPvlanDhcpHostPath; private String ovsPvlanVmPath; @@ -809,6 +810,10 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv return nasBackupPath; } + public String getImageServerPath() { + return imageServerPath; + } + public String getOvsPvlanDhcpHostPath() { return ovsPvlanDhcpHostPath; } @@ -1095,6 +1100,11 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv throw new ConfigurationException("Unable to find nasbackup.sh"); } + imageServerPath = Script.findScript(kvmScriptsDir, "image_server.py"); + if (imageServerPath == null) { + throw new 
ConfigurationException("Unable to find image_server.py"); + } + createTmplPath = Script.findScript(storageScriptsDir, "createtmplt.sh"); if (createTmplPath == null) { throw new ConfigurationException("Unable to find the createtmplt.sh"); diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java index 1db594d169f..d3eca1aeb23 100644 --- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java @@ -17,8 +17,16 @@ package com.cloud.hypervisor.kvm.resource.wrapper; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + import org.apache.cloudstack.backup.CreateImageTransferAnswer; import org.apache.cloudstack.backup.CreateImageTransferCommand; +import org.apache.cloudstack.backup.ImageTransfer; +import org.apache.cloudstack.storage.resource.IpTablesHelper; +import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -26,36 +34,128 @@ import com.cloud.agent.api.Answer; import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource; import com.cloud.resource.CommandWrapper; import com.cloud.resource.ResourceWrapper; +import com.cloud.utils.StringUtils; +import com.cloud.utils.script.Script; +import com.google.gson.GsonBuilder; @ResourceWrapper(handles = CreateImageTransferCommand.class) public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper { protected Logger logger = LogManager.getLogger(getClass()); - private CreateImageTransferAnswer handleUpload(CreateImageTransferCommand cmd) { - return new CreateImageTransferAnswer(cmd, 
false, "Image Upload is not handled by KVM agent"); - } + private boolean startImageServerIfNotRunning(int imageServerPort, LibvirtComputingResource resource) { + final String imageServerScript = resource.getImageServerPath(); + String unitName = "cloudstack-image-server"; - private CreateImageTransferAnswer handleDownload(CreateImageTransferCommand cmd) { - String exportName = cmd.getExportName(); - int nbdPort = cmd.getNbdPort(); - try { - String hostIpAddress = cmd.getHostIpAddress(); - String transferUrl = String.format("nbd://%s:%d/%s", hostIpAddress, nbdPort, exportName); - - return new CreateImageTransferAnswer(cmd, true, "Image transfer created for download", - cmd.getTransferId(), transferUrl); - - } catch (Exception e) { - return new CreateImageTransferAnswer(cmd, false, "Error creating image transfer: " + e.getMessage()); + Script checkScript = new Script("/bin/bash", logger); + checkScript.add("-c"); + checkScript.add(String.format("systemctl is-active --quiet %s", unitName)); + String checkResult = checkScript.execute(); + if (checkResult == null) { + return true; } + + String systemdRunCmd = String.format( + "systemd-run --unit=%s --property=Restart=no /usr/bin/python3 %s --listen 0.0.0.0 --port %d", + unitName, imageServerScript, imageServerPort); + + Script startScript = new Script("/bin/bash", logger); + startScript.add("-c"); + startScript.add(systemdRunCmd); + String startResult = startScript.execute(); + + if (startResult != null) { + logger.error(String.format("Failed to start the Image server: %s", startResult)); + return false; + } + + // Wait with timeout until the service is up + int maxWaitSeconds = 10; + int pollIntervalMs = 1000; + int maxAttempts = (maxWaitSeconds * 1000) / pollIntervalMs; + boolean serviceActive = false; + + for (int attempt = 0; attempt < maxAttempts; attempt++) { + Script verifyScript = new Script("/bin/bash", logger); + verifyScript.add("-c"); + verifyScript.add(String.format("systemctl is-active --quiet %s", 
unitName)); + String verifyResult = verifyScript.execute(); + if (verifyResult == null) { + serviceActive = true; + logger.info(String.format("Image server is now active (attempt %d)", attempt + 1)); + break; + } + try { + Thread.sleep(pollIntervalMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + + if (!serviceActive) { + logger.error(String.format("Image server failed to start within %d seconds", maxWaitSeconds)); + return false; + } + + String rule = String.format("-p tcp -m state --state NEW -m tcp --dport %d -j ACCEPT", imageServerPort); + IpTablesHelper.addConditionally(IpTablesHelper.INPUT_CHAIN, true, rule, + String.format("Error in opening up image server port %d", imageServerPort)); + + return true; } - @Override public Answer execute(CreateImageTransferCommand cmd, LibvirtComputingResource resource) { - if (cmd.getDirection().equals("download")) { - return handleDownload(cmd); - } else { - return handleUpload(cmd); + final String transferId = cmd.getTransferId(); + ImageTransfer.Backend backend = cmd.getBackend(); + + if (StringUtils.isBlank(transferId)) { + return new CreateImageTransferAnswer(cmd, false, "transferId is empty."); } + + final Map payload = new HashMap<>(); + payload.put("backend", backend.toString()); + + if (backend == ImageTransfer.Backend.file) { + final String filePath = cmd.getFile(); + if (StringUtils.isBlank(filePath)) { + return new CreateImageTransferAnswer(cmd, false, "file path is empty for file backend."); + } + payload.put("file", filePath); + } else { + String socket = cmd.getSocket(); + final String exportName = cmd.getExportName(); + if (StringUtils.isBlank(socket)) { + return new CreateImageTransferAnswer(cmd, false, "Empty socket."); + } + if (StringUtils.isBlank(exportName)) { + return new CreateImageTransferAnswer(cmd, false, "exportName is empty."); + } + payload.put("socket", "/tmp/imagetransfer/" + socket + ".sock"); + payload.put("export", exportName); + 
String checkpointId = cmd.getCheckpointId(); + if (checkpointId != null) { + payload.put("export_bitmap", exportName + "-" + checkpointId.substring(0, 4)); + } + } + + try { + final String json = new GsonBuilder().create().toJson(payload); + File dir = new File("/tmp/imagetransfer"); + if (!dir.exists()) { + dir.mkdirs(); + } + final File transferFile = new File("/tmp/imagetransfer", transferId); + FileUtils.writeStringToFile(transferFile, json, "UTF-8"); + + } catch (IOException e) { + logger.warn("Failed to prepare image transfer on KVM host", e); + return new CreateImageTransferAnswer(cmd, false, "Failed to prepare image transfer on KVM host: " + e.getMessage()); + } + + final int imageServerPort = 54323; + if (!startImageServerIfNotRunning(imageServerPort, resource)) { return new CreateImageTransferAnswer(cmd, false, "Failed to start the image server on KVM host."); } + + final String transferUrl = String.format("http://%s:%d/images/%s", resource.getPrivateIp(), imageServerPort, transferId); + return new CreateImageTransferAnswer(cmd, true, "Image transfer prepared on KVM host.", transferId, transferUrl); + } } diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtFinalizeImageTransferCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtFinalizeImageTransferCommandWrapper.java new file mode 100644 index 00000000000..c2c9d7a797d --- /dev/null +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtFinalizeImageTransferCommandWrapper.java @@ -0,0 +1,110 @@ +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License.
You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. + +package com.cloud.hypervisor.kvm.resource.wrapper; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.stream.Stream; + +import org.apache.cloudstack.backup.FinalizeImageTransferCommand; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.cloud.agent.api.Answer; +import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource; +import com.cloud.resource.CommandWrapper; +import com.cloud.resource.ResourceWrapper; +import com.cloud.utils.StringUtils; +import com.cloud.utils.script.Script; + +@ResourceWrapper(handles = FinalizeImageTransferCommand.class) +public class LibvirtFinalizeImageTransferCommandWrapper extends CommandWrapper { + protected Logger logger = LogManager.getLogger(getClass()); + private void resetService(String unitName) { + Script resetScript = new Script("/bin/bash", logger); + resetScript.add("-c"); + resetScript.add(String.format("systemctl reset-failed %s || true", unitName)); + resetScript.execute(); + } + + private boolean stopImageServer() { + String unitName = "cloudstack-image-server"; + final int imageServerPort = 54323; + + Script checkScript = new Script("/bin/bash", logger); + checkScript.add("-c"); + checkScript.add(String.format("systemctl is-active --quiet %s", unitName)); + String checkResult = checkScript.execute(); + if (checkResult != null) { + logger.info(String.format("Image server not running, resetting failed state")); + resetService(unitName); + // Still try 
to remove firewall rule in case it exists + removeFirewallRule(imageServerPort); + return true; + } + + Script stopScript = new Script("/bin/bash", logger); + stopScript.add("-c"); + stopScript.add(String.format("systemctl stop %s", unitName)); + stopScript.execute(); + resetService(unitName); + logger.info(String.format("Image server %s stopped", unitName)); + + removeFirewallRule(imageServerPort); + + return true; + } + + private void removeFirewallRule(int port) { + String rule = String.format("-p tcp -m state --state NEW -m tcp --dport %d -j ACCEPT", port); + Script removeScript = new Script("/bin/bash", logger); + removeScript.add("-c"); + removeScript.add(String.format("iptables -D INPUT %s || true", rule)); + String result = removeScript.execute(); + if (result != null && !result.isEmpty() && !result.contains("iptables: Bad rule")) { + logger.debug(String.format("Firewall rule removal result for port %d: %s", port, result)); + } else { + logger.info(String.format("Firewall rule removed for port %d (or did not exist)", port)); + } + } + + public Answer execute(FinalizeImageTransferCommand cmd, LibvirtComputingResource resource) { + final String transferId = cmd.getTransferId(); + if (StringUtils.isBlank(transferId)) { + return new Answer(cmd, false, "transferId is empty."); + } + + final File transferFile = new File("/tmp/imagetransfer", transferId); + if (transferFile.exists() && !transferFile.delete()) { + return new Answer(cmd, false, "Failed to delete transfer config file: " + transferFile.getAbsolutePath()); + } + + try (Stream stream = Files.list(Paths.get("/tmp/imagetransfer"))) { + if (!stream.findAny().isPresent()) { + stopImageServer(); + } + } catch (IOException e) { + logger.warn("Failed to list /tmp/imagetransfer", e); + } + + return new Answer(cmd, true, "Image transfer finalized."); + } +} diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtStartBackupCommandWrapper.java 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtStartBackupCommandWrapper.java index bc3faa04493..04416559c57 100644 --- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtStartBackupCommandWrapper.java +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtStartBackupCommandWrapper.java @@ -39,7 +39,7 @@ public class LibvirtStartBackupCommandWrapper extends CommandWrapper\n"); @@ -149,7 +154,8 @@ public class LibvirtStartBackupCommandWrapper extends CommandWrapper").append(fromCheckpointId).append("\n"); } - xml.append(String.format(" \n", cmd.getHostIpAddress(), nbdPort)); + xml.append(String.format(" \n", socket)); + xml.append(" \n"); Map diskPathUuidMap = cmd.getDiskPathUuidMap(); @@ -185,7 +191,7 @@ public class LibvirtStartBackupCommandWrapper extends CommandWrapper"; } - private Answer handleStoppedVmBackup(StartBackupCommand cmd, LibvirtComputingResource resource, String toCheckpointId) { + private Answer handleStoppedVmBackup(StartBackupCommand cmd, String toCheckpointId) { String vmName = cmd.getVmName(); Map diskPathUuidMap = cmd.getDiskPathUuidMap(); for (Map.Entry entry : diskPathUuidMap.entrySet()) { diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtStartNBDServerCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtStartNBDServerCommandWrapper.java index 7a8588809df..71d9a06a360 100644 --- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtStartNBDServerCommandWrapper.java +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtStartNBDServerCommandWrapper.java @@ -17,6 +17,8 @@ package com.cloud.hypervisor.kvm.resource.wrapper; +import java.io.File; + import org.apache.cloudstack.backup.StartNBDServerAnswer; import 
org.apache.cloudstack.backup.StartNBDServerCommand; import org.apache.logging.log4j.Logger; @@ -26,6 +28,7 @@ import com.cloud.agent.api.Answer; import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource; import com.cloud.resource.CommandWrapper; import com.cloud.resource.ResourceWrapper; +import com.cloud.utils.StringUtils; import com.cloud.utils.script.Script; @ResourceWrapper(handles = StartNBDServerCommand.class) @@ -35,22 +38,25 @@ public class LibvirtStartNBDServerCommandWrapper extends CommandWrapper backend mapping: +# CloudStack writes a JSON file at /tmp/imagetransfer/ with: +# - NBD backend: {"backend": "nbd", "socket": "/tmp/imagetransfer/.sock", "export": "vda", "export_bitmap": "..."} +# - File backend: {"backend": "file", "file": "/path/to/image.qcow2"} +# +# This server reads that file on-demand. +_CFG_DIR = "/tmp/imagetransfer" +_CFG_CACHE: Dict[str, Tuple[float, Dict[str, Any]]] = {} +_CFG_CACHE_GUARD = threading.Lock() + + +def _json_bytes(obj: Any) -> bytes: + return json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + +def _merge_dirty_zero_extents( + allocation_extents: List[Tuple[int, int, bool]], + dirty_extents: List[Tuple[int, int, bool]], + size: int, +) -> List[Dict[str, Any]]: + """ + Merge allocation (start, length, zero) and dirty (start, length, dirty) extents + into a single list of {start, length, dirty, zero} with unified boundaries. 
+ """ + boundaries: Set[int] = {0, size} + for start, length, _ in allocation_extents: + boundaries.add(start) + boundaries.add(start + length) + for start, length, _ in dirty_extents: + boundaries.add(start) + boundaries.add(start + length) + sorted_boundaries = sorted(boundaries) + + def lookup( + extents: List[Tuple[int, int, bool]], offset: int, default: bool + ) -> bool: + for start, length, flag in extents: + if start <= offset < start + length: + return flag + return default + + result: List[Dict[str, Any]] = [] + for i in range(len(sorted_boundaries) - 1): + a, b = sorted_boundaries[i], sorted_boundaries[i + 1] + if a >= b: + continue + result.append( + { + "start": a, + "length": b - a, + "dirty": lookup(dirty_extents, a, False), + "zero": lookup(allocation_extents, a, False), + } + ) + return result + + +def _is_fallback_dirty_response(extents: List[Dict[str, Any]]) -> bool: + """True if extents is the single-extent fallback (dirty=false, zero=false).""" + return ( + len(extents) == 1 + and extents[0].get("dirty") is False + and extents[0].get("zero") is False + ) + + +def _get_image_lock(image_id: str) -> threading.Lock: + with _IMAGE_LOCKS_GUARD: + lock = _IMAGE_LOCKS.get(image_id) + if lock is None: + lock = threading.Lock() + _IMAGE_LOCKS[image_id] = lock + return lock + + +def _now_s() -> float: + return time.monotonic() + + +def _safe_transfer_id(image_id: str) -> Optional[str]: + """ + Only allow a single filename component to avoid path traversal. + We intentionally keep validation simple: reject anything containing '/' or '\\'. 
+ """ + if not image_id: + return None + if image_id != os.path.basename(image_id): + return None + if "/" in image_id or "\\" in image_id: + return None + if image_id in (".", ".."): + return None + return image_id + + +def _load_image_cfg(image_id: str) -> Optional[Dict[str, Any]]: + safe_id = _safe_transfer_id(image_id) + if safe_id is None: + return None + + cfg_path = os.path.join(_CFG_DIR, safe_id) + try: + st = os.stat(cfg_path) + except FileNotFoundError: + return None + except OSError as e: + logging.error("cfg stat failed image_id=%s err=%r", image_id, e) + return None + + with _CFG_CACHE_GUARD: + cached = _CFG_CACHE.get(safe_id) + if cached is not None: + cached_mtime, cached_cfg = cached + # Use cached config if the file hasn't changed. + if float(st.st_mtime) == float(cached_mtime): + return cached_cfg + + try: + with open(cfg_path, "rb") as f: + raw = f.read(4096) + except OSError as e: + logging.error("cfg read failed image_id=%s err=%r", image_id, e) + return None + + try: + obj = json.loads(raw.decode("utf-8")) + except Exception as e: + logging.error("cfg parse failed image_id=%s err=%r", image_id, e) + return None + + if not isinstance(obj, dict): + logging.error("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__) + return None + + backend = obj.get("backend") + if backend is None: + backend = "nbd" + if not isinstance(backend, str): + logging.error("cfg invalid backend type image_id=%s", image_id) + return None + backend = backend.lower() + if backend not in ("nbd", "file"): + logging.error("cfg unsupported backend image_id=%s backend=%s", image_id, backend) + return None + + if backend == "file": + file_path = obj.get("file") + if not isinstance(file_path, str) or not file_path.strip(): + logging.error("cfg missing/invalid file path for file backend image_id=%s", image_id) + return None + cfg = {"backend": "file", "file": file_path.strip()} + else: + socket_path = obj.get("socket") + export = obj.get("export") + export_bitmap 
= obj.get("export_bitmap") + if not isinstance(socket_path, str) or not socket_path.strip(): + logging.error("cfg missing/invalid socket path for nbd backend image_id=%s", image_id) + return None + socket_path = socket_path.strip() + if export is not None and (not isinstance(export, str) or not export): + logging.error("cfg missing/invalid export image_id=%s", image_id) + return None + cfg = { + "backend": "nbd", + "socket": socket_path, + "export": export, + "export_bitmap": export_bitmap, + } + + with _CFG_CACHE_GUARD: + _CFG_CACHE[safe_id] = (float(st.st_mtime), cfg) + return cfg + + +class _NbdConn: + """ + Small helper to connect to NBD over a Unix socket. + Opens a fresh handle per request, per POC requirements. + """ + + def __init__( + self, + socket_path: str, + export: Optional[str], + need_block_status: bool = False, + extra_meta_contexts: Optional[List[str]] = None, + ): + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.connect(socket_path) + self._nbd = nbd.NBD() + + # Select export name if supported/needed. + if export and hasattr(self._nbd, "set_export_name"): + self._nbd.set_export_name(export) + + # Request meta contexts before connect (for block status / dirty bitmap). + if need_block_status and hasattr(self._nbd, "add_meta_context"): + for ctx in ["base:allocation"] + (extra_meta_contexts or []): + try: + self._nbd.add_meta_context(ctx) + except Exception as e: + logging.warning("add_meta_context %r failed: %r", ctx, e) + + self._connect_existing_socket(self._sock) + + def _connect_existing_socket(self, sock: socket.socket) -> None: + # Requirement: attach libnbd to an existing socket / FD (no qemu-nbd). + # libnbd python API varies slightly by version, so try common options. 
+ last_err: Optional[BaseException] = None + if hasattr(self._nbd, "connect_socket"): + try: + self._nbd.connect_socket(sock) + return + except Exception as e: # pragma: no cover (depends on binding) + last_err = e + try: + self._nbd.connect_socket(sock.fileno()) + return + except Exception as e2: # pragma: no cover + last_err = e2 + if hasattr(self._nbd, "connect_fd"): + try: + self._nbd.connect_fd(sock.fileno()) + return + except Exception as e: # pragma: no cover + last_err = e + raise RuntimeError( + "Unable to connect libnbd using existing socket/fd; " + f"binding missing connect_socket/connect_fd or call failed: {last_err!r}" + ) + + def size(self) -> int: + return int(self._nbd.get_size()) + + def get_capabilities(self) -> Dict[str, bool]: + """ + Query NBD export capabilities (read_only, can_flush, can_zero) from the + server handshake. Returns dict with keys read_only, can_flush, can_zero. + Uses getattr for binding name variations (is_read_only/get_read_only, etc.). + """ + out: Dict[str, bool] = { + "read_only": True, + "can_flush": False, + "can_zero": False, + } + for name, keys in [ + ("read_only", ("is_read_only", "get_read_only")), + ("can_flush", ("can_flush", "get_can_flush")), + ("can_zero", ("can_zero", "get_can_zero")), + ]: + for attr in keys: + if hasattr(self._nbd, attr): + try: + val = getattr(self._nbd, attr)() + out[name] = bool(val) + except Exception: + pass + break + return out + + def pread(self, length: int, offset: int) -> bytes: + # Expected signature: pread(length, offset) + try: + return self._nbd.pread(length, offset) + except TypeError: # pragma: no cover (binding differences) + return self._nbd.pread(offset, length) + + def pwrite(self, buf: bytes, offset: int) -> None: + # Expected signature: pwrite(buf, offset) + try: + self._nbd.pwrite(buf, offset) + except TypeError: # pragma: no cover (binding differences) + self._nbd.pwrite(offset, buf) + + def pzero(self, offset: int, size: int) -> None: + """ + Zero a byte range. 
Uses NBD WRITE_ZEROES when available (efficient/punch hole), + otherwise falls back to writing zero bytes via pwrite. + """ + if size <= 0: + return + # Try libnbd pwrite_zeros / zero; argument order varies by binding. + for name in ("pwrite_zeros", "zero"): + if not hasattr(self._nbd, name): + continue + fn = getattr(self._nbd, name) + try: + fn(size, offset) + return + except TypeError: + try: + fn(offset, size) + return + except TypeError: + pass + # Fallback: write zeros in chunks. + remaining = size + pos = offset + zero_buf = b"\x00" * min(CHUNK_SIZE, size) + while remaining > 0: + chunk = min(len(zero_buf), remaining) + self.pwrite(zero_buf[:chunk], pos) + pos += chunk + remaining -= chunk + + def flush(self) -> None: + if hasattr(self._nbd, "flush"): + self._nbd.flush() + return + if hasattr(self._nbd, "fsync"): + self._nbd.fsync() + return + raise RuntimeError("libnbd binding has no flush/fsync method") + + def get_zero_extents(self) -> List[Dict[str, Any]]: + """ + Query NBD block status (base:allocation) and return extents that are + hole or zero in imageio format: [{"start": ..., "length": ..., "zero": true}, ...]. + Returns [] if block status is not supported; fallback to one full-image + zero extent when we have size but block status fails. 
+ """ + size = self.size() + if size == 0: + return [] + + if not hasattr(self._nbd, "block_status") and not hasattr( + self._nbd, "block_status_64" + ): + logging.error("get_zero_extents: no block_status/block_status_64") + return self._fallback_zero_extent(size) + if hasattr(self._nbd, "can_meta_context") and not self._nbd.can_meta_context( + "base:allocation" + ): + logging.error( + "get_zero_extents: server did not negotiate base:allocation" + ) + return self._fallback_zero_extent(size) + + zero_extents: List[Dict[str, Any]] = [] + chunk = min(size, 64 * 1024 * 1024) # 64 MiB + offset = 0 + + def extent_cb(*args: Any, **kwargs: Any) -> int: + # Binding typically passes (metacontext, offset, entries[, nr_entries][, error]). + metacontext = None + off = 0 + entries = None + if len(args) >= 3: + metacontext, off, entries = args[0], args[1], args[2] + else: + for a in args: + if isinstance(a, str): + metacontext = a + elif isinstance(a, int): + off = a + elif a is not None and hasattr(a, "__iter__"): + entries = a + if metacontext != "base:allocation" or entries is None: + return 0 + current = off + try: + flat = list(entries) + for i in range(0, len(flat), 2): + if i + 1 >= len(flat): + break + length = int(flat[i]) + flags = int(flat[i + 1]) + if (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0: + zero_extents.append( + {"start": current, "length": length, "zero": True} + ) + current += length + except (TypeError, ValueError, IndexError): + pass + return 0 + + block_status_fn = getattr( + self._nbd, "block_status_64", getattr(self._nbd, "block_status", None) + ) + if block_status_fn is None: + return self._fallback_zero_extent(size) + + try: + while offset < size: + count = min(chunk, size - offset) + # Try (count, offset, callback) then (offset, count, callback) + try: + block_status_fn(count, offset, extent_cb) + except TypeError: + block_status_fn(offset, count, extent_cb) + offset += count + except Exception as e: + logging.error("get_zero_extents 
block_status failed: %r", e) + return self._fallback_zero_extent(size) + if not zero_extents: + return self._fallback_zero_extent(size) + return zero_extents + + def _fallback_zero_extent(self, size: int) -> List[Dict[str, Any]]: + """Return one zero extent covering the whole image when block status unavailable.""" + return [{"start": 0, "length": size, "zero": True}] + + def get_allocation_extents(self) -> List[Dict[str, Any]]: + """ + Query base:allocation and return all extents (allocated and hole/zero) + as [{"start": ..., "length": ..., "zero": bool}, ...]. + Fallback when block status unavailable: one extent with zero=False. + """ + size = self.size() + if size == 0: + return [] + if not hasattr(self._nbd, "block_status") and not hasattr( + self._nbd, "block_status_64" + ): + return [{"start": 0, "length": size, "zero": False}] + if hasattr(self._nbd, "can_meta_context") and not self._nbd.can_meta_context( + "base:allocation" + ): + return [{"start": 0, "length": size, "zero": False}] + + allocation_extents: List[Dict[str, Any]] = [] + chunk = min(size, 64 * 1024 * 1024) + offset = 0 + + def extent_cb(*args: Any, **kwargs: Any) -> int: + if len(args) < 3: + return 0 + metacontext, off, entries = args[0], args[1], args[2] + if metacontext != "base:allocation" or entries is None: + return 0 + current = off + try: + flat = list(entries) + for i in range(0, len(flat), 2): + if i + 1 >= len(flat): + break + length = int(flat[i]) + flags = int(flat[i + 1]) + zero = (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0 + allocation_extents.append( + {"start": current, "length": length, "zero": zero} + ) + current += length + except (TypeError, ValueError, IndexError): + pass + return 0 + + block_status_fn = getattr( + self._nbd, "block_status_64", getattr(self._nbd, "block_status", None) + ) + if block_status_fn is None: + return [{"start": 0, "length": size, "zero": False}] + try: + while offset < size: + count = min(chunk, size - offset) + try: + 
block_status_fn(count, offset, extent_cb) + except TypeError: + block_status_fn(offset, count, extent_cb) + offset += count + except Exception as e: + logging.warning("get_allocation_extents block_status failed: %r", e) + return [{"start": 0, "length": size, "zero": False}] + if not allocation_extents: + return [{"start": 0, "length": size, "zero": False}] + return allocation_extents + + def get_extents_dirty_and_zero( + self, dirty_bitmap_context: str + ) -> List[Dict[str, Any]]: + """ + Query block status for base:allocation and qemu:dirty-bitmap:, + merge boundaries, and return extents with dirty and zero flags. + Format: [{"start": ..., "length": ..., "dirty": bool, "zero": bool}, ...]. + """ + size = self.size() + if size == 0: + return [] + if not hasattr(self._nbd, "block_status") and not hasattr( + self._nbd, "block_status_64" + ): + return self._fallback_dirty_zero_extents(size) + if hasattr(self._nbd, "can_meta_context"): + if not self._nbd.can_meta_context("base:allocation"): + return self._fallback_dirty_zero_extents(size) + if not self._nbd.can_meta_context(dirty_bitmap_context): + logging.warning( + "dirty bitmap context %r not negotiated", dirty_bitmap_context + ) + return self._fallback_dirty_zero_extents(size) + + allocation_extents: List[Tuple[int, int, bool]] = [] # (start, length, zero) + dirty_extents: List[Tuple[int, int, bool]] = [] # (start, length, dirty) + chunk = min(size, 64 * 1024 * 1024) + offset = 0 + + def extent_cb(*args: Any, **kwargs: Any) -> int: + if len(args) < 3: + return 0 + metacontext, off, entries = args[0], args[1], args[2] + if entries is None or not hasattr(entries, "__iter__"): + return 0 + current = off + try: + flat = list(entries) + for i in range(0, len(flat), 2): + if i + 1 >= len(flat): + break + length = int(flat[i]) + flags = int(flat[i + 1]) + if metacontext == "base:allocation": + zero = (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0 + allocation_extents.append((current, length, zero)) + elif metacontext 
== dirty_bitmap_context: + dirty = (flags & _NBD_STATE_DIRTY) != 0 + dirty_extents.append((current, length, dirty)) + current += length + except (TypeError, ValueError, IndexError): + pass + return 0 + + block_status_fn = getattr( + self._nbd, "block_status_64", getattr(self._nbd, "block_status", None) + ) + if block_status_fn is None: + return self._fallback_dirty_zero_extents(size) + try: + while offset < size: + count = min(chunk, size - offset) + try: + block_status_fn(count, offset, extent_cb) + except TypeError: + block_status_fn(offset, count, extent_cb) + offset += count + except Exception as e: + logging.warning("get_extents_dirty_and_zero block_status failed: %r", e) + return self._fallback_dirty_zero_extents(size) + return _merge_dirty_zero_extents(allocation_extents, dirty_extents, size) + + def _fallback_dirty_zero_extents(self, size: int) -> List[Dict[str, Any]]: + """One extent: whole image, dirty=false, zero=false when bitmap unavailable.""" + return [{"start": 0, "length": size, "dirty": False, "zero": False}] + + def close(self) -> None: + # Best-effort; bindings may differ. + try: + if hasattr(self._nbd, "shutdown"): + self._nbd.shutdown() + except Exception: + pass + try: + if hasattr(self._nbd, "close"): + self._nbd.close() + except Exception: + pass + try: + self._sock.close() + except Exception: + pass + + def __enter__(self) -> "_NbdConn": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + +class Handler(BaseHTTPRequestHandler): + server_version = "imageio-poc/0.1" + + # Keep BaseHTTPRequestHandler from printing noisy default logs + def log_message(self, fmt: str, *args: Any) -> None: + logging.info("%s - - %s", self.address_string(), fmt % args) + + def _send_imageio_headers( + self, allowed_methods: Optional[str] = None + ) -> None: + # Include these headers for compatibility with the imageio contract. 
+ if allowed_methods is None: + allowed_methods = "GET, PUT, OPTIONS" + self.send_header("Access-Control-Allow-Methods", allowed_methods) + self.send_header("Accept-Ranges", "bytes") + + def _send_json( + self, + status: int, + obj: Any, + allowed_methods: Optional[str] = None, + ) -> None: + body = _json_bytes(obj) + self.send_response(status) + self._send_imageio_headers(allowed_methods) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + try: + self.wfile.write(body) + except BrokenPipeError: + pass + + def _send_error_json(self, status: int, message: str) -> None: + self._send_json(status, {"error": message}) + + def _send_range_not_satisfiable(self, size: int) -> None: + # RFC 7233: reply with Content-Range: bytes */ + self.send_response(HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE) + self._send_imageio_headers() + self.send_header("Content-Type", "application/json") + self.send_header("Content-Range", f"bytes */{size}") + body = _json_bytes({"error": "range not satisfiable"}) + self.send_header("Content-Length", str(len(body))) + self.end_headers() + try: + self.wfile.write(body) + except BrokenPipeError: + pass + + def _parse_single_range(self, range_header: str, size: int) -> Tuple[int, int]: + """ + Parse a single HTTP byte range (RFC 7233) and return (start, end_inclusive). + + Supported: + - Range: bytes=START-END + - Range: bytes=START- + - Range: bytes=-SUFFIX + + Raises ValueError for invalid headers. Caller handles 416 vs 400. 
+ """ + if size < 0: + raise ValueError("invalid size") + if not range_header: + raise ValueError("empty Range") + if "," in range_header: + raise ValueError("multiple ranges not supported") + + prefix = "bytes=" + if not range_header.startswith(prefix): + raise ValueError("only bytes ranges supported") + spec = range_header[len(prefix) :].strip() + if "-" not in spec: + raise ValueError("invalid bytes range") + + left, right = spec.split("-", 1) + left = left.strip() + right = right.strip() + + if left == "": + # Suffix range: last N bytes. + if right == "": + raise ValueError("invalid suffix range") + try: + suffix_len = int(right, 10) + except ValueError as e: + raise ValueError("invalid suffix length") from e + if suffix_len <= 0: + raise ValueError("invalid suffix length") + if size == 0: + # Nothing to serve + raise ValueError("unsatisfiable") + if suffix_len >= size: + return 0, size - 1 + return size - suffix_len, size - 1 + + # START is present + try: + start = int(left, 10) + except ValueError as e: + raise ValueError("invalid range start") from e + if start < 0: + raise ValueError("invalid range start") + if start >= size: + raise ValueError("unsatisfiable") + + if right == "": + # START- + return start, size - 1 + + try: + end = int(right, 10) + except ValueError as e: + raise ValueError("invalid range end") from e + if end < start: + raise ValueError("unsatisfiable") + if end >= size: + end = size - 1 + return start, end + + def _parse_route(self) -> Tuple[Optional[str], Optional[str]]: + # Returns (image_id, tail) where tail is: + # None => /images/{id} + # "extents" => /images/{id}/extents + # "flush" => /images/{id}/flush + path = self.path.split("?", 1)[0] + parts = [p for p in path.split("/") if p] + if len(parts) < 2 or parts[0] != "images": + return None, None + image_id = parts[1] + tail = parts[2] if len(parts) >= 3 else None + if len(parts) > 3: + return None, None + return image_id, tail + + def _parse_query(self) -> Dict[str, List[str]]: + 
"""Parse query string from self.path into a dict of name -> list of values.""" + if "?" not in self.path: + return {} + query = self.path.split("?", 1)[1] + return parse_qs(query, keep_blank_values=True) + + def _image_cfg(self, image_id: str) -> Optional[Dict[str, Any]]: + return _load_image_cfg(image_id) + + def _is_file_backend(self, cfg: Dict[str, Any]) -> bool: + return cfg.get("backend") == "file" + + def do_OPTIONS(self) -> None: + image_id, tail = self._parse_route() + if image_id is None or tail is not None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + cfg = self._image_cfg(image_id) + if cfg is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") + return + if self._is_file_backend(cfg): + # File backend: full PUT only, no range writes; GET with ranges allowed; flush supported. + allowed_methods = "GET, PUT, POST, OPTIONS" + features = ["flush"] + max_writers = MAX_PARALLEL_WRITES + response = { + "unix_socket": None, + "features": features, + "max_readers": MAX_PARALLEL_READS, + "max_writers": max_writers, + } + self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods) + return + # Query NBD backend for capabilities (like nbdinfo); fall back to config. + read_only = True + can_flush = False + can_zero = False + try: + with _NbdConn( + cfg["socket"], + cfg.get("export"), + ) as conn: + caps = conn.get_capabilities() + read_only = caps["read_only"] + can_flush = caps["can_flush"] + can_zero = caps["can_zero"] + except Exception as e: + logging.warning("OPTIONS: could not query NBD capabilities: %r", e) + read_only = bool(cfg.get("read_only")) + if not read_only: + can_flush = True + can_zero = True + # Report options for this image from NBD: read-only => no PUT; only advertise supported features. + if read_only: + allowed_methods = "GET, OPTIONS" + features = ["extents"] + max_writers = 0 + else: + # PATCH: JSON (zero/flush) and Range+binary (write byte range). 
+ allowed_methods = "GET, PUT, PATCH, OPTIONS" + features = ["extents"] + if can_zero: + features.append("zero") + if can_flush: + features.append("flush") + max_writers = MAX_PARALLEL_WRITES if not read_only else 0 + response = { + "unix_socket": None, # Not used in this implementation + "features": features, + "max_readers": MAX_PARALLEL_READS, + "max_writers": max_writers, + } + self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods) + + def do_GET(self) -> None: + image_id, tail = self._parse_route() + if image_id is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + + cfg = self._image_cfg(image_id) + if cfg is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") + return + + if tail == "extents": + if self._is_file_backend(cfg): + self._send_error_json( + HTTPStatus.BAD_REQUEST, "extents not supported for file backend" + ) + return + query = self._parse_query() + context = (query.get("context") or [None])[0] + self._handle_get_extents(image_id, cfg, context=context) + return + if tail is not None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + + range_header = self.headers.get("Range") + self._handle_get_image(image_id, cfg, range_header) + + def do_PUT(self) -> None: + image_id, tail = self._parse_route() + if image_id is None or tail is not None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + + cfg = self._image_cfg(image_id) + if cfg is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") + return + + if self.headers.get("Range") is not None or self.headers.get("Content-Range") is not None: + self._send_error_json( + HTTPStatus.BAD_REQUEST, "Range/Content-Range not supported; full writes only" + ) + return + + content_length_hdr = self.headers.get("Content-Length") + if content_length_hdr is None: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required") + return + try: + content_length = int(content_length_hdr) + 
except ValueError: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + return + if content_length < 0: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + return + + self._handle_put_image(image_id, cfg, content_length) + + def do_POST(self) -> None: + image_id, tail = self._parse_route() + if image_id is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + + cfg = self._image_cfg(image_id) + if cfg is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") + return + + if tail == "flush": + self._handle_post_flush(image_id, cfg) + return + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + + def do_PATCH(self) -> None: + image_id, tail = self._parse_route() + if image_id is None or tail is not None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + + cfg = self._image_cfg(image_id) + if cfg is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") + return + if self._is_file_backend(cfg): + self._send_error_json( + HTTPStatus.BAD_REQUEST, + "range writes and PATCH not supported for file backend; use PUT for full upload", + ) + return + + content_type = self.headers.get("Content-Type", "").split(";")[0].strip().lower() + range_header = self.headers.get("Range") + + # Binary PATCH: Range + body writes bytes at that range (e.g. curl -X PATCH -H "Range: bytes=0-1048576" --data-binary @chunk.bin). 
+ if range_header is not None and content_type != "application/json": + content_length_hdr = self.headers.get("Content-Length") + if content_length_hdr is None: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required") + return + try: + content_length = int(content_length_hdr) + except ValueError: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + return + if content_length <= 0: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length must be positive") + return + self._handle_patch_range(image_id, cfg, range_header, content_length) + return + + # JSON PATCH: application/json with op (zero, flush). + if content_type != "application/json": + self._send_error_json( + HTTPStatus.UNSUPPORTED_MEDIA_TYPE, + "PATCH requires Content-Type: application/json (for zero/flush) or Range with binary body", + ) + return + + content_length_hdr = self.headers.get("Content-Length") + if content_length_hdr is None: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required") + return + try: + content_length = int(content_length_hdr) + except ValueError: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + return + if content_length <= 0 or content_length > 64 * 1024: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + return + + body = self.rfile.read(content_length) + if len(body) != content_length: + self._send_error_json(HTTPStatus.BAD_REQUEST, "request body truncated") + return + + try: + payload = json.loads(body.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + self._send_error_json(HTTPStatus.BAD_REQUEST, f"invalid JSON: {e}") + return + + if not isinstance(payload, dict): + self._send_error_json(HTTPStatus.BAD_REQUEST, "body must be a JSON object") + return + + op = payload.get("op") + if op == "flush": + # Flush entire image; offset and size are ignored (per spec). 
+ self._handle_post_flush(image_id, cfg) + return + if op != "zero": + self._send_error_json( + HTTPStatus.BAD_REQUEST, + "unsupported op; only \"zero\" and \"flush\" are supported", + ) + return + + try: + size = int(payload.get("size")) + except (TypeError, ValueError): + self._send_error_json(HTTPStatus.BAD_REQUEST, "missing or invalid \"size\"") + return + if size <= 0: + self._send_error_json(HTTPStatus.BAD_REQUEST, "\"size\" must be positive") + return + + offset = payload.get("offset") + if offset is None: + offset = 0 + else: + try: + offset = int(offset) + except (TypeError, ValueError): + self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid \"offset\"") + return + if offset < 0: + self._send_error_json(HTTPStatus.BAD_REQUEST, "\"offset\" must be non-negative") + return + + flush = bool(payload.get("flush", False)) + + self._handle_patch_zero(image_id, cfg, offset=offset, size=size, flush=flush) + + def _handle_get_image( + self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str] + ) -> None: + lock = _get_image_lock(image_id) + if not lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + if not _READ_SEM.acquire(blocking=False): + lock.release() + self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel reads") + return + + start = _now_s() + bytes_sent = 0 + try: + logging.info("GET start image_id=%s range=%s", image_id, range_header or "-") + if self._is_file_backend(cfg): + file_path = cfg["file"] + try: + size = os.path.getsize(file_path) + except OSError as e: + logging.error("GET file size error image_id=%s path=%s err=%r", image_id, file_path, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "failed to access file") + return + start_off = 0 + end_off_incl = size - 1 if size > 0 else -1 + status = HTTPStatus.OK + content_length = size + if range_header is not None: + try: + start_off, end_off_incl = self._parse_single_range(range_header, size) + except 
ValueError as e: + if str(e) == "unsatisfiable": + self._send_range_not_satisfiable(size) + return + if "unsatisfiable" in str(e): + self._send_range_not_satisfiable(size) + return + self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid Range header") + return + status = HTTPStatus.PARTIAL_CONTENT + content_length = (end_off_incl - start_off) + 1 + + self.send_response(status) + self._send_imageio_headers() + self.send_header("Content-Type", "application/octet-stream") + self.send_header("Content-Length", str(content_length)) + if status == HTTPStatus.PARTIAL_CONTENT: + self.send_header("Content-Range", f"bytes {start_off}-{end_off_incl}/{size}") + self.end_headers() + + offset = start_off + end_excl = end_off_incl + 1 + with open(file_path, "rb") as f: + f.seek(offset) + while offset < end_excl: + to_read = min(CHUNK_SIZE, end_excl - offset) + data = f.read(to_read) + if not data: + break + try: + self.wfile.write(data) + except BrokenPipeError: + logging.info("GET client disconnected image_id=%s at=%d", image_id, offset) + break + offset += len(data) + bytes_sent += len(data) + else: + with _NbdConn(cfg["socket"], cfg.get("export")) as conn: + size = conn.size() + + start_off = 0 + end_off_incl = size - 1 if size > 0 else -1 + status = HTTPStatus.OK + content_length = size + if range_header is not None: + try: + start_off, end_off_incl = self._parse_single_range(range_header, size) + except ValueError as e: + if str(e) == "unsatisfiable": + self._send_range_not_satisfiable(size) + return + if "unsatisfiable" in str(e): + self._send_range_not_satisfiable(size) + return + self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid Range header") + return + status = HTTPStatus.PARTIAL_CONTENT + content_length = (end_off_incl - start_off) + 1 + + self.send_response(status) + self._send_imageio_headers() + self.send_header("Content-Type", "application/octet-stream") + self.send_header("Content-Length", str(content_length)) + if status == HTTPStatus.PARTIAL_CONTENT: + 
self.send_header("Content-Range", f"bytes {start_off}-{end_off_incl}/{size}") + self.end_headers() + + offset = start_off + end_excl = end_off_incl + 1 + while offset < end_excl: + to_read = min(CHUNK_SIZE, end_excl - offset) + data = conn.pread(to_read, offset) + if not data: + raise RuntimeError("backend returned empty read") + try: + self.wfile.write(data) + except BrokenPipeError: + logging.info("GET client disconnected image_id=%s at=%d", image_id, offset) + break + offset += len(data) + bytes_sent += len(data) + except Exception as e: + # If headers already sent, we can't return JSON reliably; just log. + logging.error("GET error image_id=%s err=%r", image_id, e) + try: + if not self.wfile.closed: + self.close_connection = True + except Exception: + pass + finally: + _READ_SEM.release() + lock.release() + dur = _now_s() - start + logging.info( + "GET end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_sent, dur + ) + + def _handle_put_image(self, image_id: str, cfg: Dict[str, Any], content_length: int) -> None: + lock = _get_image_lock(image_id) + if not lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + if not _WRITE_SEM.acquire(blocking=False): + lock.release() + self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") + return + + start = _now_s() + bytes_written = 0 + try: + logging.info("PUT start image_id=%s content_length=%d", image_id, content_length) + if self._is_file_backend(cfg): + file_path = cfg["file"] + remaining = content_length + with open(file_path, "wb") as f: + while remaining > 0: + chunk = self.rfile.read(min(CHUNK_SIZE, remaining)) + if not chunk: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"request body ended early at {bytes_written} bytes", + ) + return + f.write(chunk) + bytes_written += len(chunk) + remaining -= len(chunk) + self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) + else: + with _NbdConn(cfg["socket"], 
cfg.get("export")) as conn: + offset = 0 + remaining = content_length + while remaining > 0: + chunk = self.rfile.read(min(CHUNK_SIZE, remaining)) + if not chunk: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"request body ended early at {offset} bytes", + ) + return + conn.pwrite(chunk, offset) + offset += len(chunk) + remaining -= len(chunk) + bytes_written += len(chunk) + + # POC-level: do not auto-flush on PUT; expose explicit /flush endpoint. + self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) + except Exception as e: + logging.error("PUT error image_id=%s err=%r", image_id, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") + finally: + _WRITE_SEM.release() + lock.release() + dur = _now_s() - start + logging.info( + "PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur + ) + + def _handle_get_extents( + self, image_id: str, cfg: Dict[str, Any], context: Optional[str] = None + ) -> None: + # context=dirty: return extents with dirty and zero from base:allocation + bitmap. + # Otherwise: return zero/hole extents from base:allocation only. 
+ lock = _get_image_lock(image_id) + if not lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + start = _now_s() + try: + logging.info("EXTENTS start image_id=%s context=%s", image_id, context) + if context == "dirty": + export_bitmap = cfg.get("export_bitmap") + if not export_bitmap: + # Fallback: same structure as zero extents but dirty=true for all ranges + with _NbdConn( + cfg["socket"], + cfg.get("export"), + need_block_status=True, + ) as conn: + allocation = conn.get_allocation_extents() + extents = [ + {"start": e["start"], "length": e["length"], "dirty": True, "zero": e["zero"]} + for e in allocation + ] + else: + dirty_bitmap_ctx = f"qemu:dirty-bitmap:{export_bitmap}" + extra_contexts: List[str] = [dirty_bitmap_ctx] + with _NbdConn( + cfg["socket"], + cfg.get("export"), + need_block_status=True, + extra_meta_contexts=extra_contexts, + ) as conn: + extents = conn.get_extents_dirty_and_zero(dirty_bitmap_ctx) + # When bitmap not actually available, same fallback: zero structure + dirty=true + if _is_fallback_dirty_response(extents): + with _NbdConn( + cfg["socket"], + cfg.get("export"), + need_block_status=True, + ) as conn: + allocation = conn.get_allocation_extents() + extents = [ + { + "start": e["start"], + "length": e["length"], + "dirty": True, + "zero": e["zero"], + } + for e in allocation + ] + else: + with _NbdConn( + cfg["socket"], + cfg.get("export"), + need_block_status=True, + ) as conn: + extents = conn.get_zero_extents() + self._send_json(HTTPStatus.OK, extents) + except Exception as e: + logging.error("EXTENTS error image_id=%s err=%r", image_id, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") + finally: + lock.release() + dur = _now_s() - start + logging.info("EXTENTS end image_id=%s duration_s=%.3f", image_id, dur) + + def _handle_post_flush(self, image_id: str, cfg: Dict[str, Any]) -> None: + lock = _get_image_lock(image_id) + if not 
lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + start = _now_s() + try: + logging.info("FLUSH start image_id=%s", image_id) + if self._is_file_backend(cfg): + file_path = cfg["file"] + with open(file_path, "rb") as f: + f.flush() + os.fsync(f.fileno()) + self._send_json(HTTPStatus.OK, {"ok": True}) + else: + with _NbdConn(cfg["socket"], cfg.get("export")) as conn: + conn.flush() + self._send_json(HTTPStatus.OK, {"ok": True}) + except Exception as e: + logging.error("FLUSH error image_id=%s err=%r", image_id, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") + finally: + lock.release() + dur = _now_s() - start + logging.info("FLUSH end image_id=%s duration_s=%.3f", image_id, dur) + + def _handle_patch_zero( + self, + image_id: str, + cfg: Dict[str, Any], + offset: int, + size: int, + flush: bool, + ) -> None: + lock = _get_image_lock(image_id) + if not lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + if not _WRITE_SEM.acquire(blocking=False): + lock.release() + self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") + return + + start = _now_s() + try: + logging.info( + "PATCH zero start image_id=%s offset=%d size=%d flush=%s", + image_id, offset, size, flush, + ) + with _NbdConn(cfg["socket"], cfg.get("export")) as conn: + image_size = conn.size() + if offset >= image_size: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + "offset must be less than image size", + ) + return + zero_size = min(size, image_size - offset) + conn.pzero(offset, zero_size) + if flush: + conn.flush() + self._send_json(HTTPStatus.OK, {"ok": True}) + except Exception as e: + logging.error("PATCH zero error image_id=%s err=%r", image_id, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") + finally: + _WRITE_SEM.release() + lock.release() + dur = _now_s() - start + logging.info("PATCH zero end image_id=%s 
duration_s=%.3f", image_id, dur) + + def _handle_patch_range( + self, + image_id: str, + cfg: Dict[str, Any], + range_header: str, + content_length: int, + ) -> None: + """Write request body to the image at the byte range from Range header.""" + lock = _get_image_lock(image_id) + if not lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + if not _WRITE_SEM.acquire(blocking=False): + lock.release() + self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") + return + + start = _now_s() + bytes_written = 0 + try: + logging.info( + "PATCH range start image_id=%s range=%s content_length=%d", + image_id, range_header, content_length, + ) + with _NbdConn(cfg["socket"], cfg.get("export")) as conn: + image_size = conn.size() + try: + start_off, end_inclusive = self._parse_single_range( + range_header, image_size + ) + except ValueError as e: + if "unsatisfiable" in str(e).lower(): + self._send_range_not_satisfiable(image_size) + else: + self._send_error_json( + HTTPStatus.BAD_REQUEST, f"invalid Range header: {e}" + ) + return + expected_len = end_inclusive - start_off + 1 + if content_length != expected_len: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"Content-Length ({content_length}) must equal range length ({expected_len})", + ) + return + offset = start_off + remaining = content_length + while remaining > 0: + chunk = self.rfile.read(min(CHUNK_SIZE, remaining)) + if not chunk: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"request body ended early at {bytes_written} bytes", + ) + return + conn.pwrite(chunk, offset) + n = len(chunk) + offset += n + remaining -= n + bytes_written += n + self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) + except Exception as e: + logging.error("PATCH range error image_id=%s err=%r", image_id, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") + finally: + _WRITE_SEM.release() + lock.release() + 
dur = _now_s() - start + logging.info( + "PATCH range end image_id=%s bytes=%d duration_s=%.3f", + image_id, bytes_written, dur, + ) + + +def main() -> None: + parser = argparse.ArgumentParser(description="POC imageio-like HTTP server backed by NBD") + parser.add_argument("--listen", default="127.0.0.1", help="Address to bind") + parser.add_argument("--port", type=int, default=54323, help="Port to listen on") + args = parser.parse_args() + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + ) + + addr = (args.listen, args.port) + httpd = ThreadingHTTPServer(addr, Handler) + logging.info("listening on http://%s:%d", args.listen, args.port) + logging.info("image configs are read from %s/", _CFG_DIR) + httpd.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/server/src/main/java/org/apache/cloudstack/backup/IncrementalBackupServiceImpl.java b/server/src/main/java/org/apache/cloudstack/backup/IncrementalBackupServiceImpl.java index be6dcae12b8..ed44ded2280 100644 --- a/server/src/main/java/org/apache/cloudstack/backup/IncrementalBackupServiceImpl.java +++ b/server/src/main/java/org/apache/cloudstack/backup/IncrementalBackupServiceImpl.java @@ -23,7 +23,6 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; @@ -108,15 +107,8 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme @Inject private PrimaryDataStoreDao primaryDataStoreDao; - @Inject - EndPointSelector _epSelector; - private Timer imageTransferTimer; - private static final int NBD_PORT_RANGE_START = 10809; - private static final int NBD_PORT_RANGE_END = 10909; - private static final boolean DATAPLANE_PROXY_MODE = true; - private boolean isDummyOffering(Long backupOfferingId) { if (backupOfferingId == null) { throw new CloudRuntimeException("VM not assigned a backup 
offering"); @@ -174,9 +166,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme backup.setToCheckpointId(toCheckpointId); backup.setFromCheckpointId(fromCheckpointId); - int nbdPort = allocateNbdPort(); Long hostId = vm.getHostId() != null ? vm.getHostId() : vm.getLastHostId(); - backup.setNbdPort(nbdPort); backup.setHostId(hostId); // Will be changed later if incremental was done backup.setType("FULL"); @@ -206,9 +196,8 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme backup.getToCheckpointId(), backup.getFromCheckpointId(), vm.getActiveCheckpointCreateTime(), - backup.getNbdPort(), + backup.getUuid(), diskPathUuidMap, - host.getPrivateIpAddress(), vm.getState() == State.Stopped ); @@ -334,29 +323,26 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme } String transferId = UUID.randomUUID().toString(); - Host host = hostDao.findById(backup.getHostId()); + String socket = backup.getUuid(); VMInstanceVO vm = vmInstanceDao.findById(backup.getVmId()); if (vm.getState() == State.Stopped) { String volumePath = getVolumePathForFileBasedBackend(volume); - startNBDServer(transferId, direction, host, volume.getUuid(), volumePath, backup.getNbdPort()); + startNBDServer(transferId, direction, backup.getHostId(), volume.getUuid(), volumePath); + socket = transferId; } CreateImageTransferCommand transferCmd = new CreateImageTransferCommand( transferId, - host.getPrivateIpAddress(), direction, volume.getUuid(), - backup.getNbdPort(), + socket, backup.getFromCheckpointId()); try { CreateImageTransferAnswer answer; if (dummyOffering) { answer = new CreateImageTransferAnswer(transferCmd, true, "Dummy answer", "image-transfer-id", "nbd://127.0.0.1:10809/vda"); - } else if (DATAPLANE_PROXY_MODE) { - EndPoint ssvm = _epSelector.findSsvm(backup.getZoneId()); - answer = (CreateImageTransferAnswer) ssvm.sendMessage(transferCmd); } else { answer = (CreateImageTransferAnswer) 
agentManager.send(backup.getHostId(), transferCmd); } @@ -370,7 +356,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme backupId, volume.getId(), backup.getHostId(), - backup.getNbdPort(), + socket, ImageTransferVO.Phase.transferring, ImageTransfer.Direction.download, backup.getAccountId(), @@ -398,18 +384,17 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme return hosts.get(0); } - private void startNBDServer(String transferId, String direction, Host host, String exportName, String volumePath, int nbdPort) { + private void startNBDServer(String transferId, String direction, Long hostId, String exportName, String volumePath) { StartNBDServerAnswer nbdServerAnswer; StartNBDServerCommand nbdServerCmd = new StartNBDServerCommand( transferId, - host.getPrivateIpAddress(), exportName, volumePath, - nbdPort, + transferId, direction ); try { - nbdServerAnswer = (StartNBDServerAnswer) agentManager.send(host.getId(), nbdServerCmd); + nbdServerAnswer = (StartNBDServerAnswer) agentManager.send(hostId, nbdServerCmd); } catch (AgentUnavailableException | OperationTimedoutException e) { throw new CloudRuntimeException("Failed to communicate with agent", e); } @@ -451,19 +436,18 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme transferCmd = new CreateImageTransferCommand( transferId, - host.getPrivateIpAddress(), direction, + transferId, volumePath); } else { - int nbdPort = allocateNbdPort(); - startNBDServer(transferId, direction, host, volume.getUuid(), volumePath, nbdPort); + startNBDServer(transferId, direction, host.getId(), volume.getUuid(), volumePath); imageTransfer = new ImageTransferVO( transferId, null, volume.getId(), host.getId(), - nbdPort, + transferId, ImageTransferVO.Phase.transferring, ImageTransfer.Direction.upload, volume.getAccountId(), @@ -472,16 +456,17 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme transferCmd = new 
CreateImageTransferCommand( transferId, - host.getPrivateIpAddress(), direction, volume.getUuid(), - nbdPort, + transferId, null); } - - - EndPoint ssvm = _epSelector.findSsvm(volume.getDataCenterId()); - CreateImageTransferAnswer transferAnswer = (CreateImageTransferAnswer) ssvm.sendMessage(transferCmd); + CreateImageTransferAnswer transferAnswer; + try { + transferAnswer = (CreateImageTransferAnswer) agentManager.send(imageTransfer.getHostId(), transferCmd); + } catch (AgentUnavailableException | OperationTimedoutException e) { + throw new CloudRuntimeException("Failed to communicate with agent", e); + } if (!transferAnswer.getResult()) { if (!backend.equals(ImageTransfer.Backend.file)) { @@ -554,32 +539,27 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme private void finalizeDownloadImageTransfer(ImageTransferVO imageTransfer) { String transferId = imageTransfer.getUuid(); - int nbdPort = imageTransfer.getNbdPort(); - String direction = imageTransfer.getDirection().toString(); - FinalizeImageTransferCommand finalizeCmd = new FinalizeImageTransferCommand(transferId, direction, nbdPort); + FinalizeImageTransferCommand finalizeCmd = new FinalizeImageTransferCommand(transferId); BackupVO backup = backupDao.findById(imageTransfer.getBackupId()); boolean dummyOffering = isDummyOffering(backup.getBackupOfferingId()); + Answer answer; try { - Answer answer; if (dummyOffering) { answer = new Answer(finalizeCmd, true, "Image transfer finalized."); - } else if (DATAPLANE_PROXY_MODE) { - EndPoint ssvm = _epSelector.findSsvm(backup.getZoneId()); - answer = ssvm.sendMessage(finalizeCmd); } else { answer = agentManager.send(backup.getHostId(), finalizeCmd); } - if (!answer.getResult()) { - throw new CloudRuntimeException("Failed to finalize image transfer: " + answer.getDetails()); - } - } catch (AgentUnavailableException | OperationTimedoutException e) { throw new CloudRuntimeException("Failed to communicate with agent", e); } + if 
(!answer.getResult()) { + throw new CloudRuntimeException("Failed to finalize image transfer: " + answer.getDetails()); + } + VMInstanceVO vm = vmInstanceDao.findById(backup.getVmId()); if (vm.getState() == State.Stopped) { boolean stopNbdServerResult = stopNbdServer(imageTransfer); @@ -591,9 +571,8 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme private boolean stopNbdServer(ImageTransferVO imageTransfer) { String transferId = imageTransfer.getUuid(); - int nbdPort = imageTransfer.getNbdPort(); String direction = imageTransfer.getDirection().toString(); - StopNBDServerCommand stopNbdServerCommand = new StopNBDServerCommand(transferId, direction, nbdPort); + StopNBDServerCommand stopNbdServerCommand = new StopNBDServerCommand(transferId, direction); Answer answer; try { answer = agentManager.send(imageTransfer.getHostId(), stopNbdServerCommand); @@ -606,17 +585,19 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme private void finalizeUploadImageTransfer(ImageTransferVO imageTransfer) { String transferId = imageTransfer.getUuid(); - int nbdPort = imageTransfer.getNbdPort(); - String direction = imageTransfer.getDirection().toString(); boolean stopNbdServerResult = stopNbdServer(imageTransfer); if (!stopNbdServerResult) { throw new CloudRuntimeException("Failed to stop the nbd server"); } - FinalizeImageTransferCommand finalizeCmd = new FinalizeImageTransferCommand(transferId, direction, nbdPort); - EndPoint ssvm = _epSelector.findSsvm(imageTransfer.getDataCenterId()); - Answer answer = ssvm.sendMessage(finalizeCmd); + FinalizeImageTransferCommand finalizeCmd = new FinalizeImageTransferCommand(transferId); + Answer answer; + try { + answer = agentManager.send(imageTransfer.getHostId(), finalizeCmd); + } catch (AgentUnavailableException | OperationTimedoutException e) { + throw new CloudRuntimeException("Failed to communicate with agent", e); + } if (!answer.getResult()) { throw new 
CloudRuntimeException("Failed to finalize image transfer: " + answer.getDetails()); @@ -717,19 +698,6 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme return cmdList; } - private int getRandomNbdPort() { - Random random = new Random(); - return NBD_PORT_RANGE_START + random.nextInt(NBD_PORT_RANGE_END - NBD_PORT_RANGE_START); - } - - private int allocateNbdPort() { - int port = getRandomNbdPort(); - while (imageTransferDao.findByNbdPort(port) != null) { - port = getRandomNbdPort(); - } - return port; - } - private ImageTransferResponse toImageTransferResponse(ImageTransferVO imageTransferVO) { ImageTransferResponse response = new ImageTransferResponse(); response.setId(imageTransferVO.getUuid()); diff --git a/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java b/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java index 2358bdcc832..db95a58f222 100644 --- a/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java +++ b/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java @@ -54,10 +54,6 @@ import java.util.stream.Stream; import javax.naming.ConfigurationException; -import org.apache.cloudstack.backup.CreateImageTransferAnswer; -import org.apache.cloudstack.backup.CreateImageTransferCommand; -import org.apache.cloudstack.backup.FinalizeImageTransferCommand; -import org.apache.cloudstack.backup.ImageTransfer; import org.apache.cloudstack.framework.security.keystore.KeystoreManager; import org.apache.cloudstack.storage.NfsMountManagerImpl.PathParser; import org.apache.cloudstack.storage.command.CopyCmdAnswer; @@ -342,10 +338,6 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S return execute((ListDataStoreObjectsCommand)cmd); } else if (cmd 
instanceof QuerySnapshotZoneCopyCommand) { return execute((QuerySnapshotZoneCopyCommand)cmd); - } else if (cmd instanceof CreateImageTransferCommand) { - return execute((CreateImageTransferCommand)cmd); - } else if (cmd instanceof FinalizeImageTransferCommand) { - return execute((FinalizeImageTransferCommand)cmd); } else { return Answer.createUnsupportedCommandAnswer(cmd); } @@ -3716,212 +3708,4 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S return new QuerySnapshotZoneCopyAnswer(cmd, files); } - private void resetService(String unitName) { - Script resetScript = new Script("/bin/bash", logger); - resetScript.add("-c"); - resetScript.add(String.format("systemctl reset-failed %s || true", unitName)); - resetScript.execute(); - } - - private boolean stopImageServer() { - String unitName = "cloudstack-image-server"; - final int imageServerPort = 54323; - - Script checkScript = new Script("/bin/bash", logger); - checkScript.add("-c"); - checkScript.add(String.format("systemctl is-active --quiet %s", unitName)); - String checkResult = checkScript.execute(); - if (checkResult != null) { - logger.info(String.format("Image server not running, resetting failed state")); - resetService(unitName); - // Still try to remove firewall rule in case it exists - if (_inSystemVM) { - removeFirewallRule(imageServerPort); - } - return true; - } - - Script stopScript = new Script("/bin/bash", logger); - stopScript.add("-c"); - stopScript.add(String.format("systemctl stop %s", unitName)); - stopScript.execute(); - resetService(unitName); - logger.info(String.format("Image server %s stopped", unitName)); - - // Close firewall port for image server - if (_inSystemVM) { - removeFirewallRule(imageServerPort); - } - - return true; - } - - private void removeFirewallRule(int port) { - String rule = String.format("-p tcp -m state --state NEW -m tcp --dport %d -j ACCEPT", port); - Script removeScript = new Script("/bin/bash", logger); - 
removeScript.add("-c"); - removeScript.add(String.format("iptables -D INPUT %s || true", rule)); - String result = removeScript.execute(); - if (result != null && !result.isEmpty() && !result.contains("iptables: Bad rule")) { - logger.debug(String.format("Firewall rule removal result for port %d: %s", port, result)); - } else { - logger.info(String.format("Firewall rule removed for port %d (or did not exist)", port)); - } - } - - private boolean startImageServerIfNotRunning(int imageServerPort) { - final String imageServerScript = "/opt/cloud/bin/image_server.py"; - String unitName = "cloudstack-image-server"; - - Script checkScript = new Script("/bin/bash", logger); - checkScript.add("-c"); - checkScript.add(String.format("systemctl is-active --quiet %s", unitName)); - String checkResult = checkScript.execute(); - if (checkResult == null) { - return true; - } - - String systemdRunCmd = String.format( - "systemd-run --unit=%s --property=Restart=no /usr/bin/python3 %s --listen 0.0.0.0 --port %d", - unitName, imageServerScript, imageServerPort); - - Script startScript = new Script("/bin/bash", logger); - startScript.add("-c"); - startScript.add(systemdRunCmd); - String startResult = startScript.execute(); - - if (startResult != null) { - logger.error(String.format("Failed to start the Image serer: %s", startResult)); - return false; - } - - // Wait with timeout until the service is up - int maxWaitSeconds = 10; - int pollIntervalMs = 1000; - int maxAttempts = (maxWaitSeconds * 1000) / pollIntervalMs; - boolean serviceActive = false; - - for (int attempt = 0; attempt < maxAttempts; attempt++) { - Script verifyScript = new Script("/bin/bash", logger); - verifyScript.add("-c"); - verifyScript.add(String.format("systemctl is-active --quiet %s", unitName)); - String verifyResult = verifyScript.execute(); - if (verifyResult == null) { - serviceActive = true; - logger.info(String.format("Image server is now active (attempt %d)", attempt + 1)); - break; - } - try { - 
Thread.sleep(pollIntervalMs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } - - if (!serviceActive) { - logger.error(String.format("Image server failed to start within %d seconds", maxWaitSeconds)); - return false; - } - - // Open firewall port for image server - if (_inSystemVM) { - String rule = String.format("-p tcp -m state --state NEW -m tcp --dport %d -j ACCEPT", imageServerPort); - IpTablesHelper.addConditionally(IpTablesHelper.INPUT_CHAIN, false, rule, - String.format("Error in opening up image server port %d", imageServerPort)); - } - - return true; - } - - protected Answer execute(CreateImageTransferCommand cmd) { - if (!_inSystemVM) { - return new CreateImageTransferAnswer(cmd, true, "Not running inside SSVM; skipping image transfer setup."); - } - - final String transferId = cmd.getTransferId(); - final String hostIp = cmd.getHostIpAddress(); - final ImageTransfer.Backend backend = cmd.getBackend(); - - if (StringUtils.isBlank(transferId)) { - return new CreateImageTransferAnswer(cmd, false, "transferId is empty."); - } - if (StringUtils.isBlank(hostIp)) { - return new CreateImageTransferAnswer(cmd, false, "hostIpAddress is empty."); - } - - final Map payload = new HashMap<>(); - payload.put("backend", backend.toString()); - - if (backend == ImageTransfer.Backend.file) { - final String filePath = cmd.getFile(); - if (StringUtils.isBlank(filePath)) { - return new CreateImageTransferAnswer(cmd, false, "file path is empty for file backend."); - } - payload.put("file", filePath); - } else { - final String exportName = cmd.getExportName(); - final int nbdPort = cmd.getNbdPort(); - if (StringUtils.isBlank(exportName)) { - return new CreateImageTransferAnswer(cmd, false, "exportName is empty."); - } - if (nbdPort <= 0) { - return new CreateImageTransferAnswer(cmd, false, "Invalid nbdPort: " + nbdPort); - } - payload.put("host", hostIp); - payload.put("port", nbdPort); - payload.put("export", exportName); - 
String checkpointId = cmd.getCheckpointId(); - if (checkpointId != null) { - payload.put("export_bitmap", exportName + "-" + checkpointId.substring(0, 4)); - } - } - - try { - final String json = new GsonBuilder().create().toJson(payload); - File dir = new File("/tmp/imagetransfer"); - if (!dir.exists()) { - dir.mkdirs(); - } - final File transferFile = new File("/tmp/imagetransfer", transferId); - FileUtils.writeStringToFile(transferFile, json, "UTF-8"); - - } catch (IOException e) { - logger.warn("Failed to prepare image transfer on SSVM", e); - return new CreateImageTransferAnswer(cmd, false, "Failed to prepare image transfer on SSVM: " + e.getMessage()); - } - - final int imageServerPort = 54323; - startImageServerIfNotRunning(imageServerPort); - - final String transferUrl = String.format("http://%s:%d/images/%s", _publicIp, imageServerPort, transferId); - return new CreateImageTransferAnswer(cmd, true, "Image transfer prepared on SSVM.", transferId, transferUrl); - } - - protected Answer execute(FinalizeImageTransferCommand cmd) { - if (!_inSystemVM) { - return new Answer(cmd, true, "Not running inside SSVM; skipping image transfer finalization."); - } - - final String transferId = cmd.getTransferId(); - if (StringUtils.isBlank(transferId)) { - return new Answer(cmd, false, "transferId is empty."); - } - - final File transferFile = new File("/tmp/imagetransfer", transferId); - if (transferFile.exists() && !transferFile.delete()) { - return new Answer(cmd, false, "Failed to delete transfer config file: " + transferFile.getAbsolutePath()); - } - - try (Stream stream = Files.list(Paths.get("/tmp/imagetransfer"))) { - if (!stream.findAny().isPresent()) { - stopImageServer(); - } - } catch (IOException e) { - logger.warn("Failed to list /tmp/imagetransfer", e); - } - - return new Answer(cmd, true, "Image transfer finalized."); - } - }