diff --git a/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java b/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java index 458eb32ca89..2358bdcc832 100644 --- a/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java +++ b/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java @@ -57,6 +57,7 @@ import javax.naming.ConfigurationException; import org.apache.cloudstack.backup.CreateImageTransferAnswer; import org.apache.cloudstack.backup.CreateImageTransferCommand; import org.apache.cloudstack.backup.FinalizeImageTransferCommand; +import org.apache.cloudstack.backup.ImageTransfer; import org.apache.cloudstack.framework.security.keystore.KeystoreManager; import org.apache.cloudstack.storage.NfsMountManagerImpl.PathParser; import org.apache.cloudstack.storage.command.CopyCmdAnswer; @@ -3839,10 +3840,8 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S } final String transferId = cmd.getTransferId(); - final String hostIp = cmd.getHostIpAddress(); - final String exportName = cmd.getExportName(); - final int nbdPort = cmd.getNbdPort(); + final ImageTransfer.Backend backend = cmd.getBackend(); if (StringUtils.isBlank(transferId)) { return new CreateImageTransferAnswer(cmd, false, "transferId is empty."); @@ -3850,18 +3849,25 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S if (StringUtils.isBlank(hostIp)) { return new CreateImageTransferAnswer(cmd, false, "hostIpAddress is empty."); } - if (StringUtils.isBlank(exportName)) { - return new CreateImageTransferAnswer(cmd, false, "exportName is empty."); - } - if (nbdPort <= 0) { - return new CreateImageTransferAnswer(cmd, false, "Invalid nbdPort: " + nbdPort); - } - final int imageServerPort = 54323; + final Map payload = new HashMap<>(); + payload.put("backend", backend.toString()); - try { - // 1) Write /tmp/ with NBD endpoint details. - final Map payload = new HashMap<>(); + if (backend == ImageTransfer.Backend.file) { + final String filePath = cmd.getFile(); + if (StringUtils.isBlank(filePath)) { + return new CreateImageTransferAnswer(cmd, false, "file path is empty for file backend."); + } + payload.put("file", filePath); + } else { + final String exportName = cmd.getExportName(); + final int nbdPort = cmd.getNbdPort(); + if (StringUtils.isBlank(exportName)) { + return new CreateImageTransferAnswer(cmd, false, "exportName is empty."); + } + if (nbdPort <= 0) { + return new CreateImageTransferAnswer(cmd, false, "Invalid nbdPort: " + nbdPort); + } payload.put("host", hostIp); payload.put("port", nbdPort); payload.put("export", exportName); @@ -3869,7 +3875,9 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S if (checkpointId != null) { payload.put("export_bitmap", exportName + "-" + checkpointId.substring(0, 4)); } + } + try { final String json = new GsonBuilder().create().toJson(payload); File dir = new File("/tmp/imagetransfer"); if (!dir.exists()) { @@ -3883,6 +3891,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S return new CreateImageTransferAnswer(cmd, false, "Failed to prepare image transfer on SSVM: " + e.getMessage()); } + final int imageServerPort = 54323; startImageServerIfNotRunning(imageServerPort); final String transferUrl = String.format("http://%s:%d/images/%s", _publicIp, imageServerPort, transferId); diff --git a/systemvm/debian/opt/cloud/bin/image_server.py b/systemvm/debian/opt/cloud/bin/image_server.py index 848eb41983c..a176513698c 100644 --- a/systemvm/debian/opt/cloud/bin/image_server.py +++ b/systemvm/debian/opt/cloud/bin/image_server.py @@ -17,7 +17,11 @@ # under the License. """ -POC "imageio-like" HTTP server backed by NBD over TCP. +POC "imageio-like" HTTP server backed by NBD over TCP or a local file. + +Supports two backends (see config payload): +- nbd: proxy to an NBD server (port, export, export_bitmap); supports range reads/writes, extents, zero, flush. +- file: read/write a local qcow2 (or raw) file path; full PUT only (no range writes), GET with optional ranges, flush. How to run ---------- @@ -116,9 +120,10 @@ _IMAGE_LOCKS: Dict[str, threading.Lock] = {} _IMAGE_LOCKS_GUARD = threading.Lock() -# Dynamic image_id(transferId) -> NBD export mapping: +# Dynamic image_id(transferId) -> backend mapping: # CloudStack writes a JSON file at /tmp/imagetransfer/ with: -# {"host": "...", "port": 10809, "export": "vda", "export_bitmap":"bitmap1"} +# - NBD backend: {"backend": "nbd", "host": "...", "port": 10809, "export": "vda", "export_bitmap": "..."} +# - File backend: {"backend": "file", "file": "/path/to/image.qcow2"} # # This server reads that file on-demand. _CFG_DIR = "/tmp/imagetransfer" @@ -249,26 +254,49 @@ def _load_image_cfg(image_id: str) -> Optional[Dict[str, Any]]: logging.error("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__) return None - host = obj.get("host") - port = obj.get("port") - export = obj.get("export") - export_bitmap = obj.get("export_bitmap") - if not isinstance(host, str) or not host: - logging.error("cfg missing/invalid host image_id=%s", image_id) + backend = obj.get("backend") + if backend is None: + backend = "nbd" + if not isinstance(backend, str): + logging.error("cfg invalid backend type image_id=%s", image_id) return None - try: - port_i = int(port) - except Exception: - logging.error("cfg missing/invalid port image_id=%s", image_id) - return None - if port_i <= 0 or port_i > 65535: - logging.error("cfg out-of-range port image_id=%s port=%r", image_id, port) - return None - if export is not None and (not isinstance(export, str) or not export): - logging.error("cfg missing/invalid export image_id=%s", image_id) + backend = backend.lower() + if backend not in ("nbd", "file"): + logging.error("cfg unsupported backend image_id=%s backend=%s", image_id, backend) return None - cfg: Dict[str, Any] = {"host": host, "port": port_i, "export": export, "export_bitmap": export_bitmap} + if backend == "file": + file_path = obj.get("file") + if not isinstance(file_path, str) or not file_path.strip(): + logging.error("cfg missing/invalid file path for file backend image_id=%s", image_id) + return None + cfg = {"backend": "file", "file": file_path.strip()} + else: + host = obj.get("host") + port = obj.get("port") + export = obj.get("export") + export_bitmap = obj.get("export_bitmap") + if not isinstance(host, str) or not host: + logging.error("cfg missing/invalid host image_id=%s", image_id) + return None + try: + port_i = int(port) + except Exception: + logging.error("cfg missing/invalid port image_id=%s", image_id) + return None + if port_i <= 0 or port_i > 65535: + logging.error("cfg out-of-range port image_id=%s port=%r", image_id, port) + return None + if export is not None and (not isinstance(export, str) or not export): + logging.error("cfg missing/invalid export image_id=%s", image_id) + return None + cfg = { + "backend": "nbd", + "host": host, + "port": port_i, + "export": export, + "export_bitmap": export_bitmap, + } with _CFG_CACHE_GUARD: _CFG_CACHE[safe_id] = (float(st.st_mtime), cfg) @@ -813,6 +841,9 @@ class Handler(BaseHTTPRequestHandler): def _image_cfg(self, image_id: str) -> Optional[Dict[str, Any]]: return _load_image_cfg(image_id) + def _is_file_backend(self, cfg: Dict[str, Any]) -> bool: + return cfg.get("backend") == "file" + def do_OPTIONS(self) -> None: image_id, tail = self._parse_route() if image_id is None or tail is not None: @@ -822,6 +853,19 @@ class Handler(BaseHTTPRequestHandler): if cfg is None: self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") return + if self._is_file_backend(cfg): + # File backend: full PUT only, no range writes; GET with ranges allowed; flush supported. + allowed_methods = "GET, PUT, POST, OPTIONS" + features = ["flush"] + max_writers = MAX_PARALLEL_WRITES + response = { + "unix_socket": None, + "features": features, + "max_readers": MAX_PARALLEL_READS, + "max_writers": max_writers, + } + self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods) + return # Query NBD backend for capabilities (like nbdinfo); fall back to config. read_only = True can_flush = False @@ -876,6 +920,11 @@ class Handler(BaseHTTPRequestHandler): return if tail == "extents": + if self._is_file_backend(cfg): + self._send_error_json( + HTTPStatus.BAD_REQUEST, "extents not supported for file backend" + ) + return query = self._parse_query() context = (query.get("context") or [None])[0] self._handle_get_extents(image_id, cfg, context=context) @@ -945,6 +994,12 @@ class Handler(BaseHTTPRequestHandler): if cfg is None: self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") return + if self._is_file_backend(cfg): + self._send_error_json( + HTTPStatus.BAD_REQUEST, + "range writes and PATCH not supported for file backend; use PUT for full upload", + ) + return content_type = self.headers.get("Content-Type", "").split(";")[0].strip().lower() range_header = self.headers.get("Range") @@ -1057,9 +1112,14 @@ class Handler(BaseHTTPRequestHandler): bytes_sent = 0 try: logging.info("GET start image_id=%s range=%s", image_id, range_header or "-") - with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: - size = conn.size() - + if self._is_file_backend(cfg): + file_path = cfg["file"] + try: + size = os.path.getsize(file_path) + except OSError as e: + logging.error("GET file size error image_id=%s path=%s err=%r", image_id, file_path, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "failed to access file") + return start_off = 0 end_off_incl = size - 1 if size > 0 else -1 status = HTTPStatus.OK @@ -1089,18 +1149,65 @@ class Handler(BaseHTTPRequestHandler): offset = start_off end_excl = end_off_incl + 1 - while offset < end_excl: - to_read = min(CHUNK_SIZE, end_excl - offset) - data = conn.pread(to_read, offset) - if not data: - raise RuntimeError("backend returned empty read") - try: - self.wfile.write(data) - except BrokenPipeError: - logging.info("GET client disconnected image_id=%s at=%d", image_id, offset) - break - offset += len(data) - bytes_sent += len(data) + with open(file_path, "rb") as f: + f.seek(offset) + while offset < end_excl: + to_read = min(CHUNK_SIZE, end_excl - offset) + data = f.read(to_read) + if not data: + break + try: + self.wfile.write(data) + except BrokenPipeError: + logging.info("GET client disconnected image_id=%s at=%d", image_id, offset) + break + offset += len(data) + bytes_sent += len(data) + else: + with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: + size = conn.size() + + start_off = 0 + end_off_incl = size - 1 if size > 0 else -1 + status = HTTPStatus.OK + content_length = size + if range_header is not None: + try: + start_off, end_off_incl = self._parse_single_range(range_header, size) + except ValueError as e: + if str(e) == "unsatisfiable": + self._send_range_not_satisfiable(size) + return + if "unsatisfiable" in str(e): + self._send_range_not_satisfiable(size) + return + self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid Range header") + return + status = HTTPStatus.PARTIAL_CONTENT + content_length = (end_off_incl - start_off) + 1 + + self.send_response(status) + self._send_imageio_headers() + self.send_header("Content-Type", "application/octet-stream") + self.send_header("Content-Length", str(content_length)) + if status == HTTPStatus.PARTIAL_CONTENT: + self.send_header("Content-Range", f"bytes {start_off}-{end_off_incl}/{size}") + self.end_headers() + + offset = start_off + end_excl = end_off_incl + 1 + while offset < end_excl: + to_read = min(CHUNK_SIZE, end_excl - offset) + data = conn.pread(to_read, offset) + if not data: + raise RuntimeError("backend returned empty read") + try: + self.wfile.write(data) + except BrokenPipeError: + logging.info("GET client disconnected image_id=%s at=%d", image_id, offset) + break + offset += len(data) + bytes_sent += len(data) except Exception as e: # If headers already sent, we can't return JSON reliably; just log. logging.error("GET error image_id=%s err=%r", image_id, e) @@ -1132,24 +1239,41 @@ class Handler(BaseHTTPRequestHandler): bytes_written = 0 try: logging.info("PUT start image_id=%s content_length=%d", image_id, content_length) - with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: - offset = 0 + if self._is_file_backend(cfg): + file_path = cfg["file"] remaining = content_length - while remaining > 0: - chunk = self.rfile.read(min(CHUNK_SIZE, remaining)) - if not chunk: - self._send_error_json( - HTTPStatus.BAD_REQUEST, - f"request body ended early at {offset} bytes", - ) - return - conn.pwrite(chunk, offset) - offset += len(chunk) - remaining -= len(chunk) - bytes_written += len(chunk) - - # POC-level: do not auto-flush on PUT; expose explicit /flush endpoint. + with open(file_path, "wb") as f: + while remaining > 0: + chunk = self.rfile.read(min(CHUNK_SIZE, remaining)) + if not chunk: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"request body ended early at {bytes_written} bytes", + ) + return + f.write(chunk) + bytes_written += len(chunk) + remaining -= len(chunk) self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) + else: + with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: + offset = 0 + remaining = content_length + while remaining > 0: + chunk = self.rfile.read(min(CHUNK_SIZE, remaining)) + if not chunk: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"request body ended early at {offset} bytes", + ) + return + conn.pwrite(chunk, offset) + offset += len(chunk) + remaining -= len(chunk) + bytes_written += len(chunk) + + # POC-level: do not auto-flush on PUT; expose explicit /flush endpoint. + self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) except Exception as e: logging.error("PUT error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") @@ -1244,9 +1368,16 @@ class Handler(BaseHTTPRequestHandler): start = _now_s() try: logging.info("FLUSH start image_id=%s", image_id) - with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: - conn.flush() - self._send_json(HTTPStatus.OK, {"ok": True}) + if self._is_file_backend(cfg): + file_path = cfg["file"] + with open(file_path, "rb") as f: + f.flush() + os.fsync(f.fileno()) + self._send_json(HTTPStatus.OK, {"ok": True}) + else: + with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: + conn.flush() + self._send_json(HTTPStatus.OK, {"ok": True}) except Exception as e: logging.error("FLUSH error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")