diff --git a/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java b/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java index a4905fe46f7..f9dfd256c39 100644 --- a/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java +++ b/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java @@ -20,35 +20,21 @@ package org.apache.cloudstack.backup; import com.cloud.agent.api.Command; public class CreateImageTransferCommand extends Command { - private Long vmId; - private Long backupId; - private Long diskId; + private String transferId; + private String hostIpAddress; private String deviceName; private int nbdPort; public CreateImageTransferCommand() { } - public CreateImageTransferCommand(Long vmId, Long backupId, Long diskId, String deviceName, int nbdPort) { - this.vmId = vmId; - this.backupId = backupId; - this.diskId = diskId; + public CreateImageTransferCommand(Long vmId, String transferId, String hostIpAddress, Long backupId, Long diskId, String deviceName, int nbdPort) { + this.transferId = transferId; + this.hostIpAddress = hostIpAddress; this.deviceName = deviceName; this.nbdPort = nbdPort; } - public Long getVmId() { - return vmId; - } - - public Long getBackupId() { - return backupId; - } - - public Long getDiskId() { - return diskId; - } - public String getDeviceName() { return deviceName; } @@ -57,6 +43,14 @@ public class CreateImageTransferCommand extends Command { return nbdPort; } + public String getHostIpAddress() { + return hostIpAddress; + } + + public String getTransferId() { + return transferId; + } + @Override public boolean executeInSequence() { return true; diff --git a/core/src/main/java/org/apache/cloudstack/backup/FinalizeImageTransferCommand.java b/core/src/main/java/org/apache/cloudstack/backup/FinalizeImageTransferCommand.java new file mode 100644 index 00000000000..84d9b1ff818 --- /dev/null +++ 
b/core/src/main/java/org/apache/cloudstack/backup/FinalizeImageTransferCommand.java @@ -0,0 +1,40 @@ +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. + +package org.apache.cloudstack.backup; + +import com.cloud.agent.api.Command; + +public class FinalizeImageTransferCommand extends Command { + private String transferId; + + public FinalizeImageTransferCommand() { + } + + public FinalizeImageTransferCommand(String transferId) { + this.transferId = transferId; + } + + public String getTransferId() { + return transferId; + } + + @Override + public boolean executeInSequence() { + return true; + } +} diff --git a/core/src/main/java/org/apache/cloudstack/backup/StartBackupCommand.java b/core/src/main/java/org/apache/cloudstack/backup/StartBackupCommand.java index 29fbccafb1f..ac2cc8af70a 100644 --- a/core/src/main/java/org/apache/cloudstack/backup/StartBackupCommand.java +++ b/core/src/main/java/org/apache/cloudstack/backup/StartBackupCommand.java @@ -28,18 +28,20 @@ public class StartBackupCommand extends Command { private String fromCheckpointId; private int nbdPort; private Map diskVolumePaths; // volumeId -> path mapping + private String hostIpAddress; public StartBackupCommand() { } public StartBackupCommand(String vmName, Long vmId, 
String toCheckpointId, String fromCheckpointId, - int nbdPort, Map diskVolumePaths) { + int nbdPort, Map diskVolumePaths, String hostIpAddress) { this.vmName = vmName; this.vmId = vmId; this.toCheckpointId = toCheckpointId; this.fromCheckpointId = fromCheckpointId; this.nbdPort = nbdPort; this.diskVolumePaths = diskVolumePaths; + this.hostIpAddress = hostIpAddress; } public String getVmName() { @@ -70,6 +72,10 @@ public class StartBackupCommand extends Command { return fromCheckpointId != null && !fromCheckpointId.isEmpty(); } + public String getHostIpAddress() { + return hostIpAddress; + } + @Override public boolean executeInSequence() { return true; diff --git a/engine/schema/src/main/java/org/apache/cloudstack/backup/ImageTransferVO.java b/engine/schema/src/main/java/org/apache/cloudstack/backup/ImageTransferVO.java index 79953e4cffd..4efad8d3fd1 100644 --- a/engine/schema/src/main/java/org/apache/cloudstack/backup/ImageTransferVO.java +++ b/engine/schema/src/main/java/org/apache/cloudstack/backup/ImageTransferVO.java @@ -18,7 +18,6 @@ package org.apache.cloudstack.backup; import java.util.Date; -import java.util.UUID; import javax.persistence.Column; import javax.persistence.Entity; @@ -94,11 +93,10 @@ public class ImageTransferVO implements ImageTransfer { private Date removed; public ImageTransferVO() { - this.uuid = UUID.randomUUID().toString(); } - public ImageTransferVO(long backupId, long vmId, long diskId, String deviceName, long hostId, int nbdPort, Phase phase, Direction direction, Long accountId, Long domainId) { - this(); + public ImageTransferVO(String uuid, long backupId, long vmId, long diskId, String deviceName, long hostId, int nbdPort, Phase phase, Direction direction, Long accountId, Long domainId) { + this.uuid = uuid; this.backupId = backupId; this.vmId = vmId; this.diskId = diskId; diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java index b4b39fa2c98..1c3ec2ae3dc 100644 --- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java @@ -43,13 +43,12 @@ public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper").append(fromCheckpointId).append("\n"); } - xml.append(" \n"); + xml.append(String.format(" \n", cmd.getHostIpAddress(), nbdPort)); xml.append(" \n"); // Add disk entries - simplified for POC diff --git a/server/src/main/java/org/apache/cloudstack/backup/IncrementalBackupServiceImpl.java b/server/src/main/java/org/apache/cloudstack/backup/IncrementalBackupServiceImpl.java index cfc36fa76cd..0723b49bd2e 100644 --- a/server/src/main/java/org/apache/cloudstack/backup/IncrementalBackupServiceImpl.java +++ b/server/src/main/java/org/apache/cloudstack/backup/IncrementalBackupServiceImpl.java @@ -41,13 +41,18 @@ import org.apache.cloudstack.api.response.ImageTransferResponse; import org.apache.cloudstack.backup.dao.BackupDao; import org.apache.cloudstack.backup.dao.BackupOfferingDao; import org.apache.cloudstack.backup.dao.ImageTransferDao; +import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint; +import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector; import org.apache.commons.collections.CollectionUtils; import org.joda.time.DateTime; import org.springframework.stereotype.Component; import com.cloud.agent.AgentManager; +import com.cloud.agent.api.Answer; import com.cloud.exception.AgentUnavailableException; import com.cloud.exception.OperationTimedoutException; +import com.cloud.host.Host; +import com.cloud.host.dao.HostDao; import com.cloud.storage.Volume; import com.cloud.storage.dao.VolumeDao; import 
com.cloud.utils.component.ManagerBase; @@ -77,8 +82,15 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme @Inject private BackupOfferingDao backupOfferingDao; + @Inject + private HostDao hostDao; + + @Inject + EndPointSelector _epSelector; + private static final int NBD_PORT_RANGE_START = 10809; private static final int NBD_PORT_RANGE_END = 10909; + private static final boolean DATAPLANE_PROXY_MODE = true; private boolean isDummyOffering(Long backupOfferingId) { if (backupOfferingId == null) { @@ -151,14 +163,15 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme diskVolumePaths.put(vol.getId(), vol.getPath()); } - // Send StartBackupCommand to agent + Host host = hostDao.findById(vm.getHostId()); StartBackupCommand startCmd = new StartBackupCommand( vm.getInstanceName(), vmId, toCheckpointId, fromCheckpointId, nbdPort, - diskVolumePaths + diskVolumePaths, + host.getPrivateIpAddress() ); try { @@ -282,9 +295,13 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme List volumes = volumeDao.findByInstance(backup.getVmId()); String deviceName = resolveDeviceName(volumes, volumeId); + String transferId = UUID.randomUUID().toString(); + Host host = hostDao.findById(backup.getHostId()); // Create CreateImageTransferCommand CreateImageTransferCommand transferCmd = new CreateImageTransferCommand( backup.getVmId(), + transferId, + host.getPrivateIpAddress(), backupId, volumeId, deviceName, @@ -295,6 +312,9 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme CreateImageTransferAnswer answer; if (dummyOffering) { answer = new CreateImageTransferAnswer(transferCmd, true, "Dummy answer", "image-transfer-id", "nbd://127.0.0.1:10809/vda", "initializing"); + } else if (DATAPLANE_PROXY_MODE) { + EndPoint ssvm = _epSelector.findSsvm(backup.getZoneId()); + answer = (CreateImageTransferAnswer) ssvm.sendMessage(transferCmd); } else { answer = 
(CreateImageTransferAnswer) agentManager.send(backup.getHostId(), transferCmd); } @@ -305,6 +325,7 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme // Create ImageTransfer record ImageTransferVO imageTransfer = new ImageTransferVO( + transferId, backupId, backup.getVmId(), volumeId, @@ -347,10 +368,32 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme throw new CloudRuntimeException("Image transfer not found: " + imageTransferId); } - // Mark as finished (NBD is closed in backup finalize, not here) - imageTransfer.setPhase(ImageTransferVO.Phase.finished); - imageTransferDao.update(imageTransferId, imageTransfer); - imageTransferDao.remove(imageTransferId); + BackupVO backup = backupDao.findById(imageTransfer.getBackupId()); + boolean dummyOffering = isDummyOffering(backup.getBackupOfferingId()); + + FinalizeImageTransferCommand finalizeCmd = new FinalizeImageTransferCommand(imageTransfer.getUuid()); + try { + Answer answer; + if (dummyOffering) { + answer = new Answer(finalizeCmd, true, "Image transfer finalized."); + } else if (DATAPLANE_PROXY_MODE) { + EndPoint ssvm = _epSelector.findSsvm(backup.getZoneId()); + answer = ssvm.sendMessage(finalizeCmd); + } else { + answer = agentManager.send(backup.getHostId(), finalizeCmd); + } + + if (!answer.getResult()) { + throw new CloudRuntimeException("Failed to finalize image transfer: " + answer.getDetails()); + } + + imageTransfer.setPhase(ImageTransferVO.Phase.finished); + imageTransferDao.update(imageTransferId, imageTransfer); + imageTransferDao.remove(imageTransferId); + + } catch (AgentUnavailableException | OperationTimedoutException e) { + throw new CloudRuntimeException("Failed to communicate with agent: " + e.getMessage(), e); + } return true; } @@ -396,8 +439,14 @@ public class IncrementalBackupServiceImpl extends ManagerBase implements Increme @Override public boolean deleteVmCheckpoint(DeleteVmCheckpointCmd cmd) { - // No-op for normal flow as 
per spec - // Kept for API parity with oVirt + // Todo : backend support? + VMInstanceVO vm = vmInstanceDao.findById(cmd.getVmId()); + if (vm == null) { + throw new CloudRuntimeException("VM not found: " + cmd.getVmId()); + } + vm.setActiveCheckpointId(null); + vm.setActiveCheckpointCreateTime(null); + vmInstanceDao.update(cmd.getVmId(), vm); return true; } diff --git a/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java b/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java index 8dd2fa23169..0f8de122d88 100644 --- a/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java +++ b/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java @@ -55,6 +55,10 @@ import java.util.stream.Stream; import javax.naming.ConfigurationException; import com.cloud.agent.api.ConvertSnapshotCommand; + +import org.apache.cloudstack.backup.CreateImageTransferAnswer; +import org.apache.cloudstack.backup.CreateImageTransferCommand; +import org.apache.cloudstack.backup.FinalizeImageTransferCommand; import org.apache.cloudstack.framework.security.keystore.KeystoreManager; import org.apache.cloudstack.storage.NfsMountManagerImpl.PathParser; import org.apache.cloudstack.storage.command.CopyCmdAnswer; @@ -338,6 +342,10 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S return execute((ListDataStoreObjectsCommand)cmd); } else if (cmd instanceof QuerySnapshotZoneCopyCommand) { return execute((QuerySnapshotZoneCopyCommand)cmd); + } else if (cmd instanceof CreateImageTransferCommand) { + return execute((CreateImageTransferCommand)cmd); + } else if (cmd instanceof FinalizeImageTransferCommand) { + return execute((FinalizeImageTransferCommand)cmd); } else { return Answer.createUnsupportedCommandAnswer(cmd); } @@ 
-3708,4 +3716,111 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S return new QuerySnapshotZoneCopyAnswer(cmd, files); } + protected Answer execute(CreateImageTransferCommand cmd) { + if (!_inSystemVM) { + return new CreateImageTransferAnswer(cmd, true, "Not running inside SSVM; skipping image transfer setup."); + } + + final String transferId = cmd.getTransferId(); + + final String hostIp = cmd.getHostIpAddress(); + final String exportName = cmd.getDeviceName(); + final int nbdPort = cmd.getNbdPort(); + + if (StringUtils.isBlank(transferId)) { + return new CreateImageTransferAnswer(cmd, false, "transferId is empty."); + } + if (StringUtils.isBlank(hostIp)) { + return new CreateImageTransferAnswer(cmd, false, "hostIpAddress is empty."); + } + if (StringUtils.isBlank(exportName)) { + return new CreateImageTransferAnswer(cmd, false, "deviceName is empty."); + } + if (nbdPort <= 0) { + return new CreateImageTransferAnswer(cmd, false, "Invalid nbdPort: " + nbdPort); + } + + final String imageServerScript = "/opt/cloud/bin/image_server.py"; + final int imageServerPort = 54323; + final String imageServerLogFile = "/var/log/image_server.log"; + + try { + // 1) Write /tmp/ with NBD endpoint details. + final Map payload = new HashMap<>(); + payload.put("host", hostIp); + payload.put("port", nbdPort); + payload.put("export", exportName); + + final String json = new GsonBuilder().create().toJson(payload); + final File transferFile = new File("/tmp", transferId); + FileUtils.writeStringToFile(transferFile, json, "UTF-8"); + + // 2) Start image_server if not already running. 
+ final File scriptFile = new File(imageServerScript); + if (!scriptFile.exists()) { + return new CreateImageTransferAnswer(cmd, false, "Missing image server script: " + imageServerScript); + } + + final Script isRunning = new Script("/bin/bash", logger); + isRunning.add("-c"); + isRunning.add(String.format("pgrep -f '%s.*--port %d' >/dev/null 2>&1", imageServerScript, imageServerPort)); + final String runningResult = isRunning.execute(); + if (runningResult != null) { + try { + ProcessBuilder pb = new ProcessBuilder( + "python3", imageServerScript, + "--listen", "0.0.0.0", + "--port", String.valueOf(imageServerPort) + ); + pb.redirectOutput(ProcessBuilder.Redirect.appendTo(new File(imageServerLogFile))); + pb.redirectErrorStream(true); + pb.start(); + } catch (IOException e) { + logger.warn("Failed to start Image Server", e); + return new CreateImageTransferAnswer(cmd, false, "Failed to start image server: " + e.getMessage()); + } + } + final String transferUrl = String.format("http://%s:%d/images/%s", _publicIp, imageServerPort, transferId); + return new CreateImageTransferAnswer(cmd, true, "Image transfer prepared on SSVM.", transferId, transferUrl, "initializing"); + } catch (Exception e) { + logger.warn("Failed to prepare image transfer on SSVM", e); + return new CreateImageTransferAnswer(cmd, false, "Failed to prepare image transfer on SSVM: " + e.getMessage()); + } + } + + protected Answer execute(FinalizeImageTransferCommand cmd) { + if (!_inSystemVM) { + return new Answer(cmd, true, "Not running inside SSVM; skipping image transfer finalization."); + } + + final String transferId = cmd.getTransferId(); + if (StringUtils.isBlank(transferId)) { + return new Answer(cmd, false, "transferId is empty."); + } + + final File transferFile = new File("/tmp", transferId); + if (transferFile.exists() && !transferFile.delete()) { + return new Answer(cmd, false, "Failed to delete transfer config file: " + transferFile.getAbsolutePath()); + } + + // Stop image_server.py only if /tmp directory 
is empty. + final File tmpDir = new File("/tmp"); + final File[] tmpEntries = tmpDir.listFiles(); + if (tmpEntries != null && tmpEntries.length == 0) { + final String imageServerScript = "/opt/cloud/bin/image_server.py"; + final int imageServerPort = 54323; + + // Use bash "|| true" so Script returns success even if process isn't running. + final Script stop = new Script("/bin/bash", logger); + stop.add("-c"); + stop.add(String.format("pkill -f '%s.*--port %d' >/dev/null 2>&1 || true", imageServerScript, imageServerPort)); + final String stopResult = stop.execute(); + if (stopResult != null) { + return new Answer(cmd, false, "Failed to stop image server: " + stopResult); + } + } + + return new Answer(cmd, true, "Image transfer finalized."); + } + } diff --git a/systemvm/debian/opt/cloud/bin/image_server.py b/systemvm/debian/opt/cloud/bin/image_server.py new file mode 100644 index 00000000000..2a9013fb4c5 --- /dev/null +++ b/systemvm/debian/opt/cloud/bin/image_server.py @@ -0,0 +1,678 @@ +#!/usr/bin/env python3 +""" +POC "imageio-like" HTTP server backed by NBD over TCP. + +How to run +---------- +- Install dependency: + dnf install python3-libnbd + or + apt install python3-libnbd + +- Run server: + python image_server.py --listen 0.0.0.0 --port 54323 + +Example curl commands +-------------------- +- OPTIONS: + curl -i -X OPTIONS http://127.0.0.1:54323/images/demo + +- GET full image: + curl -v http://127.0.0.1:54323/images/demo -o demo.img + +- GET a byte range: + curl -v -H "Range: bytes=0-1048575" http://127.0.0.1:54323/images/demo -o first_1MiB.bin + +- PUT full image (Content-Length must equal export size exactly): + curl -v -T demo.img http://127.0.0.1:54323/images/demo + +- GET extents (POC-level; may return a single allocated extent): + curl -s http://127.0.0.1:54323/images/demo/extents | jq . + +- POST flush: + curl -s -X POST http://127.0.0.1:54323/images/demo/flush | jq . 
+""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import socket +import threading +import time +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Any, Dict, Optional, Tuple +import nbd + +CHUNK_SIZE = 256 * 1024 # 256 KiB + +# Concurrency limits across ALL images. +MAX_PARALLEL_READS = 8 +MAX_PARALLEL_WRITES = 2 + +_READ_SEM = threading.Semaphore(MAX_PARALLEL_READS) +_WRITE_SEM = threading.Semaphore(MAX_PARALLEL_WRITES) + +# In-memory per-image lock: single lock gates both read and write. +_IMAGE_LOCKS: Dict[str, threading.Lock] = {} +_IMAGE_LOCKS_GUARD = threading.Lock() + + +# Dynamic image_id(transferId) -> NBD export mapping: +# CloudStack writes a JSON file at /tmp/ with: +# {"host": "...", "port": 10809, "export": "vda"} +# +# This server reads that file on-demand. +_CFG_DIR = "/tmp" +_CFG_CACHE: Dict[str, Tuple[float, Dict[str, Any]]] = {} +_CFG_CACHE_GUARD = threading.Lock() + + +def _json_bytes(obj: Any) -> bytes: + return json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + +def _get_image_lock(image_id: str) -> threading.Lock: + with _IMAGE_LOCKS_GUARD: + lock = _IMAGE_LOCKS.get(image_id) + if lock is None: + lock = threading.Lock() + _IMAGE_LOCKS[image_id] = lock + return lock + + +def _now_s() -> float: + return time.monotonic() + + +def _safe_transfer_id(image_id: str) -> Optional[str]: + """ + Only allow a single filename component to avoid path traversal. + We intentionally keep validation simple: reject anything containing '/' or '\\'. 
+ """ + if not image_id: + return None + if image_id != os.path.basename(image_id): + return None + if "/" in image_id or "\\" in image_id: + return None + if image_id in (".", ".."): + return None + return image_id + + +def _load_image_cfg(image_id: str) -> Optional[Dict[str, Any]]: + safe_id = _safe_transfer_id(image_id) + if safe_id is None: + return None + + cfg_path = os.path.join(_CFG_DIR, safe_id) + try: + st = os.stat(cfg_path) + except FileNotFoundError: + return None + except OSError as e: + logging.warning("cfg stat failed image_id=%s err=%r", image_id, e) + return None + + with _CFG_CACHE_GUARD: + cached = _CFG_CACHE.get(safe_id) + if cached is not None: + cached_mtime, cached_cfg = cached + # Use cached config if the file hasn't changed. + if float(st.st_mtime) == float(cached_mtime): + return cached_cfg + + try: + with open(cfg_path, "rb") as f: + raw = f.read(4096) + except OSError as e: + logging.warning("cfg read failed image_id=%s err=%r", image_id, e) + return None + + try: + obj = json.loads(raw.decode("utf-8")) + except Exception as e: + logging.warning("cfg parse failed image_id=%s err=%r", image_id, e) + return None + + if not isinstance(obj, dict): + logging.warning("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__) + return None + + host = obj.get("host") + port = obj.get("port") + export = obj.get("export") + if not isinstance(host, str) or not host: + logging.warning("cfg missing/invalid host image_id=%s", image_id) + return None + try: + port_i = int(port) + except Exception: + logging.warning("cfg missing/invalid port image_id=%s", image_id) + return None + if port_i <= 0 or port_i > 65535: + logging.warning("cfg out-of-range port image_id=%s port=%r", image_id, port) + return None + if export is not None and (not isinstance(export, str) or not export): + logging.warning("cfg missing/invalid export image_id=%s", image_id) + return None + + cfg: Dict[str, Any] = {"host": host, "port": port_i, "export": export} + + with 
_CFG_CACHE_GUARD: + _CFG_CACHE[safe_id] = (float(st.st_mtime), cfg) + return cfg + + +class _NbdConn: + """ + Small helper to connect to NBD using an already-open TCP socket. + Opens a fresh handle per request, per POC requirements. + """ + + def __init__(self, host: str, port: int, export: Optional[str]): + self._sock = socket.create_connection((host, port)) + self._nbd = nbd.NBD() + + # Select export name if supported/needed. + if export and hasattr(self._nbd, "set_export_name"): + self._nbd.set_export_name(export) + + self._connect_existing_socket(self._sock) + + def _connect_existing_socket(self, sock: socket.socket) -> None: + # Requirement: attach libnbd to an existing socket / FD (no qemu-nbd). + # libnbd python API varies slightly by version, so try common options. + last_err: Optional[BaseException] = None + if hasattr(self._nbd, "connect_socket"): + try: + self._nbd.connect_socket(sock) + return + except Exception as e: # pragma: no cover (depends on binding) + last_err = e + try: + self._nbd.connect_socket(sock.fileno()) + return + except Exception as e2: # pragma: no cover + last_err = e2 + if hasattr(self._nbd, "connect_fd"): + try: + self._nbd.connect_fd(sock.fileno()) + return + except Exception as e: # pragma: no cover + last_err = e + raise RuntimeError( + "Unable to connect libnbd using existing socket/fd; " + f"binding missing connect_socket/connect_fd or call failed: {last_err!r}" + ) + + def size(self) -> int: + return int(self._nbd.get_size()) + + def pread(self, length: int, offset: int) -> bytes: + # Expected signature: pread(length, offset) + try: + return self._nbd.pread(length, offset) + except TypeError: # pragma: no cover (binding differences) + return self._nbd.pread(offset, length) + + def pwrite(self, buf: bytes, offset: int) -> None: + # Expected signature: pwrite(buf, offset) + try: + self._nbd.pwrite(buf, offset) + except TypeError: # pragma: no cover (binding differences) + self._nbd.pwrite(offset, buf) + + def flush(self) -> 
None: + if hasattr(self._nbd, "flush"): + self._nbd.flush() + return + if hasattr(self._nbd, "fsync"): + self._nbd.fsync() + return + raise RuntimeError("libnbd binding has no flush/fsync method") + + def close(self) -> None: + # Best-effort; bindings may differ. + try: + if hasattr(self._nbd, "shutdown"): + self._nbd.shutdown() + except Exception: + pass + try: + if hasattr(self._nbd, "close"): + self._nbd.close() + except Exception: + pass + try: + self._sock.close() + except Exception: + pass + + def __enter__(self) -> "_NbdConn": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + +class Handler(BaseHTTPRequestHandler): + server_version = "imageio-poc/0.1" + + # Keep BaseHTTPRequestHandler from printing noisy default logs + def log_message(self, fmt: str, *args: Any) -> None: + logging.info("%s - - %s", self.address_string(), fmt % args) + + def _send_imageio_headers(self) -> None: + # Include these headers for compatibility with the imageio contract. 
+ self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "GET, PUT, POST, OPTIONS") + self.send_header( + "Access-Control-Allow-Headers", + "Range, Content-Range, Content-Type, Content-Length", + ) + self.send_header( + "Access-Control-Expose-Headers", + "Accept-Ranges, Content-Range, Content-Length", + ) + self.send_header("Accept-Ranges", "bytes") + + def _send_json(self, status: int, obj: Any) -> None: + body = _json_bytes(obj) + self.send_response(status) + self._send_imageio_headers() + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + try: + self.wfile.write(body) + except BrokenPipeError: + pass + + def _send_error_json(self, status: int, message: str) -> None: + self._send_json(status, {"error": message}) + + def _send_range_not_satisfiable(self, size: int) -> None: + # RFC 7233: reply with "Content-Range: bytes */SIZE" (SIZE = total image size) + self.send_response(HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE) + self._send_imageio_headers() + self.send_header("Content-Type", "application/json") + self.send_header("Content-Range", f"bytes */{size}") + body = _json_bytes({"error": "range not satisfiable"}) + self.send_header("Content-Length", str(len(body))) + self.end_headers() + try: + self.wfile.write(body) + except BrokenPipeError: + pass + + def _parse_single_range(self, range_header: str, size: int) -> Tuple[int, int]: + """ + Parse a single HTTP byte range (RFC 7233) and return (start, end_inclusive). + + Supported: + - Range: bytes=START-END + - Range: bytes=START- + - Range: bytes=-SUFFIX + + Raises ValueError for invalid headers. Caller handles 416 vs 400. 
+ """ + if size < 0: + raise ValueError("invalid size") + if not range_header: + raise ValueError("empty Range") + if "," in range_header: + raise ValueError("multiple ranges not supported") + + prefix = "bytes=" + if not range_header.startswith(prefix): + raise ValueError("only bytes ranges supported") + spec = range_header[len(prefix) :].strip() + if "-" not in spec: + raise ValueError("invalid bytes range") + + left, right = spec.split("-", 1) + left = left.strip() + right = right.strip() + + if left == "": + # Suffix range: last N bytes. + if right == "": + raise ValueError("invalid suffix range") + try: + suffix_len = int(right, 10) + except ValueError as e: + raise ValueError("invalid suffix length") from e + if suffix_len <= 0: + raise ValueError("invalid suffix length") + if size == 0: + # Nothing to serve + raise ValueError("unsatisfiable") + if suffix_len >= size: + return 0, size - 1 + return size - suffix_len, size - 1 + + # START is present + try: + start = int(left, 10) + except ValueError as e: + raise ValueError("invalid range start") from e + if start < 0: + raise ValueError("invalid range start") + if start >= size: + raise ValueError("unsatisfiable") + + if right == "": + # START- + return start, size - 1 + + try: + end = int(right, 10) + except ValueError as e: + raise ValueError("invalid range end") from e + if end < start: + raise ValueError("unsatisfiable") + if end >= size: + end = size - 1 + return start, end + + def _parse_route(self) -> Tuple[Optional[str], Optional[str]]: + # Returns (image_id, tail) where tail is: + # None => /images/{id} + # "extents" => /images/{id}/extents + # "flush" => /images/{id}/flush + path = self.path.split("?", 1)[0] + parts = [p for p in path.split("/") if p] + if len(parts) < 2 or parts[0] != "images": + return None, None + image_id = parts[1] + tail = parts[2] if len(parts) >= 3 else None + if len(parts) > 3: + return None, None + return image_id, tail + + def _image_cfg(self, image_id: str) -> 
Optional[Dict[str, Any]]: + return _load_image_cfg(image_id) + + def do_OPTIONS(self) -> None: + image_id, tail = self._parse_route() + if image_id is None or tail is not None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + if self._image_cfg(image_id) is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") + return + self.send_response(HTTPStatus.OK) + self._send_imageio_headers() + self.send_header("Content-Length", "0") + self.end_headers() + + def do_GET(self) -> None: + image_id, tail = self._parse_route() + if image_id is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + + cfg = self._image_cfg(image_id) + if cfg is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") + return + + if tail == "extents": + self._handle_get_extents(image_id, cfg) + return + if tail is not None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + + range_header = self.headers.get("Range") + self._handle_get_image(image_id, cfg, range_header) + + def do_PUT(self) -> None: + image_id, tail = self._parse_route() + if image_id is None or tail is not None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + + cfg = self._image_cfg(image_id) + if cfg is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") + return + + if self.headers.get("Range") is not None or self.headers.get("Content-Range") is not None: + self._send_error_json( + HTTPStatus.BAD_REQUEST, "Range/Content-Range not supported; full writes only" + ) + return + + content_length_hdr = self.headers.get("Content-Length") + if content_length_hdr is None: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required") + return + try: + content_length = int(content_length_hdr) + except ValueError: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + return + if content_length < 0: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + 
return + + self._handle_put_image(image_id, cfg, content_length) + + def do_POST(self) -> None: + image_id, tail = self._parse_route() + if image_id is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + return + + cfg = self._image_cfg(image_id) + if cfg is None: + self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") + return + + if tail == "flush": + self._handle_post_flush(image_id, cfg) + return + self._send_error_json(HTTPStatus.NOT_FOUND, "not found") + + def _handle_get_image( + self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str] + ) -> None: + lock = _get_image_lock(image_id) + if not lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + if not _READ_SEM.acquire(blocking=False): + lock.release() + self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel reads") + return + + start = _now_s() + bytes_sent = 0 + try: + logging.info("GET start image_id=%s range=%s", image_id, range_header or "-") + with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: + size = conn.size() + + start_off = 0 + end_off_incl = size - 1 if size > 0 else -1 + status = HTTPStatus.OK + content_length = size + if range_header is not None: + try: + start_off, end_off_incl = self._parse_single_range(range_header, size) + except ValueError as e: + if str(e) == "unsatisfiable": + self._send_range_not_satisfiable(size) + return + if "unsatisfiable" in str(e): + self._send_range_not_satisfiable(size) + return + self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid Range header") + return + status = HTTPStatus.PARTIAL_CONTENT + content_length = (end_off_incl - start_off) + 1 + + self.send_response(status) + self._send_imageio_headers() + self.send_header("Content-Type", "application/octet-stream") + self.send_header("Content-Length", str(content_length)) + if status == HTTPStatus.PARTIAL_CONTENT: + self.send_header("Content-Range", f"bytes 
{start_off}-{end_off_incl}/{size}") + self.end_headers() + + offset = start_off + end_excl = end_off_incl + 1 + while offset < end_excl: + to_read = min(CHUNK_SIZE, end_excl - offset) + data = conn.pread(to_read, offset) + if not data: + raise RuntimeError("backend returned empty read") + try: + self.wfile.write(data) + except BrokenPipeError: + logging.info("GET client disconnected image_id=%s at=%d", image_id, offset) + break + offset += len(data) + bytes_sent += len(data) + except Exception as e: + # If headers already sent, we can't return JSON reliably; just log. + logging.warning("GET error image_id=%s err=%r", image_id, e) + try: + if not self.wfile.closed: + self.close_connection = True + except Exception: + pass + finally: + _READ_SEM.release() + lock.release() + dur = _now_s() - start + logging.info( + "GET end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_sent, dur + ) + + def _handle_put_image(self, image_id: str, cfg: Dict[str, Any], content_length: int) -> None: + lock = _get_image_lock(image_id) + if not lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + if not _WRITE_SEM.acquire(blocking=False): + lock.release() + self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") + return + + start = _now_s() + bytes_written = 0 + try: + logging.info("PUT start image_id=%s content_length=%d", image_id, content_length) + with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: + size = conn.size() + if content_length != size: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"Content-Length must equal image size ({size})", + ) + return + + offset = 0 + remaining = content_length + while remaining > 0: + chunk = self.rfile.read(min(CHUNK_SIZE, remaining)) + if not chunk: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"request body ended early at {offset} bytes", + ) + return + conn.pwrite(chunk, offset) + offset += len(chunk) + remaining -= len(chunk) 
+ bytes_written += len(chunk) + + # POC-level: do not auto-flush on PUT; expose explicit /flush endpoint. + self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) + except Exception as e: + logging.warning("PUT error image_id=%s err=%r", image_id, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") + finally: + _WRITE_SEM.release() + lock.release() + dur = _now_s() - start + logging.info( + "PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur + ) + + def _handle_get_extents(self, image_id: str, cfg: Dict[str, Any]) -> None: + # Keep deterministic and simple (POC): report entire image allocated. + # No per-image lock required by spec, but we still take it to avoid racing + # with a write and to keep behavior consistent. + lock = _get_image_lock(image_id) + if not lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + start = _now_s() + try: + logging.info("EXTENTS start image_id=%s", image_id) + with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: + size = conn.size() + self._send_json( + HTTPStatus.OK, + [{"start": 0, "length": size, "allocated": True}], + ) + except Exception as e: + logging.warning("EXTENTS error image_id=%s err=%r", image_id, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") + finally: + lock.release() + dur = _now_s() - start + logging.info("EXTENTS end image_id=%s duration_s=%.3f", image_id, dur) + + def _handle_post_flush(self, image_id: str, cfg: Dict[str, Any]) -> None: + lock = _get_image_lock(image_id) + if not lock.acquire(blocking=False): + self._send_error_json(HTTPStatus.CONFLICT, "image busy") + return + + start = _now_s() + try: + logging.info("FLUSH start image_id=%s", image_id) + with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: + conn.flush() + self._send_json(HTTPStatus.OK, {"ok": True}) + except Exception as e: + logging.warning("FLUSH error 
image_id=%s err=%r", image_id, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") + finally: + lock.release() + dur = _now_s() - start + logging.info("FLUSH end image_id=%s duration_s=%.3f", image_id, dur) + + +def main() -> None: + parser = argparse.ArgumentParser(description="POC imageio-like HTTP server backed by NBD") + parser.add_argument("--listen", default="127.0.0.1", help="Address to bind") + parser.add_argument("--port", type=int, default=54323, help="Port to listen on") + args = parser.parse_args() + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + ) + + addr = (args.listen, args.port) + httpd = ThreadingHTTPServer(addr, Handler) + logging.info("listening on http://%s:%d", args.listen, args.port) + logging.info("image configs are read from %s/", _CFG_DIR) + httpd.serve_forever() + + +if __name__ == "__main__": + main()