From 605f7bff3f114db1b474cf67301d87c8b2aad8ed Mon Sep 17 00:00:00 2001 From: Abhisar Sinha <63767682+abh1sar@users.noreply.github.com> Date: Fri, 10 Apr 2026 07:49:27 +0530 Subject: [PATCH] Add stress test. Fix concurrency. --- .../vm/hypervisor/kvm/imageserver/__init__.py | 7 +- .../hypervisor/kvm/imageserver/concurrency.py | 7 +- .../hypervisor/kvm/imageserver/constants.py | 4 +- .../vm/hypervisor/kvm/imageserver/handler.py | 36 -- .../kvm/imageserver/tests/__init__.py | 6 + .../kvm/imageserver/tests/test_stress_io.py | 414 ++++++++++++++++++ 6 files changed, 425 insertions(+), 49 deletions(-) create mode 100644 scripts/vm/hypervisor/kvm/imageserver/tests/test_stress_io.py diff --git a/scripts/vm/hypervisor/kvm/imageserver/__init__.py b/scripts/vm/hypervisor/kvm/imageserver/__init__.py index dc950531039..7392dfd3b75 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/__init__.py +++ b/scripts/vm/hypervisor/kvm/imageserver/__init__.py @@ -28,10 +28,5 @@ Supports two backends (configured per-transfer at registration time): - file: read/write a local qcow2/raw file; full PUT only, GET with optional ranges, flush. 
-Usage:: - - # As a module - python -m imageserver --listen 127.0.0.1 --port 54322 - - # Or via the systemd service started by createImageTransfer +Runs as a systemd service started by the CreateImageTransfer CloudStack Agent command """ diff --git a/scripts/vm/hypervisor/kvm/imageserver/concurrency.py b/scripts/vm/hypervisor/kvm/imageserver/concurrency.py index 7d91aea6013..6b2d28a4069 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/concurrency.py +++ b/scripts/vm/hypervisor/kvm/imageserver/concurrency.py @@ -38,8 +38,8 @@ class ConcurrencyManager: """ def __init__(self, max_reads: int, max_writes: int): - self._max_reads = max_reads - self._max_writes = max_writes + self._max_reads = max_reads + self._max_writes = max_writes self._images: Dict[str, _ImageState] = {} self._guard = threading.Lock() @@ -66,6 +66,3 @@ class ConcurrencyManager: def release_write(self, image_id: str) -> None: self._state_for(image_id).write_sem.release() - - def get_image_lock(self, image_id: str) -> threading.Lock: - return self._state_for(image_id).lock diff --git a/scripts/vm/hypervisor/kvm/imageserver/constants.py b/scripts/vm/hypervisor/kvm/imageserver/constants.py index 0b6465527f4..4f5bfd1a737 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/constants.py +++ b/scripts/vm/hypervisor/kvm/imageserver/constants.py @@ -23,8 +23,8 @@ NBD_STATE_ZERO = 2 # NBD qemu:dirty-bitmap flags (dirty=1) NBD_STATE_DIRTY = 1 -MAX_PARALLEL_READS = 8 -MAX_PARALLEL_WRITES = 1 +MAX_PARALLEL_READS = 4 +MAX_PARALLEL_WRITES = 4 # HTTP server defaults DEFAULT_LISTEN_ADDRESS = "127.0.0.1" diff --git a/scripts/vm/hypervisor/kvm/imageserver/handler.py b/scripts/vm/hypervisor/kvm/imageserver/handler.py index c28a0657581..9775e7049f9 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/handler.py +++ b/scripts/vm/hypervisor/kvm/imageserver/handler.py @@ -570,11 +570,7 @@ class Handler(BaseHTTPRequestHandler): def _handle_put_image( self, image_id: str, cfg: Dict[str, Any], content_length: int, flush: bool ) 
-> None: - lock = self._concurrency.get_image_lock(image_id) - lock.acquire() - if not self._concurrency.acquire_write(image_id): - lock.release() self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") return @@ -598,7 +594,6 @@ class Handler(BaseHTTPRequestHandler): self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: self._concurrency.release_write(image_id) - lock.release() dur = now_s() - start logging.info( "PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur @@ -612,11 +607,7 @@ class Handler(BaseHTTPRequestHandler): content_length: int, flush: bool, ) -> None: - lock = self._concurrency.get_image_lock(image_id) - lock.acquire() - if not self._concurrency.acquire_write(image_id): - lock.release() self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") return @@ -657,7 +648,6 @@ class Handler(BaseHTTPRequestHandler): self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: self._concurrency.release_write(image_id) - lock.release() dur = now_s() - start logging.info( "PUT range end image_id=%s bytes=%d duration_s=%.3f flush=%s", @@ -667,11 +657,6 @@ class Handler(BaseHTTPRequestHandler): def _handle_get_extents( self, image_id: str, cfg: Dict[str, Any], context: Optional[str] = None ) -> None: - lock = self._concurrency.get_image_lock(image_id) - if not lock.acquire(blocking=False): - self._send_error_json(HTTPStatus.CONFLICT, "image busy") - return - start = now_s() try: logging.info("EXTENTS start image_id=%s context=%s", image_id, context) @@ -709,16 +694,10 @@ class Handler(BaseHTTPRequestHandler): logging.error("EXTENTS error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: - lock.release() dur = now_s() - start logging.info("EXTENTS end image_id=%s duration_s=%.3f", image_id, dur) def _handle_post_flush(self, image_id: str, cfg: Dict[str, Any]) -> None: - lock 
= self._concurrency.get_image_lock(image_id) - if not lock.acquire(blocking=False): - self._send_error_json(HTTPStatus.CONFLICT, "image busy") - return - start = now_s() try: logging.info("FLUSH start image_id=%s", image_id) @@ -732,7 +711,6 @@ class Handler(BaseHTTPRequestHandler): logging.error("FLUSH error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: - lock.release() dur = now_s() - start logging.info("FLUSH end image_id=%s duration_s=%.3f", image_id, dur) @@ -744,13 +722,7 @@ class Handler(BaseHTTPRequestHandler): size: int, flush: bool, ) -> None: - lock = self._concurrency.get_image_lock(image_id) - if not lock.acquire(blocking=False): - self._send_error_json(HTTPStatus.CONFLICT, "image busy") - return - if not self._concurrency.acquire_write(image_id): - lock.release() self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") return @@ -775,7 +747,6 @@ class Handler(BaseHTTPRequestHandler): self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: self._concurrency.release_write(image_id) - lock.release() dur = now_s() - start logging.info("PATCH zero end image_id=%s duration_s=%.3f", image_id, dur) @@ -786,13 +757,7 @@ class Handler(BaseHTTPRequestHandler): range_header: str, content_length: int, ) -> None: - lock = self._concurrency.get_image_lock(image_id) - if not lock.acquire(blocking=False): - self._send_error_json(HTTPStatus.CONFLICT, "image busy") - return - if not self._concurrency.acquire_write(image_id): - lock.release() self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") return @@ -840,7 +805,6 @@ class Handler(BaseHTTPRequestHandler): self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: self._concurrency.release_write(image_id) - lock.release() dur = now_s() - start logging.info( "PATCH range end image_id=%s bytes=%d duration_s=%.3f", diff --git 
a/scripts/vm/hypervisor/kvm/imageserver/tests/__init__.py b/scripts/vm/hypervisor/kvm/imageserver/tests/__init__.py index 0ccbeeeafb7..09102f9da2b 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/tests/__init__.py +++ b/scripts/vm/hypervisor/kvm/imageserver/tests/__init__.py @@ -14,3 +14,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + +""" +Run: +cd to the directory containing the imageserver folder +python3 -m unittest discover -s imageserver/tests -t . -v +""" \ No newline at end of file diff --git a/scripts/vm/hypervisor/kvm/imageserver/tests/test_stress_io.py b/scripts/vm/hypervisor/kvm/imageserver/tests/test_stress_io.py new file mode 100644 index 00000000000..87b10726344 --- /dev/null +++ b/scripts/vm/hypervisor/kvm/imageserver/tests/test_stress_io.py @@ -0,0 +1,414 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Stress IO tests +They run only when IMAGESERVER_STRESS_TEST_QCOW_DIR is set to an existing +directory containing qcow2 files. 
+""" + +import json +import os +import subprocess +import time +import unittest +import uuid +import urllib.error +from concurrent.futures import ThreadPoolExecutor, as_completed + +from imageserver.constants import MAX_PARALLEL_READS, MAX_PARALLEL_WRITES + +from .test_base import get_tmp_dir, http_get, http_post, http_put, make_nbd_transfer_existing_disk + + +def _allocated_subranges(extents, granularity): + """Split each non-hole extent (zero=False) into [start, end] inclusive byte ranges.""" + out = [] + for ext in extents: + if ext.get("zero"): + continue + start = int(ext["start"]) + length = int(ext["length"]) + pos = start + end_abs = start + length + while pos < end_abs: + chunk_end = min(pos + granularity, end_abs) + out.append((pos, chunk_end - 1)) + pos = chunk_end + return out + + +def _qemu_img_virtual_size(path: str) -> int: + """Return virtual size in bytes (requires ``qemu-img`` on PATH).""" + cp = subprocess.run( + ["qemu-img", "info", "--output=json", path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + check=True, + ) + return int(json.loads(cp.stdout)["virtual-size"]) + + +def _http_error_detail(exc: urllib.error.HTTPError) -> str: + parts = ["HTTP %s %r" % (exc.code, exc.reason), "url=%r" % getattr(exc, "url", "")] + try: + if exc.fp is not None: + raw = exc.fp.read() + if raw: + text = raw.decode("utf-8", errors="replace") + parts.append("response_body=%r" % (text,)) + except Exception as read_err: + parts.append("read_body_error=%r" % (read_err,)) + return "; ".join(parts) + + +def _http_get_checked(url, headers=None, expected_status=200, label="GET"): + try: + resp = http_get(url, headers=headers) + except urllib.error.HTTPError as e: + raise AssertionError("%s failed for %r: %s" % (label, url, _http_error_detail(e))) from e + if resp.status != expected_status: + body = resp.read() + raise AssertionError( + "%s %r: expected HTTP %s, got %s; body=%r" + % (label, url, expected_status, resp.status, body) + ) + 
return resp + + +def _http_put_checked(url, data, headers, label="PUT"): + try: + resp = http_put(url, data, headers=headers) + except urllib.error.HTTPError as e: + raise AssertionError("%s failed for %r: %s" % (label, url, _http_error_detail(e))) from e + body = resp.read() + if resp.status != 200: + raise AssertionError( + "%s %r: expected HTTP 200, got %s; body=%r" % (label, url, resp.status, body) + ) + return resp, body + + +def _http_post_checked(url, data=b"", headers=None, label="POST"): + try: + resp = http_post(url, data=data, headers=headers) + except urllib.error.HTTPError as e: + raise AssertionError("%s failed for %r: %s" % (label, url, _http_error_detail(e))) from e + body = resp.read() + if resp.status != 200: + raise AssertionError( + "%s %r: expected HTTP 200, got %s; body=%r" % (label, url, resp.status, body) + ) + return resp, body + + +def _list_qcow2_files(dir_path: str): + entries = [] + for name in os.listdir(dir_path): + p = os.path.join(dir_path, name) + if not os.path.isfile(p): + continue + # Keep this intentionally permissive; qemu-nbd can still reject invalid files. + if name.lower().endswith(".qcow2") or name.lower().endswith(".qcow"): + entries.append(p) + entries.sort() + return entries + + +class TestQcow2ExtentsParallelReads(unittest.TestCase): + """ + For each qcow2 in IMAGESERVER_STRESS_TEST_QCOW_DIR, + export it via qemu-nbd, fetch allocation extents, and perform parallel range reads + over allocated regions. A second test copies allocated extents into a new qcow2 + and validates via qemu-img compare. 
+ + Env: + - IMAGESERVER_STRESS_TEST_QCOW_DIR: directory containing qcow2 files (required) + - IMAGESERVER_STRESS_TEST_READ_GRANULARITY: byte step (default 4 MiB) + (fallback: IMAGESERVER_TEST_QCOW2_READ_GRANULARITY for compatibility) + """ + + def setUp(self): + super().setUp() + self._qcow_dir = os.environ.get("IMAGESERVER_STRESS_TEST_QCOW_DIR", "").strip() + if not self._qcow_dir or not os.path.isdir(self._qcow_dir): + self.skipTest( + "Set IMAGESERVER_STRESS_TEST_QCOW_DIR to an existing directory containing qcow2 files" + ) + + self._dest_dir = self._qcow_dir.rstrip(os.sep) + ".test" + try: + os.makedirs(self._dest_dir, exist_ok=True) + except OSError as e: + self.skipTest("failed to create dest dir %r: %r" % (self._dest_dir, e)) + + raw_g = os.environ.get("IMAGESERVER_STRESS_TEST_READ_GRANULARITY", "").strip() + if not raw_g: + raw_g = os.environ.get("IMAGESERVER_TEST_QCOW2_READ_GRANULARITY", "").strip() + self._read_granularity = int(raw_g) if raw_g else 4 * 1024 * 1024 + if self._read_granularity <= 0: + self.skipTest("IMAGESERVER_STRESS_TEST_READ_GRANULARITY must be positive") + + self._qcow2_files = _list_qcow2_files(self._qcow_dir) + if not self._qcow2_files: + self.skipTest("no qcow2 files found in IMAGESERVER_STRESS_TEST_QCOW_DIR") + + # Avoid pathological oversubscription by default; still runs multiple files concurrently. + cpu = os.cpu_count() or 4 + self._file_workers = max(1, min(len(self._qcow2_files), cpu)) + + def test_parallel_range_reads_allocated_extents(self): + """ + For every qcow2 in the directory: GET /extents then do parallel Range GETs across + allocated spans. All qcow2 files are processed concurrently. 
+ """ + + def run_one(path: str): + _, url, server, cleanup = make_nbd_transfer_existing_disk(path, "qcow2") + try: + resp = _http_get_checked( + "%s/extents" % (url,), + expected_status=200, + label="GET /extents", + ) + extents = json.loads(resp.read()) + ranges = _allocated_subranges(extents, self._read_granularity) + if not ranges: + # Not an error; some images can legitimately be all holes. + return {"path": path, "ranges": 0, "skipped": True} + + def fetch(span): + start_b, end_b = span + range_hdr = "bytes=%s-%s" % (start_b, end_b) + r = _http_get_checked( + url, + headers={"Range": range_hdr}, + expected_status=206, + label="Range GET %s" % (range_hdr,), + ) + data = r.read() + expected_len = end_b - start_b + 1 + if len(data) != expected_len: + raise AssertionError( + "Range GET %s: got %d bytes, expected %d (url=%r, file=%r)" + % (range_hdr, len(data), expected_len, url, path) + ) + + with ThreadPoolExecutor(max_workers=MAX_PARALLEL_READS) as pool: + list(pool.map(fetch, ranges)) + return {"path": path, "ranges": len(ranges), "skipped": False} + finally: + cleanup() + + started = time.perf_counter() + results = [] + failures = [] + with ThreadPoolExecutor(max_workers=self._file_workers) as pool: + futs = {pool.submit(run_one, p): p for p in self._qcow2_files} + for fut in as_completed(futs): + p = futs[fut] + try: + results.append(fut.result()) + except Exception as e: + failures.append((p, e)) + + elapsed = time.perf_counter() - started + skipped = sum(1 for r in results if r.get("skipped")) + total_ranges = sum(int(r.get("ranges", 0)) for r in results) + print( + "stress_io: test_parallel_range_reads_allocated_extents: files=%d workers=%d skipped=%d total_ranges=%d elapsed=%.3fs" + % (len(self._qcow2_files), self._file_workers, skipped, total_ranges, elapsed) + ) + + if failures: + first_path, first_exc = failures[0] + raise AssertionError( + "stress_io: %d/%d files failed (first=%r): %r" + % (len(failures), len(self._qcow2_files), first_path, 
first_exc) + ) from first_exc + + def test_parallel_reads_then_put_range_copy_matches_source(self): + """ + For every qcow2 in the directory: create an empty qcow2 with same virtual size, + then copy every allocated range using a worker pool (Range GET then Content-Range PUT), + flush, and validate via qemu-img compare. All qcow2 files are processed concurrently. + """ + + def run_one(src_path: str): + try: + vsize = _qemu_img_virtual_size(src_path) + except ( + FileNotFoundError, + subprocess.CalledProcessError, + KeyError, + json.JSONDecodeError, + TypeError, + ValueError, + ) as e: + raise AssertionError("qemu-img info failed for %r: %r" % (src_path, e)) from e + + base = os.path.basename(src_path) + # Keep dest names unique in case the same basename appears more than once. + dest_name = "%s.copy.%s.qcow2" % (base, uuid.uuid4().hex[:8]) + dest_path = os.path.join(self._dest_dir, dest_name) + try: + subprocess.run( + ["qemu-img", "create", "-f", "qcow2", dest_path, str(vsize)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + check=True, + ) + except (FileNotFoundError, subprocess.CalledProcessError) as e: + raise AssertionError("qemu-img create failed for %r: %r" % (dest_path, e)) from e + + _, src_url, _, cleanup_src = make_nbd_transfer_existing_disk(src_path, "qcow2") + _, dest_url, _, cleanup_dest = make_nbd_transfer_existing_disk(dest_path, "qcow2") + try: + resp = _http_get_checked( + "%s/extents" % (src_url,), + expected_status=200, + label="GET src /extents", + ) + extents = json.loads(resp.read()) + ranges = _allocated_subranges(extents, self._read_granularity) + if not ranges: + return {"path": src_path, "ranges": 0, "skipped": True} + + transfer_workers = max(1, min(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES)) + + def transfer_span(span): + start_b, end_b = span + range_hdr = "bytes=%s-%s" % (start_b, end_b) + r = _http_get_checked( + src_url, + headers={"Range": range_hdr}, + expected_status=206, + label="Range GET src %s" 
% (range_hdr,), + ) + data = r.read() + expected_len = end_b - start_b + 1 + if len(data) != expected_len: + raise AssertionError( + "Range GET src %s: got %d bytes, expected %d (url=%r, file=%r)" + % (range_hdr, len(data), expected_len, src_url, src_path) + ) + end_inclusive = start_b + len(data) - 1 + cr = "bytes %s-%s/*" % (start_b, end_inclusive) + _put_resp, put_body = _http_put_checked( + dest_url, + data, + headers={ + "Content-Range": cr, + "Content-Length": str(len(data)), + }, + label="PUT dest %s" % (cr,), + ) + try: + body = json.loads(put_body) + except ValueError: + raise AssertionError( + "PUT dest %s: invalid JSON body=%r (url=%r, file=%r)" + % (cr, put_body, dest_url, src_path) + ) + if not body.get("ok"): + raise AssertionError( + "PUT dest %s: JSON ok=false, full=%r (url=%r, file=%r)" + % (cr, body, dest_url, src_path) + ) + if body.get("bytes_written") != len(data): + raise AssertionError( + "PUT dest %s: bytes_written=%r expected %d (url=%r, file=%r)" + % (cr, body.get("bytes_written"), len(data), dest_url, src_path) + ) + + with ThreadPoolExecutor(max_workers=transfer_workers) as pool: + list(pool.map(transfer_span, ranges)) + + _flush, flush_body = _http_post_checked( + "%s/flush" % (dest_url,), + label="POST dest /flush", + ) + try: + flush_json = json.loads(flush_body) + except ValueError as e: + raise AssertionError( + "POST dest /flush: invalid JSON body=%r (url=%r, file=%r)" + % (flush_body, dest_url, src_path) + ) from e + if not flush_json.get("ok"): + raise AssertionError( + "POST dest /flush: ok=false, full=%r (url=%r, file=%r)" + % (flush_json, dest_url, src_path) + ) + finally: + cleanup_dest() + cleanup_src() + + try: + cmp = subprocess.run( + ["qemu-img", "compare", src_path, dest_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) + if cmp.returncode != 0: + raise AssertionError( + "qemu-img compare %r vs %r failed (rc=%s): stderr=%r stdout=%r" + % (src_path, dest_path, cmp.returncode, 
cmp.stderr, cmp.stdout) + ) + finally: + try: + os.unlink(dest_path) + except FileNotFoundError: + pass + + return {"path": src_path, "ranges": len(ranges), "skipped": False} + + started = time.perf_counter() + results = [] + failures = [] + with ThreadPoolExecutor(max_workers=self._file_workers) as pool: + futs = {pool.submit(run_one, p): p for p in self._qcow2_files} + for fut in as_completed(futs): + p = futs[fut] + try: + results.append(fut.result()) + except Exception as e: + failures.append((p, e)) + + elapsed = time.perf_counter() - started + skipped = sum(1 for r in results if r.get("skipped")) + total_ranges = sum(int(r.get("ranges", 0)) for r in results) + print( + "stress_io: test_parallel_reads_then_put_range_copy_matches_source: files=%d workers=%d skipped=%d total_ranges=%d elapsed=%.3fs" + % (len(self._qcow2_files), self._file_workers, skipped, total_ranges, elapsed) + ) + + if failures: + first_path, first_exc = failures[0] + raise AssertionError( + "stress_io: %d/%d files failed (first=%r): %r" + % (len(failures), len(self._qcow2_files), first_path, first_exc) + ) from first_exc + + +if __name__ == "__main__": + unittest.main() + +