Add stress test. Fix concurrency.

Abhisar Sinha 2026-04-10 07:49:27 +05:30
parent 40cadd0369
commit 605f7bff3f
6 changed files with 425 additions and 49 deletions

View File

@ -28,10 +28,5 @@ Supports two backends (configured per-transfer at registration time):
- file: read/write a local qcow2/raw file; full PUT only, GET with optional
ranges, flush.
Usage::
# As a module
python -m imageserver --listen 127.0.0.1 --port 54322
# Or via the systemd service started by createImageTransfer
Run as a systemd service started by the CreateImageTransfer CloudStack agent command
"""

View File

@ -38,8 +38,8 @@ class ConcurrencyManager:
"""
def __init__(self, max_reads: int, max_writes: int):
self._max_reads = max_reads
self._max_writes = max_writes
self._max_reads = max_reads + 4
self._max_writes = max_writes + 4
self._images: Dict[str, _ImageState] = {}
self._guard = threading.Lock()
@ -66,6 +66,3 @@ class ConcurrencyManager:
def release_write(self, image_id: str) -> None:
self._state_for(image_id).write_sem.release()
def get_image_lock(self, image_id: str) -> threading.Lock:
return self._state_for(image_id).lock
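
For readers without the full file, a minimal sketch of how the class above fits together, reconstructed from the visible lines only. The `_ImageState` constructor, the `read_sem` field, and the non-blocking `acquire_write` are assumptions; only the names that appear in the hunks are taken from the code:

    import threading
    from typing import Dict


    class _ImageState:
        """Per-image bookkeeping; read_sem is assumed symmetric to the write_sem seen in the diff."""

        def __init__(self, max_reads: int, max_writes: int):
            self.read_sem = threading.Semaphore(max_reads)
            self.write_sem = threading.Semaphore(max_writes)


    class ConcurrencyManager:
        def __init__(self, max_reads: int, max_writes: int):
            self._max_reads = max_reads + 4
            self._max_writes = max_writes + 4
            self._images: Dict[str, _ImageState] = {}
            self._guard = threading.Lock()

        def _state_for(self, image_id: str) -> _ImageState:
            # One lazily created state per image, protected by the manager-wide guard.
            with self._guard:
                if image_id not in self._images:
                    self._images[image_id] = _ImageState(self._max_reads, self._max_writes)
                return self._images[image_id]

        def acquire_write(self, image_id: str) -> bool:
            # Non-blocking here so the handler can answer 503 instead of queueing;
            # the real implementation may use a timeout instead.
            return self._state_for(image_id).write_sem.acquire(blocking=False)

        def release_write(self, image_id: str) -> None:
            self._state_for(image_id).write_sem.release()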

View File

@ -23,8 +23,8 @@ NBD_STATE_ZERO = 2
# NBD qemu:dirty-bitmap flags (dirty=1)
NBD_STATE_DIRTY = 1
MAX_PARALLEL_READS = 8
MAX_PARALLEL_WRITES = 1
MAX_PARALLEL_READS = 4
MAX_PARALLEL_WRITES = 4
# HTTP server defaults
DEFAULT_LISTEN_ADDRESS = "127.0.0.1"

View File

@ -570,11 +570,7 @@ class Handler(BaseHTTPRequestHandler):
def _handle_put_image(
self, image_id: str, cfg: Dict[str, Any], content_length: int, flush: bool
) -> None:
lock = self._concurrency.get_image_lock(image_id)
lock.acquire()
if not self._concurrency.acquire_write(image_id):
lock.release()
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
@ -598,7 +594,6 @@ class Handler(BaseHTTPRequestHandler):
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
self._concurrency.release_write(image_id)
lock.release()
dur = now_s() - start
logging.info(
"PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur
@ -612,11 +607,7 @@ class Handler(BaseHTTPRequestHandler):
content_length: int,
flush: bool,
) -> None:
lock = self._concurrency.get_image_lock(image_id)
lock.acquire()
if not self._concurrency.acquire_write(image_id):
lock.release()
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
@ -657,7 +648,6 @@ class Handler(BaseHTTPRequestHandler):
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
self._concurrency.release_write(image_id)
lock.release()
dur = now_s() - start
logging.info(
"PUT range end image_id=%s bytes=%d duration_s=%.3f flush=%s",
@ -667,11 +657,6 @@ class Handler(BaseHTTPRequestHandler):
def _handle_get_extents(
self, image_id: str, cfg: Dict[str, Any], context: Optional[str] = None
) -> None:
lock = self._concurrency.get_image_lock(image_id)
if not lock.acquire(blocking=False):
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
return
start = now_s()
try:
logging.info("EXTENTS start image_id=%s context=%s", image_id, context)
@ -709,16 +694,10 @@ class Handler(BaseHTTPRequestHandler):
logging.error("EXTENTS error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
lock.release()
dur = now_s() - start
logging.info("EXTENTS end image_id=%s duration_s=%.3f", image_id, dur)
def _handle_post_flush(self, image_id: str, cfg: Dict[str, Any]) -> None:
lock = self._concurrency.get_image_lock(image_id)
if not lock.acquire(blocking=False):
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
return
start = now_s()
try:
logging.info("FLUSH start image_id=%s", image_id)
@ -732,7 +711,6 @@ class Handler(BaseHTTPRequestHandler):
logging.error("FLUSH error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
lock.release()
dur = now_s() - start
logging.info("FLUSH end image_id=%s duration_s=%.3f", image_id, dur)
@ -744,13 +722,7 @@ class Handler(BaseHTTPRequestHandler):
size: int,
flush: bool,
) -> None:
lock = self._concurrency.get_image_lock(image_id)
if not lock.acquire(blocking=False):
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
return
if not self._concurrency.acquire_write(image_id):
lock.release()
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
@ -775,7 +747,6 @@ class Handler(BaseHTTPRequestHandler):
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
self._concurrency.release_write(image_id)
lock.release()
dur = now_s() - start
logging.info("PATCH zero end image_id=%s duration_s=%.3f", image_id, dur)
@ -786,13 +757,7 @@ class Handler(BaseHTTPRequestHandler):
range_header: str,
content_length: int,
) -> None:
lock = self._concurrency.get_image_lock(image_id)
if not lock.acquire(blocking=False):
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
return
if not self._concurrency.acquire_write(image_id):
lock.release()
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
@ -840,7 +805,6 @@ class Handler(BaseHTTPRequestHandler):
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
self._concurrency.release_write(image_id)
lock.release()
dur = now_s() - start
logging.info(
"PATCH range end image_id=%s bytes=%d duration_s=%.3f",

View File

@ -14,3 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Run:
cd to the directory containing the imageserver folder
python3 -m unittest discover -s imageserver/tests -t . -v
"""

View File

@ -0,0 +1,414 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Stress I/O tests.

They run only when IMAGESERVER_STRESS_TEST_QCOW_DIR is set to an existing
directory containing qcow2 files.
"""
import json
import os
import subprocess
import time
import unittest
import uuid
import urllib.error
from concurrent.futures import ThreadPoolExecutor, as_completed
from imageserver.constants import MAX_PARALLEL_READS, MAX_PARALLEL_WRITES
from .test_base import get_tmp_dir, http_get, http_post, http_put, make_nbd_transfer_existing_disk


def _allocated_subranges(extents, granularity):
    """Split each non-hole extent (zero=False) into [start, end] inclusive byte ranges."""
    out = []
    for ext in extents:
        if ext.get("zero"):
            continue
        start = int(ext["start"])
        length = int(ext["length"])
        pos = start
        end_abs = start + length
        while pos < end_abs:
            chunk_end = min(pos + granularity, end_abs)
            out.append((pos, chunk_end - 1))
            pos = chunk_end
    return out
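

# Worked example for _allocated_subranges above (values in bytes, granularity=4):
#   _allocated_subranges(
#       [{"start": 0, "length": 10, "zero": False},
#        {"start": 16, "length": 4, "zero": True}],
#       4,
#   ) -> [(0, 3), (4, 7), (8, 9)]   # the hole (zero=True) contributes nothing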


def _qemu_img_virtual_size(path: str) -> int:
    """Return virtual size in bytes (requires ``qemu-img`` on PATH)."""
    cp = subprocess.run(
        ["qemu-img", "info", "--output=json", path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        check=True,
    )
    return int(json.loads(cp.stdout)["virtual-size"])


def _http_error_detail(exc: urllib.error.HTTPError) -> str:
    parts = ["HTTP %s %r" % (exc.code, exc.reason), "url=%r" % getattr(exc, "url", "")]
    try:
        if exc.fp is not None:
            raw = exc.fp.read()
            if raw:
                text = raw.decode("utf-8", errors="replace")
                parts.append("response_body=%r" % (text,))
    except Exception as read_err:
        parts.append("read_body_error=%r" % (read_err,))
    return "; ".join(parts)


def _http_get_checked(url, headers=None, expected_status=200, label="GET"):
    try:
        resp = http_get(url, headers=headers)
    except urllib.error.HTTPError as e:
        raise AssertionError("%s failed for %r: %s" % (label, url, _http_error_detail(e))) from e
    if resp.status != expected_status:
        body = resp.read()
        raise AssertionError(
            "%s %r: expected HTTP %s, got %s; body=%r"
            % (label, url, expected_status, resp.status, body)
        )
    return resp


def _http_put_checked(url, data, headers, label="PUT"):
    try:
        resp = http_put(url, data, headers=headers)
    except urllib.error.HTTPError as e:
        raise AssertionError("%s failed for %r: %s" % (label, url, _http_error_detail(e))) from e
    body = resp.read()
    if resp.status != 200:
        raise AssertionError(
            "%s %r: expected HTTP 200, got %s; body=%r" % (label, url, resp.status, body)
        )
    return resp, body


def _http_post_checked(url, data=b"", headers=None, label="POST"):
    try:
        resp = http_post(url, data=data, headers=headers)
    except urllib.error.HTTPError as e:
        raise AssertionError("%s failed for %r: %s" % (label, url, _http_error_detail(e))) from e
    body = resp.read()
    if resp.status != 200:
        raise AssertionError(
            "%s %r: expected HTTP 200, got %s; body=%r" % (label, url, resp.status, body)
        )
    return resp, body


def _list_qcow2_files(dir_path: str):
    entries = []
    for name in os.listdir(dir_path):
        p = os.path.join(dir_path, name)
        if not os.path.isfile(p):
            continue
        # Keep this intentionally permissive; qemu-nbd can still reject invalid files.
        if name.lower().endswith(".qcow2") or name.lower().endswith(".qcow"):
            entries.append(p)
    entries.sort()
    return entries


class TestQcow2ExtentsParallelReads(unittest.TestCase):
    """
    For each qcow2 in IMAGESERVER_STRESS_TEST_QCOW_DIR,
    export it via qemu-nbd, fetch allocation extents, and perform parallel range reads
    over allocated regions. A second test copies allocated extents into a new qcow2
    and validates via qemu-img compare.

    Env:
    - IMAGESERVER_STRESS_TEST_QCOW_DIR: directory containing qcow2 files (required)
    - IMAGESERVER_STRESS_TEST_READ_GRANULARITY: byte step (default 4 MiB)
      (fallback: IMAGESERVER_TEST_QCOW2_READ_GRANULARITY for compatibility)
    """

    def setUp(self):
        super().setUp()
        self._qcow_dir = os.environ.get("IMAGESERVER_STRESS_TEST_QCOW_DIR", "").strip()
        if not self._qcow_dir or not os.path.isdir(self._qcow_dir):
            self.skipTest(
                "Set IMAGESERVER_STRESS_TEST_QCOW_DIR to an existing directory containing qcow2 files"
            )
        self._dest_dir = self._qcow_dir.rstrip(os.sep) + ".test"
        try:
            os.makedirs(self._dest_dir, exist_ok=True)
        except OSError as e:
            self.skipTest("failed to create dest dir %r: %r" % (self._dest_dir, e))
        raw_g = os.environ.get("IMAGESERVER_STRESS_TEST_READ_GRANULARITY", "").strip()
        if not raw_g:
            raw_g = os.environ.get("IMAGESERVER_TEST_QCOW2_READ_GRANULARITY", "").strip()
        self._read_granularity = int(raw_g) if raw_g else 4 * 1024 * 1024
        if self._read_granularity <= 0:
            self.skipTest("IMAGESERVER_STRESS_TEST_READ_GRANULARITY must be positive")
        self._qcow2_files = _list_qcow2_files(self._qcow_dir)
        if not self._qcow2_files:
            self.skipTest("no qcow2 files found in IMAGESERVER_STRESS_TEST_QCOW_DIR")
        # Avoid pathological oversubscription by default; still runs multiple files concurrently.
        cpu = os.cpu_count() or 4
        self._file_workers = max(1, min(len(self._qcow2_files), cpu))

    def test_parallel_range_reads_allocated_extents(self):
        """
        For every qcow2 in the directory: GET /extents then do parallel Range GETs across
        allocated spans. All qcow2 files are processed concurrently.
        """
        def run_one(path: str):
            _, url, server, cleanup = make_nbd_transfer_existing_disk(path, "qcow2")
            try:
                resp = _http_get_checked(
                    "%s/extents" % (url,),
                    expected_status=200,
                    label="GET /extents",
                )
                extents = json.loads(resp.read())
                ranges = _allocated_subranges(extents, self._read_granularity)
                if not ranges:
                    # Not an error; some images can legitimately be all holes.
                    return {"path": path, "ranges": 0, "skipped": True}

                def fetch(span):
                    start_b, end_b = span
                    range_hdr = "bytes=%s-%s" % (start_b, end_b)
                    r = _http_get_checked(
                        url,
                        headers={"Range": range_hdr},
                        expected_status=206,
                        label="Range GET %s" % (range_hdr,),
                    )
                    data = r.read()
                    expected_len = end_b - start_b + 1
                    if len(data) != expected_len:
                        raise AssertionError(
                            "Range GET %s: got %d bytes, expected %d (url=%r, file=%r)"
                            % (range_hdr, len(data), expected_len, url, path)
                        )

                with ThreadPoolExecutor(max_workers=MAX_PARALLEL_READS) as pool:
                    list(pool.map(fetch, ranges))
                return {"path": path, "ranges": len(ranges), "skipped": False}
            finally:
                cleanup()

        started = time.perf_counter()
        results = []
        failures = []
        with ThreadPoolExecutor(max_workers=self._file_workers) as pool:
            futs = {pool.submit(run_one, p): p for p in self._qcow2_files}
            for fut in as_completed(futs):
                p = futs[fut]
                try:
                    results.append(fut.result())
                except Exception as e:
                    failures.append((p, e))
        elapsed = time.perf_counter() - started
        skipped = sum(1 for r in results if r.get("skipped"))
        total_ranges = sum(int(r.get("ranges", 0)) for r in results)
        print(
            "stress_io: test_parallel_range_reads_allocated_extents: files=%d workers=%d skipped=%d total_ranges=%d elapsed=%.3fs"
            % (len(self._qcow2_files), self._file_workers, skipped, total_ranges, elapsed)
        )
        if failures:
            first_path, first_exc = failures[0]
            raise AssertionError(
                "stress_io: %d/%d files failed (first=%r): %r"
                % (len(failures), len(self._qcow2_files), first_path, first_exc)
            ) from first_exc

    def test_parallel_reads_then_put_range_copy_matches_source(self):
        """
        For every qcow2 in the directory: create an empty qcow2 with same virtual size,
        then copy every allocated range using a worker pool (Range GET then Content-Range PUT),
        flush, and validate via qemu-img compare. All qcow2 files are processed concurrently.
        """
        def run_one(src_path: str):
            try:
                vsize = _qemu_img_virtual_size(src_path)
            except (
                FileNotFoundError,
                subprocess.CalledProcessError,
                KeyError,
                json.JSONDecodeError,
                TypeError,
                ValueError,
            ) as e:
                raise AssertionError("qemu-img info failed for %r: %r" % (src_path, e)) from e
            base = os.path.basename(src_path)
            # Keep dest names unique in case the same basename appears more than once.
            dest_name = "%s.copy.%s.qcow2" % (base, uuid.uuid4().hex[:8])
            dest_path = os.path.join(self._dest_dir, dest_name)
            try:
                subprocess.run(
                    ["qemu-img", "create", "-f", "qcow2", dest_path, str(vsize)],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                    check=True,
                )
            except (FileNotFoundError, subprocess.CalledProcessError) as e:
                raise AssertionError("qemu-img create failed for %r: %r" % (dest_path, e)) from e
            _, src_url, _, cleanup_src = make_nbd_transfer_existing_disk(src_path, "qcow2")
            _, dest_url, _, cleanup_dest = make_nbd_transfer_existing_disk(dest_path, "qcow2")
            try:
                resp = _http_get_checked(
                    "%s/extents" % (src_url,),
                    expected_status=200,
                    label="GET src /extents",
                )
                extents = json.loads(resp.read())
                ranges = _allocated_subranges(extents, self._read_granularity)
                if not ranges:
                    return {"path": src_path, "ranges": 0, "skipped": True}
                transfer_workers = max(1, min(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES))

                def transfer_span(span):
                    start_b, end_b = span
                    range_hdr = "bytes=%s-%s" % (start_b, end_b)
                    r = _http_get_checked(
                        src_url,
                        headers={"Range": range_hdr},
                        expected_status=206,
                        label="Range GET src %s" % (range_hdr,),
                    )
                    data = r.read()
                    expected_len = end_b - start_b + 1
                    if len(data) != expected_len:
                        raise AssertionError(
                            "Range GET src %s: got %d bytes, expected %d (url=%r, file=%r)"
                            % (range_hdr, len(data), expected_len, src_url, src_path)
                        )
                    end_inclusive = start_b + len(data) - 1
                    cr = "bytes %s-%s/*" % (start_b, end_inclusive)
                    _put_resp, put_body = _http_put_checked(
                        dest_url,
                        data,
                        headers={
                            "Content-Range": cr,
                            "Content-Length": str(len(data)),
                        },
                        label="PUT dest %s" % (cr,),
                    )
                    try:
                        body = json.loads(put_body)
                    except ValueError:
                        raise AssertionError(
                            "PUT dest %s: invalid JSON body=%r (url=%r, file=%r)"
                            % (cr, put_body, dest_url, src_path)
                        )
                    if not body.get("ok"):
                        raise AssertionError(
                            "PUT dest %s: JSON ok=false, full=%r (url=%r, file=%r)"
                            % (cr, body, dest_url, src_path)
                        )
                    if body.get("bytes_written") != len(data):
                        raise AssertionError(
                            "PUT dest %s: bytes_written=%r expected %d (url=%r, file=%r)"
                            % (cr, body.get("bytes_written"), len(data), dest_url, src_path)
                        )

                with ThreadPoolExecutor(max_workers=transfer_workers) as pool:
                    list(pool.map(transfer_span, ranges))
                _flush, flush_body = _http_post_checked(
                    "%s/flush" % (dest_url,),
                    label="POST dest /flush",
                )
                try:
                    flush_json = json.loads(flush_body)
                except ValueError as e:
                    raise AssertionError(
                        "POST dest /flush: invalid JSON body=%r (url=%r, file=%r)"
                        % (flush_body, dest_url, src_path)
                    ) from e
                if not flush_json.get("ok"):
                    raise AssertionError(
                        "POST dest /flush: ok=false, full=%r (url=%r, file=%r)"
                        % (flush_json, dest_url, src_path)
                    )
            finally:
                cleanup_dest()
                cleanup_src()
            try:
                cmp = subprocess.run(
                    ["qemu-img", "compare", src_path, dest_path],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )
                if cmp.returncode != 0:
                    raise AssertionError(
                        "qemu-img compare %r vs %r failed (rc=%s): stderr=%r stdout=%r"
                        % (src_path, dest_path, cmp.returncode, cmp.stderr, cmp.stdout)
                    )
            finally:
                try:
                    os.unlink(dest_path)
                except FileNotFoundError:
                    pass
            return {"path": src_path, "ranges": len(ranges), "skipped": False}

        started = time.perf_counter()
        results = []
        failures = []
        with ThreadPoolExecutor(max_workers=self._file_workers) as pool:
            futs = {pool.submit(run_one, p): p for p in self._qcow2_files}
            for fut in as_completed(futs):
                p = futs[fut]
                try:
                    results.append(fut.result())
                except Exception as e:
                    failures.append((p, e))
        elapsed = time.perf_counter() - started
        skipped = sum(1 for r in results if r.get("skipped"))
        total_ranges = sum(int(r.get("ranges", 0)) for r in results)
        print(
            "stress_io: test_parallel_reads_then_put_range_copy_matches_source: files=%d workers=%d skipped=%d total_ranges=%d elapsed=%.3fs"
            % (len(self._qcow2_files), self._file_workers, skipped, total_ranges, elapsed)
        )
        if failures:
            first_path, first_exc = failures[0]
            raise AssertionError(
                "stress_io: %d/%d files failed (first=%r): %r"
                % (len(failures), len(self._qcow2_files), first_path, first_exc)
            ) from first_exc


if __name__ == "__main__":
    unittest.main()