diff --git a/scripts/vm/hypervisor/kvm/imageserver/concurrency.py b/scripts/vm/hypervisor/kvm/imageserver/concurrency.py deleted file mode 100644 index 6b2d28a4069..00000000000 --- a/scripts/vm/hypervisor/kvm/imageserver/concurrency.py +++ /dev/null @@ -1,68 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import threading -from typing import Dict, NamedTuple - - -class _ImageState(NamedTuple): - read_sem: threading.Semaphore - write_sem: threading.Semaphore - lock: threading.Lock - - -class ConcurrencyManager: - """ - Manages per-image read/write semaphores and per-image mutual-exclusion locks. - - Each image_id gets its own independent pool of read slots (default MAX_PARALLEL_READS) - and write slots (default MAX_PARALLEL_WRITES), so concurrent transfers to different images - do not contend with each other. - - The per-image lock serialises operations that must not overlap on the - same image (e.g. flush while writing, extents while writing). - """ - - def __init__(self, max_reads: int, max_writes: int): - self._max_reads = max_reads + 4 - self._max_writes = max_writes + 4 - self._images: Dict[str, _ImageState] = {} - self._guard = threading.Lock() - - def _state_for(self, image_id: str) -> _ImageState: - with self._guard: - state = self._images.get(image_id) - if state is None: - state = _ImageState( - read_sem=threading.Semaphore(self._max_reads), - write_sem=threading.Semaphore(self._max_writes), - lock=threading.Lock(), - ) - self._images[image_id] = state - return state - - def acquire_read(self, image_id: str, blocking: bool = False) -> bool: - return self._state_for(image_id).read_sem.acquire(blocking=blocking) - - def release_read(self, image_id: str) -> None: - self._state_for(image_id).read_sem.release() - - def acquire_write(self, image_id: str, blocking: bool = False) -> bool: - return self._state_for(image_id).write_sem.acquire(blocking=blocking) - - def release_write(self, image_id: str) -> None: - self._state_for(image_id).write_sem.release() diff --git a/scripts/vm/hypervisor/kvm/imageserver/handler.py b/scripts/vm/hypervisor/kvm/imageserver/handler.py index 4701a7581a9..d2d97d7810b 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/handler.py +++ b/scripts/vm/hypervisor/kvm/imageserver/handler.py @@ -24,7 +24,6 @@ from typing import Any, Dict, List, Optional, Tuple from urllib.parse import parse_qs from .backends import NbdBackend, create_backend -from .concurrency import ConcurrencyManager from .config import TransferRegistry from .constants import CHUNK_SIZE, MAX_PARALLEL_READS, MAX_PARALLEL_WRITES, MAX_PATCH_JSON_SIZE from .util import is_fallback_dirty_response, json_bytes, now_s @@ -38,14 +37,13 @@ class Handler(BaseHTTPRequestHandler): All backend I/O is delegated to ImageBackend implementations via the create_backend() factory. - Class-level attributes _concurrency and _registry are injected + Class-level attribute _registry is injected by the server at startup (see server.py / make_handler()). """ server_version = "cloudstack-image-server/1.0" server_protocol = "HTTP/1.1" - _concurrency: ConcurrencyManager _registry: TransferRegistry _CONTENT_RANGE_RE = re.compile(r"^bytes\s+(\d+)-(\d+)/(?:\*|\d+)$") @@ -493,10 +491,6 @@ class Handler(BaseHTTPRequestHandler): def _handle_get_image( self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str] ) -> None: - if not self._concurrency.acquire_read(image_id): - self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel reads") - return - start = now_s() bytes_sent = 0 try: @@ -564,7 +558,6 @@ class Handler(BaseHTTPRequestHandler): except Exception: pass finally: - self._concurrency.release_read(image_id) dur = now_s() - start logging.info( "GET end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_sent, dur @@ -573,10 +566,6 @@ class Handler(BaseHTTPRequestHandler): def _handle_put_image( self, image_id: str, cfg: Dict[str, Any], content_length: int, flush: bool ) -> None: - if not self._concurrency.acquire_write(image_id): - self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") - return - start = now_s() bytes_written = 0 try: @@ -596,7 +585,6 @@ class Handler(BaseHTTPRequestHandler): logging.error("PUT error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: - self._concurrency.release_write(image_id) dur = now_s() - start logging.info( "PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur @@ -610,10 +598,6 @@ class Handler(BaseHTTPRequestHandler): content_length: int, flush: bool, ) -> None: - if not self._concurrency.acquire_write(image_id): - self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") - return - start = now_s() bytes_written = 0 try: @@ -650,7 +634,6 @@ class Handler(BaseHTTPRequestHandler): logging.error("PUT range error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: - self._concurrency.release_write(image_id) dur = now_s() - start logging.info( "PUT range end image_id=%s bytes=%d duration_s=%.3f flush=%s", @@ -725,10 +708,6 @@ class Handler(BaseHTTPRequestHandler): size: int, flush: bool, ) -> None: - if not self._concurrency.acquire_write(image_id): - self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") - return - start = now_s() try: logging.info( @@ -749,7 +728,6 @@ class Handler(BaseHTTPRequestHandler): logging.error("PATCH zero error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: - self._concurrency.release_write(image_id) dur = now_s() - start logging.info("PATCH zero end image_id=%s duration_s=%.3f", image_id, dur) @@ -760,10 +738,6 @@ class Handler(BaseHTTPRequestHandler): range_header: str, content_length: int, ) -> None: - if not self._concurrency.acquire_write(image_id): - self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes") - return - start = now_s() bytes_written = 0 try: @@ -807,7 +781,6 @@ class Handler(BaseHTTPRequestHandler): logging.error("PATCH range error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: - self._concurrency.release_write(image_id) dur = now_s() - start logging.info( "PATCH range end image_id=%s bytes=%d duration_s=%.3f", diff --git a/scripts/vm/hypervisor/kvm/imageserver/server.py b/scripts/vm/hypervisor/kvm/imageserver/server.py index 1bc42252d4f..50bd4e0b139 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/server.py +++ b/scripts/vm/hypervisor/kvm/imageserver/server.py @@ -33,7 +33,6 @@ except ImportError: class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): # type: ignore[no-redef] pass -from .concurrency import ConcurrencyManager from .config import TransferRegistry, validate_transfer_config from .constants import ( CONTROL_RECV_BUFFER, @@ -42,14 +41,11 @@ from .constants import ( CONTROL_SOCKET_PERMISSIONS, DEFAULT_HTTP_PORT, DEFAULT_LISTEN_ADDRESS, - MAX_PARALLEL_READS, - MAX_PARALLEL_WRITES, ) from .handler import Handler def make_handler( - concurrency: ConcurrencyManager, registry: TransferRegistry, ) -> Type[Handler]: """ @@ -60,7 +56,6 @@ def make_handler( """ class ConfiguredHandler(Handler): - _concurrency = concurrency _registry = registry return ConfiguredHandler @@ -186,8 +181,7 @@ def main() -> None: ) registry = TransferRegistry() - concurrency = ConcurrencyManager(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES) - handler_cls = make_handler(concurrency, registry) + handler_cls = make_handler(registry) ctrl_thread = threading.Thread( target=_control_listener,