Add tests for qcow2 file parallel range reads and puts

This commit is contained in:
Abhisar Sinha 2026-03-29 07:34:59 +05:30 committed by Abhishek Kumar
parent 9a7008a86e
commit 2bbbcae3a9
2 changed files with 469 additions and 33 deletions

View File

@ -20,19 +20,21 @@ Shared infrastructure for the image-server test suite (stdlib unittest only).
Provides:
- A singleton image server process started once for the entire test run.
- Control-socket helpers using pure-Python AF_UNIX (no socat).
- Server stdout/stderr appended to ``<tmp>/imageserver.log``.
- On shutdown: stop the child process, close the log handle, unlink the control socket;
the temp directory and ``imageserver.log`` are left on disk.
- Control-socket helpers using pure-Python AF_UNIX.
- qemu-nbd server management.
- Transfer registration / teardown helpers.
- HTTP helper functions.
"""
import atexit
import functools
import json
import logging
import os
import random
import select
import shutil
import signal
import socket
import subprocess
@ -42,7 +44,7 @@ import time
import unittest
import uuid
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, TextIO
IMAGE_SIZE = 1 * 1024 * 1024 # 1 MiB
SERVER_STARTUP_TIMEOUT = 10
@ -87,6 +89,9 @@ def test_timeout(seconds):
_tmp_dir: Optional[str] = None
_server_proc: Optional[subprocess.Popen] = None
_server_info: Optional[Dict[str, Any]] = None
_server_log_fp: Optional[TextIO] = None
_server_log_path: Optional[str] = None
_atexit_registered: bool = False
def _free_port() -> int:
@ -158,9 +163,21 @@ def get_tmp_dir() -> str:
return _tmp_dir
def _read_log_tail(path: str, max_bytes: int = 65536) -> str:
"""Return up to *max_bytes* of UTF-8 text from the end of *path*."""
try:
with open(path, "rb") as f:
f.seek(0, os.SEEK_END)
size = f.tell()
f.seek(max(0, size - max_bytes))
return f.read().decode("utf-8", errors="replace")
except OSError as e:
return f"(could not read log: {e})"
def get_image_server() -> Dict[str, Any]:
"""Return the singleton image-server info dict, starting it if needed."""
global _server_proc, _server_info
global _server_proc, _server_info, _server_log_fp, _server_log_path, _atexit_registered
if _server_info is not None:
return _server_info
@ -168,6 +185,8 @@ def get_image_server() -> Dict[str, Any]:
tmp = get_tmp_dir()
port = _free_port()
ctrl_sock = os.path.join(tmp, "ctrl.sock")
log_path = os.path.join(tmp, "imageserver.log")
_server_log_path = log_path
imageserver_pkg = str(Path(__file__).resolve().parent.parent)
parent_dir = str(Path(imageserver_pkg).parent)
@ -175,6 +194,17 @@ def get_image_server() -> Dict[str, Any]:
env = os.environ.copy()
env["PYTHONPATH"] = parent_dir + os.pathsep + env.get("PYTHONPATH", "")
_server_log_fp = open(
log_path, "a", encoding="utf-8", buffering=1, errors="replace"
)
try:
_server_log_fp.write(
"\n========== imageserver test subprocess log ==========\n"
)
_server_log_fp.flush()
except OSError:
pass
proc = subprocess.Popen(
[
sys.executable, "-m", "imageserver",
@ -184,8 +214,8 @@ def get_image_server() -> Dict[str, Any]:
],
cwd=parent_dir,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=_server_log_fp,
stderr=_server_log_fp,
)
_server_proc = proc
@ -193,9 +223,26 @@ def get_image_server() -> Dict[str, Any]:
_wait_for_control_socket(ctrl_sock)
except RuntimeError:
proc.kill()
stdout, stderr = proc.communicate(timeout=5)
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait(timeout=5)
try:
_server_log_fp.flush()
except OSError:
pass
tail = _read_log_tail(log_path)
try:
_server_log_fp.close()
except OSError:
pass
_server_log_fp = None
_server_proc = None
raise RuntimeError(
f"Image server failed to start.\nstdout: {stdout.decode()}\nstderr: {stderr.decode()}"
"Image server failed to start.\n"
f"Log file: {log_path}\n"
f"--- log tail ---\n{tail}"
)
def send(msg: dict) -> dict:
@ -206,19 +253,25 @@ def get_image_server() -> Dict[str, Any]:
"port": port,
"ctrl_sock": ctrl_sock,
"send": send,
"imageserver_log": log_path,
}
if not _atexit_registered:
atexit.register(shutdown_image_server)
_atexit_registered = True
sys.stdout.write(
"\n[IMAGESERVER_TEST] child image server log file: %s\n\n" % log_path
)
sys.stdout.flush()
return _server_info
def shutdown_image_server() -> None:
global _server_proc, _server_info, _tmp_dir
global _server_proc, _server_info, _tmp_dir, _server_log_fp, _server_log_path
ctrl_sock: Optional[str] = None
if _server_info is not None:
ctrl_sock = _server_info.get("ctrl_sock")
if _server_proc is not None:
for pipe in (_server_proc.stdout, _server_proc.stderr):
if pipe:
try:
pipe.close()
except Exception:
pass
_server_proc.terminate()
try:
_server_proc.wait(timeout=5)
@ -226,33 +279,59 @@ def shutdown_image_server() -> None:
_server_proc.kill()
_server_proc.wait(timeout=5)
_server_proc = None
if _server_log_fp is not None:
try:
_server_log_fp.flush()
_server_log_fp.close()
except OSError:
pass
_server_log_fp = None
_server_info = None
if _tmp_dir is not None:
shutil.rmtree(_tmp_dir, ignore_errors=True)
_tmp_dir = None
_server_log_path = None
if ctrl_sock:
try:
os.unlink(ctrl_sock)
except FileNotFoundError:
pass
# Leave temp dir and imageserver.log on disk for debugging; clear pointer only.
_tmp_dir = None
# ── qemu-nbd server ────────────────────────────────────────────────────
class QemuNbdServer:
"""Manages a qemu-nbd process exporting a raw image over a Unix socket."""
"""Manages a qemu-nbd process exporting a disk image over a Unix socket."""
def __init__(self, image_path: str, socket_path: str, image_size: int = IMAGE_SIZE):
def __init__(
self,
image_path: str,
socket_path: str,
image_size: int = IMAGE_SIZE,
image_format: str = "raw",
):
self.image_path = image_path
self.socket_path = socket_path
self.image_size = image_size
self.image_format = image_format
self._proc: Optional[subprocess.Popen] = None
def start(self) -> None:
if not os.path.exists(self.image_path):
with open(self.image_path, "wb") as f:
f.truncate(self.image_size)
if self.image_format == "raw":
with open(self.image_path, "wb") as f:
f.truncate(self.image_size)
else:
raise FileNotFoundError(
f"disk image not found for format {self.image_format!r}: {self.image_path}"
)
self._proc = subprocess.Popen(
[
"qemu-nbd",
"--socket", self.socket_path,
"--format", "raw",
"--format", self.image_format,
"--persistent",
"--shared=8",
"--cache=none",
@ -355,6 +434,43 @@ def make_nbd_transfer(image_size=IMAGE_SIZE):
return transfer_id, url, server, cleanup
def make_nbd_transfer_existing_disk(image_path: str, image_format: str = "qcow2"):
    """
    Start qemu-nbd for an existing on-disk image (e.g. qcow2) and register a transfer.

    Does not delete *image_path* on cleanup (only the Unix socket under tmp).

    If registration with the image server fails, the qemu-nbd process that was
    just started is stopped and its socket removed before the error propagates,
    so a failed setup does not leave a stray child process behind.

    Returns (transfer_id, url, QemuNbdServer, cleanup_callable).

    Raises:
        AssertionError: if the image server does not answer the register
            request with ``status == "ok"``.
    """
    srv = get_image_server()
    tmp = get_tmp_dir()
    sock_path = os.path.join(tmp, f"nbd_{uuid.uuid4().hex[:8]}.sock")

    server = QemuNbdServer(
        image_path, sock_path, image_format=image_format
    )
    server.start()

    def _remove_socket():
        try:
            os.unlink(sock_path)
        except FileNotFoundError:
            pass

    transfer_id = f"nbd-{uuid.uuid4().hex[:8]}"
    try:
        resp = srv["send"]({
            "action": "register",
            "transfer_id": transfer_id,
            "config": {"backend": "nbd", "socket": sock_path},
        })
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if resp["status"] != "ok":
            raise AssertionError(f"register failed: {resp}")
    except BaseException:
        # The caller never receives ``cleanup`` on this path, so tear down here.
        server.stop()
        _remove_socket()
        raise

    url = f"{srv['base_url']}/images/{transfer_id}"

    def cleanup():
        try:
            srv["send"]({"action": "unregister", "transfer_id": transfer_id})
        finally:
            # Stop qemu-nbd even when the unregister request itself fails.
            server.stop()
            _remove_socket()

    return transfer_id, url, server, cleanup
# ── HTTP helpers ────────────────────────────────────────────────────────
import urllib.request
@ -425,16 +541,23 @@ class ImageServerTestCase(unittest.TestCase):
return make_nbd_transfer()
@staticmethod
def dump_server_logs(max_bytes: int = 256 * 1024):
    """Print a tail of the image-server log file (shared by all tests in the run).

    Writes at most *max_bytes* of the log to ``sys.stderr`` between banner
    lines. Does nothing when no log path is recorded or the file is missing.
    This is a best-effort debugging aid: any failure while reading or
    printing is swallowed so it can never mask the test failure being
    investigated.
    """
    path = _server_log_path
    if not path or not os.path.isfile(path):
        return
    # Flush the parent's handle first so the banner it wrote is on disk.
    try:
        if _server_log_fp is not None:
            _server_log_fp.flush()
    except OSError:
        pass
    try:
        data = _read_log_tail(path, max_bytes=max_bytes)
        if data.strip():
            sys.stderr.write("\n=== IMAGE SERVER LOG (tail) ===\n")
            sys.stderr.write(data)
            if not data.endswith("\n"):
                sys.stderr.write("\n")
            sys.stderr.write("=== END IMAGE SERVER LOG ===\n")
    except Exception:
        # Deliberate best-effort: never raise from a post-mortem helper.
        pass

View File

@ -18,19 +18,27 @@
"""Tests for HTTP operations against an NBD-backend transfer (real qemu-nbd)."""
import json
import os
import subprocess
import unittest
import uuid
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from imageserver.constants import MAX_PARALLEL_READS, MAX_PARALLEL_WRITES
from .test_base import (
IMAGE_SIZE,
ImageServerTestCase,
get_tmp_dir,
http_get,
http_options,
http_patch,
http_post,
http_put,
make_nbd_transfer,
make_nbd_transfer_existing_disk,
randbytes,
shutdown_image_server,
)
@ -322,6 +330,311 @@ class TestExtents(NbdBackendTestCase):
self.assertEqual(total, IMAGE_SIZE)
def _allocated_subranges(extents, granularity):
"""Split each non-hole extent (zero=False) into [start, end] inclusive byte ranges."""
out = []
for ext in extents:
if ext.get("zero"):
continue
start = int(ext["start"])
length = int(ext["length"])
pos = start
end_abs = start + length
while pos < end_abs:
chunk_end = min(pos + granularity, end_abs)
out.append((pos, chunk_end - 1))
pos = chunk_end
return out
def _qemu_img_virtual_size(path: str) -> int:
    """Return the image's virtual size in bytes as reported by ``qemu-img info``.

    Requires ``qemu-img`` on PATH. A non-zero exit status raises
    ``subprocess.CalledProcessError`` because the command runs with
    ``check=True``.
    """
    command = ["qemu-img", "info", "--output=json", path]
    # Explicit PIPEs plus universal_newlines keep this Python 3.6 compatible
    # (capture_output= and text= are not available there).
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        check=True,
    )
    info = json.loads(completed.stdout)
    return int(info["virtual-size"])
def _http_error_detail(exc: urllib.error.HTTPError) -> str:
"""Build a readable message from an ``HTTPError`` (status, url, JSON/text body)."""
parts = ["HTTP %s %r" % (exc.code, exc.reason), "url=%r" % getattr(exc, "url", "")]
try:
if exc.fp is not None:
raw = exc.fp.read()
if raw:
text = raw.decode("utf-8", errors="replace")
parts.append("response_body=%r" % (text,))
except Exception as read_err:
parts.append("read_body_error=%r" % (read_err,))
return "; ".join(parts)
def _http_get_checked(
    url,
    headers=None,
    expected_status=200,
    label="GET",
):
    """
    GET *url* via ``http_get`` and return the response if its status matches.

    An ``HTTPError`` from the request, or a response whose status differs from
    *expected_status*, is turned into an ``AssertionError`` whose message
    starts with *label* and includes the URL and the error detail or body.
    """
    try:
        response = http_get(url, headers=headers)
    except urllib.error.HTTPError as http_err:
        detail = _http_error_detail(http_err)
        raise AssertionError(
            "%s failed for %r: %s" % (label, url, detail)
        ) from http_err
    if response.status == expected_status:
        return response
    payload = response.read()
    raise AssertionError(
        "%s %r: expected HTTP %s, got %s; body=%r"
        % (label, url, expected_status, response.status, payload)
    )
def _http_put_checked(url, data, headers, label="PUT"):
    """
    PUT *data* to *url* via ``http_put`` and return ``(response, body)``.

    The body is read before the status is checked. An ``HTTPError`` or any
    status other than 200 becomes an ``AssertionError`` whose message starts
    with *label*.
    """
    try:
        response = http_put(url, data, headers=headers)
    except urllib.error.HTTPError as http_err:
        detail = _http_error_detail(http_err)
        raise AssertionError(
            "%s failed for %r: %s" % (label, url, detail)
        ) from http_err
    payload = response.read()
    if response.status == 200:
        return response, payload
    raise AssertionError(
        "%s %r: expected HTTP 200, got %s; body=%r"
        % (label, url, response.status, payload)
    )
def _http_post_checked(url, data=b"", headers=None, label="POST"):
    """
    POST *data* to *url* via ``http_post`` and return ``(response, body)``.

    The body is read before the status is checked. An ``HTTPError`` or any
    status other than 200 becomes an ``AssertionError`` whose message starts
    with *label*.
    """
    try:
        response = http_post(url, data=data, headers=headers)
    except urllib.error.HTTPError as http_err:
        detail = _http_error_detail(http_err)
        raise AssertionError(
            "%s failed for %r: %s" % (label, url, detail)
        ) from http_err
    payload = response.read()
    if response.status == 200:
        return response, payload
    raise AssertionError(
        "%s %r: expected HTTP 200, got %s; body=%r"
        % (label, url, response.status, payload)
    )
class TestQcow2ExtentsParallelReads(ImageServerTestCase):
    """
    Optional integration tests against a user-supplied qcow2 image.

    The image is exported via qemu-nbd and registered as a transfer. The first
    test fetches the allocation extents and issues parallel range GETs over the
    allocated regions; the second runs a per-range GET-then-PUT pipeline with
    ``min(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES)`` workers and compares the
    copy with the source.

    Requires ``qemu-img`` and ``qemu-nbd`` on PATH.
    Set IMAGESERVER_TEST_QCOW2 to the absolute path of a qcow2 file.
    Optional: IMAGESERVER_TEST_QCOW2_READ_GRANULARITY byte step (default 4 MiB).
    """

    @staticmethod
    def _unlink_if_exists(path):
        """Remove *path*, ignoring the case where it is already gone."""
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass

    def setUp(self):
        """Read the environment configuration; skip when it is missing or invalid."""
        super().setUp()
        self._qcow2_path = os.environ.get("IMAGESERVER_TEST_QCOW2", "").strip()
        if not self._qcow2_path or not os.path.isfile(self._qcow2_path):
            self.skipTest(
                "Set IMAGESERVER_TEST_QCOW2 to an existing qcow2 path to run this test"
            )
        raw_g = os.environ.get("IMAGESERVER_TEST_QCOW2_READ_GRANULARITY", "").strip()
        try:
            self._read_granularity = int(raw_g) if raw_g else 4 * 1024 * 1024
        except ValueError:
            # Treat a malformed value like a non-positive one: skip, don't error.
            self.skipTest(
                "IMAGESERVER_TEST_QCOW2_READ_GRANULARITY must be an integer, got %r"
                % (raw_g,)
            )
        if self._read_granularity <= 0:
            self.skipTest("IMAGESERVER_TEST_QCOW2_READ_GRANULARITY must be positive")

    def test_parallel_range_reads_allocated_extents(self):
        """Range-GET every allocated chunk in parallel and check each length."""
        _, url, _, cleanup = make_nbd_transfer_existing_disk(
            self._qcow2_path, "qcow2"
        )
        try:
            resp = _http_get_checked(
                "%s/extents" % (url,),
                expected_status=200,
                label="GET /extents",
            )
            extents = json.loads(resp.read())
            self.assertIsInstance(extents, list)
            ranges = _allocated_subranges(extents, self._read_granularity)
            if not ranges:
                self.skipTest("no allocated extents (all holes/zero) in qcow2")

            def fetch(span):
                start_b, end_b = span
                range_hdr = "bytes=%s-%s" % (start_b, end_b)
                r = _http_get_checked(
                    url,
                    headers={"Range": range_hdr},
                    expected_status=206,
                    label="Range GET %s" % (range_hdr,),
                )
                data = r.read()
                expected_len = end_b - start_b + 1
                if len(data) != expected_len:
                    raise AssertionError(
                        "Range GET %s: got %d bytes, expected %d (url=%r)"
                        % (range_hdr, len(data), expected_len, url)
                    )

            with ThreadPoolExecutor(max_workers=MAX_PARALLEL_READS) as pool:
                # Executor.map only re-raises a worker's exception when its
                # result is retrieved, so the iterator must be drained for a
                # failed fetch to fail the test.
                list(pool.map(fetch, ranges))
        finally:
            cleanup()

    def test_parallel_reads_then_put_range_copy_matches_source(self):
        """
        Create an empty qcow2 with the same virtual size as the source, copy every
        allocated range using one worker pool: for each span, Range GET from src
        then Content-Range PUT to dest. Finally flush the destination and verify
        it with ``qemu-img compare``.

        Worker count is ``min(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES)`` so each
        worker holds at most one chunk.
        """
        src_path = self._qcow2_path
        try:
            vsize = _qemu_img_virtual_size(src_path)
        except (FileNotFoundError, subprocess.CalledProcessError, KeyError, json.JSONDecodeError, TypeError, ValueError) as e:
            self.skipTest(f"qemu-img info failed: {e}")

        tmp = get_tmp_dir()
        dest_path = os.path.join(tmp, f"qcow2_copy_{uuid.uuid4().hex[:8]}.qcow2")
        # Registered before creation so the copy is removed on every exit path
        # (skip, failure, or success), including a partially created file.
        self.addCleanup(self._unlink_if_exists, dest_path)
        try:
            subprocess.run(
                ["qemu-img", "create", "-f", "qcow2", dest_path, str(vsize)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True,
            )
        except (FileNotFoundError, subprocess.CalledProcessError) as e:
            self.skipTest(f"qemu-img create failed: {e}")

        _, src_url, _, cleanup_src = make_nbd_transfer_existing_disk(
            src_path, "qcow2"
        )
        try:
            _, dest_url, _, cleanup_dest = make_nbd_transfer_existing_disk(
                dest_path, "qcow2"
            )
            try:
                self._copy_allocated_ranges(src_url, dest_url)
            finally:
                cleanup_dest()
        finally:
            # Nested so the source export is torn down even if the destination
            # export failed to start or its cleanup raised.
            cleanup_src()

        # Both exports are stopped by now, so qemu-img can open the images.
        cmp = subprocess.run(
            ["qemu-img", "compare", src_path, dest_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        self.assertEqual(
            cmp.returncode,
            0,
            "qemu-img compare %r vs %r failed (rc=%s): stderr=%r stdout=%r"
            % (
                src_path,
                dest_path,
                cmp.returncode,
                cmp.stderr,
                cmp.stdout,
            ),
        )

    def _copy_allocated_ranges(self, src_url, dest_url):
        """Copy every allocated range from *src_url* to *dest_url*, then flush dest."""
        resp = _http_get_checked(
            "%s/extents" % (src_url,),
            expected_status=200,
            label="GET src /extents",
        )
        extents = json.loads(resp.read())
        ranges = _allocated_subranges(extents, self._read_granularity)
        if not ranges:
            self.skipTest("no allocated extents (all holes/zero) in qcow2")

        transfer_workers = max(
            1, min(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES)
        )

        def transfer_span(span):
            start_b, end_b = span
            range_hdr = "bytes=%s-%s" % (start_b, end_b)
            r = _http_get_checked(
                src_url,
                headers={"Range": range_hdr},
                expected_status=206,
                label="Range GET src %s" % (range_hdr,),
            )
            data = r.read()
            expected_len = end_b - start_b + 1
            if len(data) != expected_len:
                raise AssertionError(
                    "Range GET src %s: got %d bytes, expected %d (url=%r)"
                    % (range_hdr, len(data), expected_len, src_url)
                )
            end_inclusive = start_b + len(data) - 1
            cr = "bytes %s-%s/*" % (start_b, end_inclusive)
            _put_resp, put_body = _http_put_checked(
                dest_url,
                data,
                headers={
                    "Content-Range": cr,
                    "Content-Length": str(len(data)),
                },
                label="PUT dest %s" % (cr,),
            )
            try:
                body = json.loads(put_body)
            except ValueError as err:
                raise AssertionError(
                    "PUT dest %s: invalid JSON body=%r (url=%r)"
                    % (cr, put_body, dest_url)
                ) from err
            if not body.get("ok"):
                raise AssertionError(
                    "PUT dest %s: JSON ok=false, full=%r (url=%r)"
                    % (cr, body, dest_url)
                )
            if body.get("bytes_written") != len(data):
                raise AssertionError(
                    "PUT dest %s: bytes_written=%r expected %d (url=%r)"
                    % (cr, body.get("bytes_written"), len(data), dest_url)
                )

        with ThreadPoolExecutor(max_workers=transfer_workers) as pool:
            # Drain the iterator so a failure in any worker is re-raised here
            # instead of being silently discarded.
            list(pool.map(transfer_span, ranges))

        _flush, flush_body = _http_post_checked(
            "%s/flush" % (dest_url,),
            label="POST dest /flush",
        )
        try:
            flush_json = json.loads(flush_body)
        except ValueError as err:
            raise AssertionError(
                "POST dest /flush: invalid JSON body=%r (url=%r)"
                % (flush_body, dest_url)
            ) from err
        if not flush_json.get("ok"):
            raise AssertionError(
                "POST dest /flush: ok=false, full=%r (url=%r)"
                % (flush_json, dest_url)
            )
class TestErrorCases(NbdBackendTestCase):
def test_patch_unsupported_op(self):
payload = json.dumps({"op": "invalid"}).encode()