diff --git a/scripts/vm/hypervisor/kvm/imageserver/tests/test_base.py b/scripts/vm/hypervisor/kvm/imageserver/tests/test_base.py index 91e7eda79ed..c322a992047 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/tests/test_base.py +++ b/scripts/vm/hypervisor/kvm/imageserver/tests/test_base.py @@ -20,19 +20,21 @@ Shared infrastructure for the image-server test suite (stdlib unittest only). Provides: - A singleton image server process started once for the entire test run. -- Control-socket helpers using pure-Python AF_UNIX (no socat). +- Server stdout/stderr appended to ``/imageserver.log``. +- On shutdown: stop the child process, close the log handle, unlink the control socket; + the temp directory and ``imageserver.log`` are left on disk. +- Control-socket helpers using pure-Python AF_UNIX. - qemu-nbd server management. - Transfer registration / teardown helpers. - HTTP helper functions. """ +import atexit import functools import json import logging import os import random -import select -import shutil import signal import socket import subprocess @@ -42,7 +44,7 @@ import time import unittest import uuid from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, TextIO IMAGE_SIZE = 1 * 1024 * 1024 # 1 MiB SERVER_STARTUP_TIMEOUT = 10 @@ -87,6 +89,9 @@ def test_timeout(seconds): _tmp_dir: Optional[str] = None _server_proc: Optional[subprocess.Popen] = None _server_info: Optional[Dict[str, Any]] = None +_server_log_fp: Optional[TextIO] = None +_server_log_path: Optional[str] = None +_atexit_registered: bool = False def _free_port() -> int: @@ -158,9 +163,21 @@ def get_tmp_dir() -> str: return _tmp_dir +def _read_log_tail(path: str, max_bytes: int = 65536) -> str: + """Return up to *max_bytes* of UTF-8 text from the end of *path*.""" + try: + with open(path, "rb") as f: + f.seek(0, os.SEEK_END) + size = f.tell() + f.seek(max(0, size - max_bytes)) + return f.read().decode("utf-8", errors="replace") + except OSError as e: + return f"(could not read log: {e})" + + def get_image_server() -> Dict[str, Any]: """Return the singleton image-server info dict, starting it if needed.""" - global _server_proc, _server_info + global _server_proc, _server_info, _server_log_fp, _server_log_path, _atexit_registered if _server_info is not None: return _server_info @@ -168,6 +185,8 @@ def get_image_server() -> Dict[str, Any]: tmp = get_tmp_dir() port = _free_port() ctrl_sock = os.path.join(tmp, "ctrl.sock") + log_path = os.path.join(tmp, "imageserver.log") + _server_log_path = log_path imageserver_pkg = str(Path(__file__).resolve().parent.parent) parent_dir = str(Path(imageserver_pkg).parent) @@ -175,6 +194,17 @@ def get_image_server() -> Dict[str, Any]: env = os.environ.copy() env["PYTHONPATH"] = parent_dir + os.pathsep + env.get("PYTHONPATH", "") + _server_log_fp = open( + log_path, "a", encoding="utf-8", buffering=1, errors="replace" + ) + try: + _server_log_fp.write( + "\n========== imageserver test subprocess log ==========\n" + ) + _server_log_fp.flush() + except OSError: + pass + proc = subprocess.Popen( [ sys.executable, "-m", "imageserver", @@ -184,8 +214,8 @@ def get_image_server() -> Dict[str, Any]: ], cwd=parent_dir, env=env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stdout=_server_log_fp, + stderr=_server_log_fp, ) _server_proc = proc @@ -193,9 +223,26 @@ def get_image_server() -> Dict[str, Any]: _wait_for_control_socket(ctrl_sock) except RuntimeError: proc.kill() - stdout, stderr = proc.communicate(timeout=5) + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=5) + try: + _server_log_fp.flush() + except OSError: + pass + tail = _read_log_tail(log_path) + try: + _server_log_fp.close() + except OSError: + pass + _server_log_fp = None + _server_proc = None raise RuntimeError( - f"Image server failed to start.\nstdout: {stdout.decode()}\nstderr: {stderr.decode()}" + "Image server failed to start.\n" + f"Log file: {log_path}\n" + f"--- log tail ---\n{tail}" ) def send(msg: dict) -> dict: @@ -206,19 +253,25 @@ def get_image_server() -> Dict[str, Any]: "port": port, "ctrl_sock": ctrl_sock, "send": send, + "imageserver_log": log_path, } + if not _atexit_registered: + atexit.register(shutdown_image_server) + _atexit_registered = True + sys.stdout.write( + "\n[IMAGESERVER_TEST] child image server log file: %s\n\n" % log_path + ) + sys.stdout.flush() return _server_info def shutdown_image_server() -> None: - global _server_proc, _server_info, _tmp_dir + global _server_proc, _server_info, _tmp_dir, _server_log_fp, _server_log_path + ctrl_sock: Optional[str] = None + if _server_info is not None: + ctrl_sock = _server_info.get("ctrl_sock") + if _server_proc is not None: - for pipe in (_server_proc.stdout, _server_proc.stderr): - if pipe: - try: - pipe.close() - except Exception: - pass _server_proc.terminate() try: _server_proc.wait(timeout=5) @@ -226,33 +279,59 @@ def shutdown_image_server() -> None: _server_proc.kill() _server_proc.wait(timeout=5) _server_proc = None + if _server_log_fp is not None: + try: + _server_log_fp.flush() + _server_log_fp.close() + except OSError: + pass + _server_log_fp = None _server_info = None - if _tmp_dir is not None: - shutil.rmtree(_tmp_dir, ignore_errors=True) - _tmp_dir = None + _server_log_path = None + + if ctrl_sock: + try: + os.unlink(ctrl_sock) + except FileNotFoundError: + pass + + # Leave temp dir and imageserver.log on disk for debugging; clear pointer only. + _tmp_dir = None # ── qemu-nbd server ──────────────────────────────────────────────────── class QemuNbdServer: - """Manages a qemu-nbd process exporting a raw image over a Unix socket.""" + """Manages a qemu-nbd process exporting a disk image over a Unix socket.""" - def __init__(self, image_path: str, socket_path: str, image_size: int = IMAGE_SIZE): + def __init__( + self, + image_path: str, + socket_path: str, + image_size: int = IMAGE_SIZE, + image_format: str = "raw", + ): self.image_path = image_path self.socket_path = socket_path self.image_size = image_size + self.image_format = image_format self._proc: Optional[subprocess.Popen] = None def start(self) -> None: if not os.path.exists(self.image_path): - with open(self.image_path, "wb") as f: - f.truncate(self.image_size) + if self.image_format == "raw": + with open(self.image_path, "wb") as f: + f.truncate(self.image_size) + else: + raise FileNotFoundError( + f"disk image not found for format {self.image_format!r}: {self.image_path}" + ) self._proc = subprocess.Popen( [ "qemu-nbd", "--socket", self.socket_path, - "--format", "raw", + "--format", self.image_format, "--persistent", "--shared=8", "--cache=none", @@ -355,6 +434,43 @@ def make_nbd_transfer(image_size=IMAGE_SIZE): return transfer_id, url, server, cleanup +def make_nbd_transfer_existing_disk(image_path: str, image_format: str = "qcow2"): + """ + Start qemu-nbd for an existing on-disk image (e.g. qcow2) and register a transfer. + + Does not delete *image_path* on cleanup (only the Unix socket under tmp). + + Returns (transfer_id, url, QemuNbdServer, cleanup_callable). + """ + srv = get_image_server() + tmp = get_tmp_dir() + sock_path = os.path.join(tmp, f"nbd_{uuid.uuid4().hex[:8]}.sock") + + server = QemuNbdServer( + image_path, sock_path, image_format=image_format + ) + server.start() + + transfer_id = f"nbd-{uuid.uuid4().hex[:8]}" + resp = srv["send"]({ + "action": "register", + "transfer_id": transfer_id, + "config": {"backend": "nbd", "socket": sock_path}, + }) + assert resp["status"] == "ok", f"register failed: {resp}" + url = f"{srv['base_url']}/images/{transfer_id}" + + def cleanup(): + srv["send"]({"action": "unregister", "transfer_id": transfer_id}) + server.stop() + try: + os.unlink(sock_path) + except FileNotFoundError: + pass + + return transfer_id, url, server, cleanup + + # ── HTTP helpers ──────────────────────────────────────────────────────── import urllib.request @@ -425,16 +541,23 @@ class ImageServerTestCase(unittest.TestCase): return make_nbd_transfer() @staticmethod - def dump_server_logs(): - """Read any available server stderr and print it for post-mortem debugging.""" - if _server_proc is None or _server_proc.stderr is None: + def dump_server_logs(max_bytes: int = 256 * 1024): + """Print a tail of the image-server log file (shared by all tests in the run).""" + path = _server_log_path + if not path or not os.path.isfile(path): return try: - if select.select([_server_proc.stderr], [], [], 0)[0]: - data = _server_proc.stderr.read1(64 * 1024) - if data: - sys.stderr.write("\n=== IMAGE SERVER STDERR ===\n") - sys.stderr.write(data.decode(errors="replace")) - sys.stderr.write("\n=== END SERVER STDERR ===\n") + if _server_log_fp is not None: + _server_log_fp.flush() + except OSError: + pass + try: + data = _read_log_tail(path, max_bytes=max_bytes) + if data.strip(): + sys.stderr.write("\n=== IMAGE SERVER LOG (tail) ===\n") + sys.stderr.write(data) + if not data.endswith("\n"): + sys.stderr.write("\n") + sys.stderr.write("=== END IMAGE SERVER LOG ===\n") except Exception: pass diff --git a/scripts/vm/hypervisor/kvm/imageserver/tests/test_nbd_backend.py b/scripts/vm/hypervisor/kvm/imageserver/tests/test_nbd_backend.py index 4c0e66003b3..da120ae6bad 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/tests/test_nbd_backend.py +++ b/scripts/vm/hypervisor/kvm/imageserver/tests/test_nbd_backend.py @@ -18,19 +18,27 @@ """Tests for HTTP operations against an NBD-backend transfer (real qemu-nbd).""" import json +import os +import subprocess import unittest +import uuid import urllib.error import urllib.request +from concurrent.futures import ThreadPoolExecutor + +from imageserver.constants import MAX_PARALLEL_READS, MAX_PARALLEL_WRITES from .test_base import ( IMAGE_SIZE, ImageServerTestCase, + get_tmp_dir, http_get, http_options, http_patch, http_post, http_put, make_nbd_transfer, + make_nbd_transfer_existing_disk, randbytes, shutdown_image_server, ) @@ -322,6 +330,311 @@ class TestExtents(NbdBackendTestCase): self.assertEqual(total, IMAGE_SIZE) +def _allocated_subranges(extents, granularity): + """Split each non-hole extent (zero=False) into [start, end] inclusive byte ranges.""" + out = [] + for ext in extents: + if ext.get("zero"): + continue + start = int(ext["start"]) + length = int(ext["length"]) + pos = start + end_abs = start + length + while pos < end_abs: + chunk_end = min(pos + granularity, end_abs) + out.append((pos, chunk_end - 1)) + pos = chunk_end + return out + + +def _qemu_img_virtual_size(path: str) -> int: + """Return virtual size in bytes (requires ``qemu-img`` on PATH).""" + # stdout=PIPE + universal_newlines: Python 3.6 compatible (no capture_output/text). + cp = subprocess.run( + ["qemu-img", "info", "--output=json", path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + check=True, + ) + return int(json.loads(cp.stdout)["virtual-size"]) + + +def _http_error_detail(exc: urllib.error.HTTPError) -> str: + """Build a readable message from an ``HTTPError`` (status, url, JSON/text body).""" + parts = ["HTTP %s %r" % (exc.code, exc.reason), "url=%r" % getattr(exc, "url", "")] + try: + if exc.fp is not None: + raw = exc.fp.read() + if raw: + text = raw.decode("utf-8", errors="replace") + parts.append("response_body=%r" % (text,)) + except Exception as read_err: + parts.append("read_body_error=%r" % (read_err,)) + return "; ".join(parts) + + +def _http_get_checked( + url, + headers=None, + expected_status=200, + label="GET", +): + """ + Like ``http_get`` but raises ``AssertionError`` with ``_http_error_detail`` on failure. + """ + try: + resp = http_get(url, headers=headers) + except urllib.error.HTTPError as e: + raise AssertionError( + "%s failed for %r: %s" % (label, url, _http_error_detail(e)) + ) from e + if resp.status != expected_status: + body = resp.read() + raise AssertionError( + "%s %r: expected HTTP %s, got %s; body=%r" + % (label, url, expected_status, resp.status, body) + ) + return resp + + +def _http_put_checked(url, data, headers, label="PUT"): + try: + resp = http_put(url, data, headers=headers) + except urllib.error.HTTPError as e: + raise AssertionError( + "%s failed for %r: %s" % (label, url, _http_error_detail(e)) + ) from e + body = resp.read() + if resp.status != 200: + raise AssertionError( + "%s %r: expected HTTP 200, got %s; body=%r" + % (label, url, resp.status, body) + ) + return resp, body + + +def _http_post_checked(url, data=b"", headers=None, label="POST"): + try: + resp = http_post(url, data=data, headers=headers) + except urllib.error.HTTPError as e: + raise AssertionError( + "%s failed for %r: %s" % (label, url, _http_error_detail(e)) + ) from e + body = resp.read() + if resp.status != 200: + raise AssertionError( + "%s %r: expected HTTP 200, got %s; body=%r" + % (label, url, resp.status, body) + ) + return resp, body + + +class TestQcow2ExtentsParallelReads(ImageServerTestCase): + """ + Optional integration tests: export a user-supplied qcow2 via qemu-nbd, fetch + allocation extents, parallel range GETs over allocated regions, and (second + test) per-range GET-then-PUT pipeline with ``min(MAX_PARALLEL_READS, + MAX_PARALLEL_WRITES)`` workers. + + Requires ``qemu-img`` and ``qemu-nbd`` on PATH. + + Set IMAGESERVER_TEST_QCOW2 to the absolute path of a qcow2 file. + Optional: IMAGESERVER_TEST_QCOW2_READ_GRANULARITY — byte step (default 4 MiB). + """ + + def setUp(self): + super().setUp() + self._qcow2_path = os.environ.get("IMAGESERVER_TEST_QCOW2", "").strip() + if not self._qcow2_path or not os.path.isfile(self._qcow2_path): + self.skipTest( + "Set IMAGESERVER_TEST_QCOW2 to an existing qcow2 path to run this test" + ) + raw_g = os.environ.get("IMAGESERVER_TEST_QCOW2_READ_GRANULARITY", "").strip() + self._read_granularity = int(raw_g) if raw_g else 4 * 1024 * 1024 + if self._read_granularity <= 0: + self.skipTest("IMAGESERVER_TEST_QCOW2_READ_GRANULARITY must be positive") + + def test_parallel_range_reads_allocated_extents(self): + _, url, _, cleanup = make_nbd_transfer_existing_disk( + self._qcow2_path, "qcow2" + ) + try: + resp = _http_get_checked( + "%s/extents" % (url,), + expected_status=200, + label="GET /extents", + ) + extents = json.loads(resp.read()) + self.assertIsInstance(extents, list) + ranges = _allocated_subranges(extents, self._read_granularity) + if not ranges: + self.skipTest("no allocated extents (all holes/zero) in qcow2") + + def fetch(span): + start_b, end_b = span + range_hdr = "bytes=%s-%s" % (start_b, end_b) + r = _http_get_checked( + url, + headers={"Range": range_hdr}, + expected_status=206, + label="Range GET %s" % (range_hdr,), + ) + data = r.read() + expected_len = end_b - start_b + 1 + if len(data) != expected_len: + raise AssertionError( + "Range GET %s: got %d bytes, expected %d (url=%r)" + % (range_hdr, len(data), expected_len, url) + ) + + with ThreadPoolExecutor(max_workers=MAX_PARALLEL_READS) as pool: + pool.map(fetch, ranges) + finally: + cleanup() + + def test_parallel_reads_then_put_range_copy_matches_source(self): + """ + Create an empty qcow2 with the same virtual size as the source, copy every + allocated range using one worker pool: for each span, Range GET from src + then Content-Range PUT to dest. + Worker count is ``min(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES)`` so each + worker holds at most one chunk. + """ + src_path = self._qcow2_path + try: + vsize = _qemu_img_virtual_size(src_path) + except (FileNotFoundError, subprocess.CalledProcessError, KeyError, json.JSONDecodeError, TypeError, ValueError) as e: + self.skipTest(f"qemu-img info failed: {e}") + + tmp = get_tmp_dir() + dest_path = os.path.join(tmp, f"qcow2_copy_{uuid.uuid4().hex[:8]}.qcow2") + try: + subprocess.run( + ["qemu-img", "create", "-f", "qcow2", dest_path, str(vsize)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + check=True, + ) + except (FileNotFoundError, subprocess.CalledProcessError) as e: + self.skipTest(f"qemu-img create failed: {e}") + + _, src_url, _, cleanup_src = make_nbd_transfer_existing_disk( + src_path, "qcow2" + ) + _, dest_url, _, cleanup_dest = make_nbd_transfer_existing_disk( + dest_path, "qcow2" + ) + try: + resp = _http_get_checked( + "%s/extents" % (src_url,), + expected_status=200, + label="GET src /extents", + ) + extents = json.loads(resp.read()) + ranges = _allocated_subranges(extents, self._read_granularity) + if not ranges: + self.skipTest("no allocated extents (all holes/zero) in qcow2") + + transfer_workers = max( + 1, min(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES) + ) + + def transfer_span(span): + start_b, end_b = span + range_hdr = "bytes=%s-%s" % (start_b, end_b) + r = _http_get_checked( + src_url, + headers={"Range": range_hdr}, + expected_status=206, + label="Range GET src %s" % (range_hdr,), + ) + data = r.read() + expected_len = end_b - start_b + 1 + if len(data) != expected_len: + raise AssertionError( + "Range GET src %s: got %d bytes, expected %d (url=%r)" + % (range_hdr, len(data), expected_len, src_url) + ) + end_inclusive = start_b + len(data) - 1 + cr = "bytes %s-%s/*" % (start_b, end_inclusive) + _put_resp, put_body = _http_put_checked( + dest_url, + data, + headers={ + "Content-Range": cr, + "Content-Length": str(len(data)), + }, + label="PUT dest %s" % (cr,), + ) + try: + body = json.loads(put_body) + except ValueError: + raise AssertionError( + "PUT dest %s: invalid JSON body=%r (url=%r)" + % (cr, put_body, dest_url) + ) + if not body.get("ok"): + raise AssertionError( + "PUT dest %s: JSON ok=false, full=%r (url=%r)" + % (cr, body, dest_url) + ) + if body.get("bytes_written") != len(data): + raise AssertionError( + "PUT dest %s: bytes_written=%r expected %d (url=%r)" + % (cr, body.get("bytes_written"), len(data), dest_url) + ) + + with ThreadPoolExecutor(max_workers=transfer_workers) as pool: + pool.map(transfer_span, ranges) + + _flush, flush_body = _http_post_checked( + "%s/flush" % (dest_url,), + label="POST dest /flush", + ) + try: + flush_json = json.loads(flush_body) + except ValueError: + raise AssertionError( + "POST dest /flush: invalid JSON body=%r (url=%r)" + % (flush_body, dest_url) + ) + if not flush_json.get("ok"): + raise AssertionError( + "POST dest /flush: ok=false, full=%r (url=%r)" + % (flush_json, dest_url) + ) + finally: + cleanup_dest() + cleanup_src() + + try: + cmp = subprocess.run( + ["qemu-img", "compare", src_path, dest_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) + self.assertEqual( + cmp.returncode, + 0, + "qemu-img compare %r vs %r failed (rc=%s): stderr=%r stdout=%r" + % ( + src_path, + dest_path, + cmp.returncode, + cmp.stderr, + cmp.stdout, + ), + ) + finally: + try: + os.unlink(dest_path) + except FileNotFoundError: + pass + + class TestErrorCases(NbdBackendTestCase): def test_patch_unsupported_op(self): payload = json.dumps({"op": "invalid"}).encode()