Remove image server per-image concurrency locks

This commit is contained in:
Abhisar Sinha 2026-04-14 21:51:54 +05:30
parent c40b30bc4a
commit 00d1dbc363
3 changed files with 2 additions and 103 deletions

View File

@ -1,68 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import threading
from typing import Dict, NamedTuple
class _ImageState(NamedTuple):
read_sem: threading.Semaphore
write_sem: threading.Semaphore
lock: threading.Lock
class ConcurrencyManager:
"""
Manages per-image read/write semaphores and per-image mutual-exclusion locks.
Each image_id gets its own independent pool of read slots (default MAX_PARALLEL_READS)
and write slots (default MAX_PARALLEL_WRITES), so concurrent transfers to different images
do not contend with each other.
The per-image lock serialises operations that must not overlap on the
same image (e.g. flush while writing, extents while writing).
"""
def __init__(self, max_reads: int, max_writes: int):
self._max_reads = max_reads + 4
self._max_writes = max_writes + 4
self._images: Dict[str, _ImageState] = {}
self._guard = threading.Lock()
def _state_for(self, image_id: str) -> _ImageState:
with self._guard:
state = self._images.get(image_id)
if state is None:
state = _ImageState(
read_sem=threading.Semaphore(self._max_reads),
write_sem=threading.Semaphore(self._max_writes),
lock=threading.Lock(),
)
self._images[image_id] = state
return state
def acquire_read(self, image_id: str, blocking: bool = False) -> bool:
return self._state_for(image_id).read_sem.acquire(blocking=blocking)
def release_read(self, image_id: str) -> None:
self._state_for(image_id).read_sem.release()
def acquire_write(self, image_id: str, blocking: bool = False) -> bool:
return self._state_for(image_id).write_sem.acquire(blocking=blocking)
def release_write(self, image_id: str) -> None:
self._state_for(image_id).write_sem.release()

View File

@ -24,7 +24,6 @@ from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import parse_qs
from .backends import NbdBackend, create_backend
from .concurrency import ConcurrencyManager
from .config import TransferRegistry
from .constants import CHUNK_SIZE, MAX_PARALLEL_READS, MAX_PARALLEL_WRITES, MAX_PATCH_JSON_SIZE
from .util import is_fallback_dirty_response, json_bytes, now_s
@ -38,14 +37,13 @@ class Handler(BaseHTTPRequestHandler):
All backend I/O is delegated to ImageBackend implementations via the
create_backend() factory.
Class-level attributes _concurrency and _registry are injected
Class-level attribute _registry is injected
by the server at startup (see server.py / make_handler()).
"""
server_version = "cloudstack-image-server/1.0"
server_protocol = "HTTP/1.1"
_concurrency: ConcurrencyManager
_registry: TransferRegistry
_CONTENT_RANGE_RE = re.compile(r"^bytes\s+(\d+)-(\d+)/(?:\*|\d+)$")
@ -493,10 +491,6 @@ class Handler(BaseHTTPRequestHandler):
def _handle_get_image(
self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str]
) -> None:
if not self._concurrency.acquire_read(image_id):
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel reads")
return
start = now_s()
bytes_sent = 0
try:
@ -564,7 +558,6 @@ class Handler(BaseHTTPRequestHandler):
except Exception:
pass
finally:
self._concurrency.release_read(image_id)
dur = now_s() - start
logging.info(
"GET end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_sent, dur
@ -573,10 +566,6 @@ class Handler(BaseHTTPRequestHandler):
def _handle_put_image(
self, image_id: str, cfg: Dict[str, Any], content_length: int, flush: bool
) -> None:
if not self._concurrency.acquire_write(image_id):
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
start = now_s()
bytes_written = 0
try:
@ -596,7 +585,6 @@ class Handler(BaseHTTPRequestHandler):
logging.error("PUT error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
self._concurrency.release_write(image_id)
dur = now_s() - start
logging.info(
"PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur
@ -610,10 +598,6 @@ class Handler(BaseHTTPRequestHandler):
content_length: int,
flush: bool,
) -> None:
if not self._concurrency.acquire_write(image_id):
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
start = now_s()
bytes_written = 0
try:
@ -650,7 +634,6 @@ class Handler(BaseHTTPRequestHandler):
logging.error("PUT range error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
self._concurrency.release_write(image_id)
dur = now_s() - start
logging.info(
"PUT range end image_id=%s bytes=%d duration_s=%.3f flush=%s",
@ -725,10 +708,6 @@ class Handler(BaseHTTPRequestHandler):
size: int,
flush: bool,
) -> None:
if not self._concurrency.acquire_write(image_id):
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
start = now_s()
try:
logging.info(
@ -749,7 +728,6 @@ class Handler(BaseHTTPRequestHandler):
logging.error("PATCH zero error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
self._concurrency.release_write(image_id)
dur = now_s() - start
logging.info("PATCH zero end image_id=%s duration_s=%.3f", image_id, dur)
@ -760,10 +738,6 @@ class Handler(BaseHTTPRequestHandler):
range_header: str,
content_length: int,
) -> None:
if not self._concurrency.acquire_write(image_id):
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
start = now_s()
bytes_written = 0
try:
@ -807,7 +781,6 @@ class Handler(BaseHTTPRequestHandler):
logging.error("PATCH range error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
self._concurrency.release_write(image_id)
dur = now_s() - start
logging.info(
"PATCH range end image_id=%s bytes=%d duration_s=%.3f",

View File

@ -33,7 +33,6 @@ except ImportError:
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): # type: ignore[no-redef]
pass
from .concurrency import ConcurrencyManager
from .config import TransferRegistry, validate_transfer_config
from .constants import (
CONTROL_RECV_BUFFER,
@ -42,14 +41,11 @@ from .constants import (
CONTROL_SOCKET_PERMISSIONS,
DEFAULT_HTTP_PORT,
DEFAULT_LISTEN_ADDRESS,
MAX_PARALLEL_READS,
MAX_PARALLEL_WRITES,
)
from .handler import Handler
def make_handler(
concurrency: ConcurrencyManager,
registry: TransferRegistry,
) -> Type[Handler]:
"""
@ -60,7 +56,6 @@ def make_handler(
"""
class ConfiguredHandler(Handler):
_concurrency = concurrency
_registry = registry
return ConfiguredHandler
@ -186,8 +181,7 @@ def main() -> None:
)
registry = TransferRegistry()
concurrency = ConcurrencyManager(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES)
handler_cls = make_handler(concurrency, registry)
handler_cls = make_handler(registry)
ctrl_thread = threading.Thread(
target=_control_listener,