diff --git a/scripts/vm/hypervisor/kvm/image_server.py b/scripts/vm/hypervisor/kvm/image_server.py index 5119b8b7e6a..891bac5bf53 100644 --- a/scripts/vm/hypervisor/kvm/image_server.py +++ b/scripts/vm/hypervisor/kvm/image_server.py @@ -20,8 +20,12 @@ POC "imageio-like" HTTP server backed by NBD over Unix socket or a local file. Supports two backends (see config payload): -- nbd: proxy to an NBD server via Unix socket (socket path, export, export_bitmap); supports range reads/writes, extents, zero, flush. -- file: read/write a local qcow2 (or raw) file path; full PUT only (no range writes), GET with optional ranges, flush. +- nbd: proxy to an NBD server via Unix socket (socket path, export, export_bitmap); + supports range reads/writes (GET/PUT/PATCH), extents, zero, flush. PUT accepts + full upload or ranged upload (Content-Range). Concurrent PUTs on the same image + are serialized (blocking). +- file: read/write a local qcow2 (or raw) file path; full PUT only (no range + writes), GET with optional ranges, flush. How to run ---------- @@ -44,8 +48,16 @@ Example curl commands - GET a byte range: curl -v -H "Range: bytes=0-1048575" http://127.0.0.1:54323/images/demo -o first_1MiB.bin -- PUT full image (Content-Length must equal export size exactly): +- PUT full image (Content-Length must equal export size exactly). Optional ?flush=y|n: curl -v -T demo.img http://127.0.0.1:54323/images/demo + curl -v -T demo.img "http://127.0.0.1:54323/images/demo?flush=y" + +- PUT ranged (NBD backend only). Content-Range: bytes start-end/* or bytes start-end/size + (server uses only start; length from Content-Length). 
Optional ?flush=y|n: + curl -v -X PUT -H "Content-Range: bytes 0-1048575/*" -H "Content-Length: 1048576" \ + --data-binary @chunk.bin http://127.0.0.1:54323/images/demo + curl -v -X PUT -H "Content-Range: bytes 1048576-2097151/52428800" -H "Content-Length: 1048576" \ + --data-binary @chunk2.bin "http://127.0.0.1:54323/images/demo?flush=n" - GET extents (zero/hole extents from NBD base:allocation): curl -s http://127.0.0.1:54323/images/demo/extents | jq . @@ -89,6 +101,7 @@ import argparse import json import logging import os +import re import socket import threading import time @@ -608,6 +621,9 @@ class Handler(BaseHTTPRequestHandler): server_version = "imageio-poc/0.1" server_protocol = "HTTP/1.1" + # Accept both "bytes start-end/*" and "bytes start-end/size"; we only use start. + _CONTENT_RANGE_RE = re.compile(r"^bytes\s+(\d+)-(\d+)/(?:\*|\d+)$") + # Keep BaseHTTPRequestHandler from printing noisy default logs def log_message(self, fmt: str, *args: Any) -> None: logging.info("%s - - %s", self.address_string(), fmt % args) @@ -740,6 +756,23 @@ class Handler(BaseHTTPRequestHandler): return None, None return image_id, tail + def _parse_content_range(self, header: str) -> Tuple[int, int]: + """ + Parse Content-Range header "bytes start-end/*" or "bytes start-end/size" + and return (start, end_inclusive). Raises ValueError on invalid input. + """ + if not header: + raise ValueError("empty Content-Range") + m = self._CONTENT_RANGE_RE.match(header.strip()) + if not m: + raise ValueError("invalid Content-Range") + start_s, end_s = m.groups() + start = int(start_s, 10) + end = int(end_s, 10) + if start < 0 or end < start: + raise ValueError("invalid Content-Range range") + return start, end + def _parse_query(self) -> Dict[str, List[str]]: """Parse query string from self.path into a dict of name -> list of values.""" if "?" 
not in self.path: @@ -855,9 +888,11 @@ class Handler(BaseHTTPRequestHandler): self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") return - if self.headers.get("Range") is not None or self.headers.get("Content-Range") is not None: + # For PUT we only support Content-Range (for NBD backend); Range is rejected. + if self.headers.get("Range") is not None: self._send_error_json( - HTTPStatus.BAD_REQUEST, "Range/Content-Range not supported; full writes only" + HTTPStatus.BAD_REQUEST, + "Range header not supported for PUT; use Content-Range or PATCH", ) return @@ -874,7 +909,24 @@ class Handler(BaseHTTPRequestHandler): self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") return - self._handle_put_image(image_id, cfg, content_length) + # Optional flush=y|n query parameter. + query = self._parse_query() + flush_param = (query.get("flush") or ["n"])[0].lower() + flush = flush_param in ("y", "yes", "true", "1") + + content_range_hdr = self.headers.get("Content-Range") + if content_range_hdr is not None: + if self._is_file_backend(cfg): + self._send_error_json( + HTTPStatus.BAD_REQUEST, + "Content-Range PUT not supported for file backend; use full PUT", + ) + return + self._handle_put_range(image_id, cfg, content_range_hdr, content_length, flush) + return + + # No Content-Range: full image PUT. 
+ self._handle_put_image(image_id, cfg, content_length, flush) def do_POST(self) -> None: image_id, tail = self._parse_route() @@ -1004,7 +1056,7 @@ class Handler(BaseHTTPRequestHandler): self._handle_patch_zero(image_id, cfg, offset=offset, size=size, flush=flush) def _handle_get_image( - self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str] + self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str] ) -> None: if not _READ_SEM.acquire(blocking=False): self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel reads") @@ -1125,11 +1177,12 @@ class Handler(BaseHTTPRequestHandler): "GET end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_sent, dur ) - def _handle_put_image(self, image_id: str, cfg: Dict[str, Any], content_length: int) -> None: + def _handle_put_image( + self, image_id: str, cfg: Dict[str, Any], content_length: int, flush: bool + ) -> None: lock = _get_image_lock(image_id) - if not lock.acquire(blocking=False): - self._send_error_json(HTTPStatus.CONFLICT, "image busy") - return + # Block until we can write this image + lock.acquire() if not _WRITE_SEM.acquire(blocking=False): lock.release() @@ -1155,7 +1208,13 @@ class Handler(BaseHTTPRequestHandler): f.write(chunk) bytes_written += len(chunk) remaining -= len(chunk) - self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) + if flush: + f.flush() + os.fsync(f.fileno()) + self._send_json( + HTTPStatus.OK, + {"ok": True, "bytes_written": bytes_written, "flushed": flush}, + ) else: with _NbdConn(cfg["socket"], cfg.get("export")) as conn: offset = 0 @@ -1173,8 +1232,12 @@ class Handler(BaseHTTPRequestHandler): remaining -= len(chunk) bytes_written += len(chunk) - # POC-level: do not auto-flush on PUT; expose explicit /flush endpoint. 
- self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) + if flush: + conn.flush() + self._send_json( + HTTPStatus.OK, + {"ok": True, "bytes_written": bytes_written, "flushed": flush}, + ) except Exception as e: logging.error("PUT error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") @@ -1186,6 +1249,49 @@ class Handler(BaseHTTPRequestHandler): "PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur ) + def _write_range_nbd( + self, + image_id: str, + cfg: Dict[str, Any], + start_off: int, + content_length: int, + ) -> Tuple[int, bool]: + """ + Low-level helper: write request body to NBD backend starting at start_off. + The length is taken from Content-Length. Returns (bytes_written, ok). + If ok is False, an error response was already sent. + """ + bytes_written = 0 + with _NbdConn(cfg["socket"], cfg.get("export")) as conn: + image_size = conn.size() + if start_off >= image_size: + self._send_range_not_satisfiable(image_size) + return 0, False + + # No clamping: a write that would run past the end of the image is rejected outright. 
+ max_len = image_size - start_off + if content_length > max_len: + self._send_range_not_satisfiable(image_size) + return 0, False + + offset = start_off + remaining = content_length + while remaining > 0: + chunk = self.rfile.read(min(CHUNK_SIZE, remaining)) + if not chunk: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"request body ended early at {bytes_written} bytes", + ) + return bytes_written, False + conn.pwrite(chunk, offset) + n = len(chunk) + offset += n + remaining -= n + bytes_written += n + + return bytes_written, True + def _handle_get_extents( self, image_id: str, cfg: Dict[str, Any], context: Optional[str] = None ) -> None: @@ -1356,40 +1462,28 @@ class Handler(BaseHTTPRequestHandler): ) with _NbdConn(cfg["socket"], cfg.get("export")) as conn: image_size = conn.size() - try: - start_off, end_inclusive = self._parse_single_range( - range_header, image_size - ) - except ValueError as e: - if "unsatisfiable" in str(e).lower(): - self._send_range_not_satisfiable(image_size) - else: - self._send_error_json( - HTTPStatus.BAD_REQUEST, f"invalid Range header: {e}" - ) - return - expected_len = end_inclusive - start_off + 1 - if content_length != expected_len: + try: + start_off, end_inclusive = self._parse_single_range( + range_header, image_size + ) + except ValueError as e: + if "unsatisfiable" in str(e).lower(): + self._send_range_not_satisfiable(image_size) + else: self._send_error_json( - HTTPStatus.BAD_REQUEST, - f"Content-Length ({content_length}) must equal range length ({expected_len})", + HTTPStatus.BAD_REQUEST, f"invalid Range header: {e}" ) - return - offset = start_off - remaining = content_length - while remaining > 0: - chunk = self.rfile.read(min(CHUNK_SIZE, remaining)) - if not chunk: - self._send_error_json( - HTTPStatus.BAD_REQUEST, - f"request body ended early at {bytes_written} bytes", - ) - return - conn.pwrite(chunk, offset) - n = len(chunk) - offset += n - remaining -= n - bytes_written += n + return + expected_len = 
end_inclusive - start_off + 1 + if content_length != expected_len: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + f"Content-Length ({content_length}) must equal range length ({expected_len})", + ) + return + bytes_written, ok = self._write_range_nbd(image_id, cfg, start_off, content_length) + if not ok: + return self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) except Exception as e: logging.error("PATCH range error image_id=%s err=%r", image_id, e) @@ -1403,6 +1497,71 @@ class Handler(BaseHTTPRequestHandler): image_id, bytes_written, dur, ) + def _handle_put_range( + self, + image_id: str, + cfg: Dict[str, Any], + content_range: str, + content_length: int, + flush: bool, + ) -> None: + """Handle PUT with Content-Range: bytes start-end/* for NBD backend.""" + lock = _get_image_lock(image_id) + # Block until we can write this image. + lock.acquire() + + if not _WRITE_SEM.acquire(blocking=False): + lock.release() + self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") + return + + start = _now_s() + bytes_written = 0 + try: + logging.info( + "PUT range start image_id=%s Content-Range=%s content_length=%d flush=%s", + image_id, + content_range, + content_length, + flush, + ) + try: + start_off, end_inclusive = self._parse_content_range(content_range) + except ValueError as e: + self._send_error_json( + HTTPStatus.BAD_REQUEST, f"invalid Content-Range header: {e}" + ) + return + + # Per contract we only use the start byte from Content-Range; + # length comes from Content-Length. 
+ bytes_written, ok = self._write_range_nbd(image_id, cfg, start_off, content_length) + if not ok: + return + + if flush: + with _NbdConn(cfg["socket"], cfg.get("export")) as conn: + conn.flush() + + self._send_json( + HTTPStatus.OK, + {"ok": True, "bytes_written": bytes_written, "flushed": flush}, + ) + except Exception as e: + logging.error("PUT range error image_id=%s err=%r", image_id, e) + self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") + finally: + _WRITE_SEM.release() + lock.release() + dur = _now_s() - start + logging.info( + "PUT range end image_id=%s bytes=%d duration_s=%.3f flush=%s", + image_id, + bytes_written, + dur, + flush, + ) + def main() -> None: parser = argparse.ArgumentParser(description="POC imageio-like HTTP server backed by NBD")