diff --git a/api/src/main/java/org/apache/cloudstack/backup/KVMBackupExportService.java b/api/src/main/java/org/apache/cloudstack/backup/KVMBackupExportService.java index 7a53c1370c6..fbbde961ad1 100644 --- a/api/src/main/java/org/apache/cloudstack/backup/KVMBackupExportService.java +++ b/api/src/main/java/org/apache/cloudstack/backup/KVMBackupExportService.java @@ -43,6 +43,12 @@ public interface KVMBackupExportService extends Configurable, PluggableService { "10", "The image transfer progress polling interval in seconds.", true, ConfigKey.Scope.Global); + ConfigKey ImageTransferIdleTimeoutSeconds = new ConfigKey<>("Advanced", Integer.class, + "image.transfer.idle.timeout.seconds", + "600", + "Seconds since last completed HTTP request to an image transfer before the image server unregisters it (idle timeout).", + true, ConfigKey.Scope.Zone); + ConfigKey ExposeKVMBackupExportServiceApis = new ConfigKey<>("Advanced", Boolean.class, "expose.kvm.backup.export.service.apis", "false", diff --git a/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java b/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java index 3e042bf4249..95b56c9a9c3 100644 --- a/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java +++ b/core/src/main/java/org/apache/cloudstack/backup/CreateImageTransferCommand.java @@ -27,25 +27,27 @@ public class CreateImageTransferCommand extends Command { private String checkpointId; private String file; private ImageTransfer.Backend backend; + private int idleTimeoutSeconds; public CreateImageTransferCommand() { } - private CreateImageTransferCommand(String transferId, String direction, String socket) { + private CreateImageTransferCommand(String transferId, String direction, String socket, int idleTimeoutSeconds) { this.transferId = transferId; this.direction = direction; this.socket = socket; + this.idleTimeoutSeconds = idleTimeoutSeconds; } - public CreateImageTransferCommand(String transferId, String direction, String exportName, String socket, String checkpointId) { - this(transferId, direction, socket); + public CreateImageTransferCommand(String transferId, String direction, String exportName, String socket, String checkpointId, int idleTimeoutSeconds) { + this(transferId, direction, socket, idleTimeoutSeconds); this.backend = ImageTransfer.Backend.nbd; this.exportName = exportName; this.checkpointId = checkpointId; } - public CreateImageTransferCommand(String transferId, String direction, String socket, String file) { - this(transferId, direction, socket); + public CreateImageTransferCommand(String transferId, String direction, String socket, String file, int idleTimeoutSeconds) { + this(transferId, direction, socket, idleTimeoutSeconds); if (direction == ImageTransfer.Direction.download.toString()) { throw new IllegalArgumentException("File backend is only supported for upload"); } @@ -85,4 +87,8 @@ public class CreateImageTransferCommand extends Command { public String getCheckpointId() { return checkpointId; } + + public int getIdleTimeoutSeconds() { + return idleTimeoutSeconds; + } } diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java index 1b9b33f83a9..01fd11524bc 100644 --- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java @@ -131,6 +131,7 @@ public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper payload = new HashMap<>(); payload.put("backend", backend.toString()); + payload.put("idle_timeout_seconds", cmd.getIdleTimeoutSeconds()); if (backend == ImageTransfer.Backend.file) { final String filePath = cmd.getFile(); diff --git a/scripts/vm/hypervisor/kvm/imageserver/config.py b/scripts/vm/hypervisor/kvm/imageserver/config.py index 3b1fd686f05..98515d7519b 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/config.py +++ b/scripts/vm/hypervisor/kvm/imageserver/config.py @@ -18,7 +18,60 @@ import logging import os import threading -from typing import Any, Dict, Optional +import time +from contextlib import contextmanager +from typing import Any, Dict, Iterator, List, Optional + +from .constants import DEFAULT_IDLE_TIMEOUT_SECONDS + + +def parse_idle_timeout_seconds(obj: dict) -> int: + """Seconds of idle time (no completed HTTP requests) before unregister.""" + v = obj.get("idle_timeout_seconds", DEFAULT_IDLE_TIMEOUT_SECONDS) + if not isinstance(v, int): + raise ValueError("idle_timeout_seconds must be an integer") + v = int(v) + if v < 1: + v = 86400 * 7 + return v + + +def validate_transfer_config(obj: dict) -> dict: + """ + Validate and normalize a transfer config dict received over the control + socket. Returns the cleaned config or raises ValueError. + """ + idle_sec = parse_idle_timeout_seconds(obj) + + backend = obj.get("backend") + if backend is None: + backend = "nbd" + if not isinstance(backend, str): + raise ValueError("invalid backend type") + backend = backend.lower() + if backend not in ("nbd", "file"): + raise ValueError(f"unsupported backend: {backend}") + + if backend == "file": + file_path = obj.get("file") + if not isinstance(file_path, str) or not file_path.strip(): + raise ValueError("missing/invalid file path for file backend") + return {"backend": "file", "file": file_path.strip(), "idle_timeout_seconds": idle_sec} + + socket_path = obj.get("socket") + export = obj.get("export") + export_bitmap = obj.get("export_bitmap") + if not isinstance(socket_path, str) or not socket_path.strip(): + raise ValueError("missing/invalid socket path for nbd backend") + if export is not None and (not isinstance(export, str) or not export): + raise ValueError("invalid export name") + return { + "backend": "nbd", + "socket": socket_path.strip(), + "export": export, + "export_bitmap": export_bitmap, + "idle_timeout_seconds": idle_sec, + } def safe_transfer_id(image_id: str) -> Optional[str]: @@ -43,11 +96,17 @@ class TransferRegistry: The cloudstack-agent registers/unregisters transfers via the Unix domain control socket. The HTTP handler looks up configs through get(). + + Each transfer may specify idle_timeout_seconds (default DEFAULT_IDLE_TIMEOUT_SECONDS). + After no in-flight HTTP requests have completed for that idle period, the transfer + is removed (same effect as unregister). """ def __init__(self) -> None: self._lock = threading.Lock() self._transfers: Dict[str, Dict[str, Any]] = {} + self._last_activity: Dict[str, float] = {} + self._inflight: Dict[str, int] = {} def register(self, transfer_id: str, config: Dict[str, Any]) -> bool: safe_id = safe_transfer_id(transfer_id) @@ -56,6 +115,8 @@ class TransferRegistry: return False with self._lock: self._transfers[safe_id] = config + self._last_activity[safe_id] = time.monotonic() + self._inflight.pop(safe_id, None) logging.info("registered transfer_id=%s active=%d", safe_id, len(self._transfers)) return True @@ -68,6 +129,8 @@ class TransferRegistry: return len(self._transfers) with self._lock: self._transfers.pop(safe_id, None) + self._last_activity.pop(safe_id, None) + self._inflight.pop(safe_id, None) remaining = len(self._transfers) logging.info("unregistered transfer_id=%s active=%d", safe_id, remaining) return remaining @@ -82,3 +145,56 @@ class TransferRegistry: def active_count(self) -> int: with self._lock: return len(self._transfers) + + @contextmanager + def request_lifecycle(self, transfer_id: str) -> Iterator[None]: + """ + Track an HTTP request for idle-timeout purposes. + + Expiry is based on time since the last request *completed* (all in-flight + work for this transfer_id finished). Transfers with active requests are + never expired. + """ + safe_id = safe_transfer_id(transfer_id) + if safe_id is None: + yield + return + with self._lock: + if safe_id not in self._transfers: + yield + return + self._inflight[safe_id] = self._inflight.get(safe_id, 0) + 1 + try: + yield + finally: + now = time.monotonic() + with self._lock: + count = self._inflight.get(safe_id, 1) - 1 + if count <= 0: + self._inflight.pop(safe_id, None) + if safe_id in self._transfers: + self._last_activity[safe_id] = now + else: + self._inflight[safe_id] = count + + def sweep_expired_transfers(self) -> None: + """Remove transfers that exceeded idle_timeout_seconds with no in-flight HTTP work.""" + now = time.monotonic() + with self._lock: + expired: List[str] = [] + for tid, cfg in list(self._transfers.items()): + if self._inflight.get(tid, 0) > 0: + continue + timeout = int(cfg.get("idle_timeout_seconds", DEFAULT_IDLE_TIMEOUT_SECONDS)) + last = self._last_activity.get(tid, now) + if now - last >= timeout: + expired.append(tid) + for tid in expired: + self._transfers.pop(tid, None) + self._last_activity.pop(tid, None) + self._inflight.pop(tid, None) + logging.info( + "idle expiry: unregistered transfer_id=%s active=%d", + tid, + len(self._transfers), + ) diff --git a/scripts/vm/hypervisor/kvm/imageserver/constants.py b/scripts/vm/hypervisor/kvm/imageserver/constants.py index 33cf3001d7a..0b6465527f4 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/constants.py +++ b/scripts/vm/hypervisor/kvm/imageserver/constants.py @@ -36,6 +36,10 @@ CONTROL_SOCKET_BACKLOG = 32 CONTROL_SOCKET_PERMISSIONS = 0o660 CONTROL_RECV_BUFFER = 4096 +# Transfer idle timeout (seconds). A transfer is expired when no in-flight HTTP +# requests have completed for this duration. +DEFAULT_IDLE_TIMEOUT_SECONDS = 600 + # Maximum size of a JSON body in a PATCH request (zero / flush ops) MAX_PATCH_JSON_SIZE = 64 * 1024 # 64 KiB diff --git a/scripts/vm/hypervisor/kvm/imageserver/handler.py b/scripts/vm/hypervisor/kvm/imageserver/handler.py index 9bfed8d52f9..c28a0657581 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/handler.py +++ b/scripts/vm/hypervisor/kvm/imageserver/handler.py @@ -213,57 +213,58 @@ class Handler(BaseHTTPRequestHandler): self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") return - backend = create_backend(cfg) - try: - if not backend.supports_extents: - allowed_methods = "GET, PUT, POST, OPTIONS" - features = ["flush"] + with self._registry.request_lifecycle(image_id): + backend = create_backend(cfg) + try: + if not backend.supports_extents: + allowed_methods = "GET, PUT, POST, OPTIONS" + features = ["flush"] + response = { + "unix_socket": None, + "features": features, + "max_readers": MAX_PARALLEL_READS, + "max_writers": MAX_PARALLEL_WRITES, + } + self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods) + return + + read_only = True + can_flush = False + can_zero = False + try: + caps = backend.get_capabilities() + read_only = caps["read_only"] + can_flush = caps["can_flush"] + can_zero = caps["can_zero"] + except Exception as e: + logging.warning("OPTIONS: could not query backend capabilities: %r", e) + read_only = bool(cfg.get("read_only")) + if not read_only: + can_flush = True + can_zero = True + + if read_only: + allowed_methods = "GET, OPTIONS" + features = ["extents"] + max_writers = 0 + else: + allowed_methods = "GET, PUT, PATCH, OPTIONS" + features = ["extents"] + if can_zero: + features.append("zero") + if can_flush: + features.append("flush") + max_writers = MAX_PARALLEL_WRITES + response = { "unix_socket": None, "features": features, "max_readers": MAX_PARALLEL_READS, - "max_writers": MAX_PARALLEL_WRITES, + "max_writers": max_writers, } self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods) - return - - read_only = True - can_flush = False - can_zero = False - try: - caps = backend.get_capabilities() - read_only = caps["read_only"] - can_flush = caps["can_flush"] - can_zero = caps["can_zero"] - except Exception as e: - logging.warning("OPTIONS: could not query backend capabilities: %r", e) - read_only = bool(cfg.get("read_only")) - if not read_only: - can_flush = True - can_zero = True - - if read_only: - allowed_methods = "GET, OPTIONS" - features = ["extents"] - max_writers = 0 - else: - allowed_methods = "GET, PUT, PATCH, OPTIONS" - features = ["extents"] - if can_zero: - features.append("zero") - if can_flush: - features.append("flush") - max_writers = MAX_PARALLEL_WRITES - - response = { - "unix_socket": None, - "features": features, - "max_readers": MAX_PARALLEL_READS, - "max_writers": max_writers, - } - self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods) - finally: - backend.close() + finally: + backend.close() def do_GET(self) -> None: image_id, tail = self._parse_route() @@ -277,25 +278,27 @@ class Handler(BaseHTTPRequestHandler): return if tail == "extents": - backend = create_backend(cfg) - try: - if not backend.supports_extents: - self._send_error_json( - HTTPStatus.BAD_REQUEST, "extents not supported for file backend" - ) - return - finally: - backend.close() - query = self._parse_query() - context = (query.get("context") or [None])[0] - self._handle_get_extents(image_id, cfg, context=context) + with self._registry.request_lifecycle(image_id): + backend = create_backend(cfg) + try: + if not backend.supports_extents: + self._send_error_json( + HTTPStatus.BAD_REQUEST, "extents not supported for file backend" + ) + return + finally: + backend.close() + query = self._parse_query() + context = (query.get("context") or [None])[0] + self._handle_get_extents(image_id, cfg, context=context) return if tail is not None: self._send_error_json(HTTPStatus.NOT_FOUND, "not found") return range_header = self.headers.get("Range") - self._handle_get_image(image_id, cfg, range_header) + with self._registry.request_lifecycle(image_id): + self._handle_get_image(image_id, cfg, range_header) def do_PUT(self) -> None: image_id, tail = self._parse_route() @@ -308,46 +311,47 @@ class Handler(BaseHTTPRequestHandler): self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") return - if self.headers.get("Range") is not None: - self._send_error_json( - HTTPStatus.BAD_REQUEST, - "Range header not supported for PUT; use Content-Range or PATCH", - ) - return + with self._registry.request_lifecycle(image_id): + if self.headers.get("Range") is not None: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + "Range header not supported for PUT; use Content-Range or PATCH", + ) + return - content_length_hdr = self.headers.get("Content-Length") - if content_length_hdr is None: - self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required") - return - try: - content_length = int(content_length_hdr) - except ValueError: - self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") - return - if content_length < 0: - self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") - return - - query = self._parse_query() - flush_param = (query.get("flush") or ["n"])[0].lower() - flush = flush_param in ("y", "yes", "true", "1") - - content_range_hdr = self.headers.get("Content-Range") - if content_range_hdr is not None: - backend = create_backend(cfg) + content_length_hdr = self.headers.get("Content-Length") + if content_length_hdr is None: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required") + return try: - if not backend.supports_range_write: - self._send_error_json( - HTTPStatus.BAD_REQUEST, - "Content-Range PUT not supported for file backend; use full PUT", - ) - return - finally: - backend.close() - self._handle_put_range(image_id, cfg, content_range_hdr, content_length, flush) - return + content_length = int(content_length_hdr) + except ValueError: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + return + if content_length < 0: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + return - self._handle_put_image(image_id, cfg, content_length, flush) + query = self._parse_query() + flush_param = (query.get("flush") or ["n"])[0].lower() + flush = flush_param in ("y", "yes", "true", "1") + + content_range_hdr = self.headers.get("Content-Range") + if content_range_hdr is not None: + backend = create_backend(cfg) + try: + if not backend.supports_range_write: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + "Content-Range PUT not supported for file backend; use full PUT", + ) + return + finally: + backend.close() + self._handle_put_range(image_id, cfg, content_range_hdr, content_length, flush) + return + + self._handle_put_image(image_id, cfg, content_length, flush) def do_POST(self) -> None: image_id, tail = self._parse_route() @@ -361,7 +365,8 @@ class Handler(BaseHTTPRequestHandler): return if tail == "flush": - self._handle_post_flush(image_id, cfg) + with self._registry.request_lifecycle(image_id): + self._handle_post_flush(image_id, cfg) return self._send_error_json(HTTPStatus.NOT_FOUND, "not found") @@ -376,21 +381,44 @@ class Handler(BaseHTTPRequestHandler): self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") return - backend = create_backend(cfg) - try: - if not backend.supports_range_write: + with self._registry.request_lifecycle(image_id): + backend = create_backend(cfg) + try: + if not backend.supports_range_write: + self._send_error_json( + HTTPStatus.BAD_REQUEST, + "range writes and PATCH not supported for file backend; use PUT for full upload", + ) + return + finally: + backend.close() + + content_type = self.headers.get("Content-Type", "").split(";")[0].strip().lower() + range_header = self.headers.get("Range") + + if range_header is not None and content_type != "application/json": + content_length_hdr = self.headers.get("Content-Length") + if content_length_hdr is None: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required") + return + try: + content_length = int(content_length_hdr) + except ValueError: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") + return + if content_length <= 0: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length must be positive") + return + self._handle_patch_range(image_id, cfg, range_header, content_length) + return + + if content_type != "application/json": self._send_error_json( - HTTPStatus.BAD_REQUEST, - "range writes and PATCH not supported for file backend; use PUT for full upload", + HTTPStatus.UNSUPPORTED_MEDIA_TYPE, + "PATCH requires Content-Type: application/json (for zero/flush) or Range with binary body", ) return - finally: - backend.close() - content_type = self.headers.get("Content-Type", "").split(";")[0].strip().lower() - range_header = self.headers.get("Range") - - if range_header is not None and content_type != "application/json": content_length_hdr = self.headers.get("Content-Length") if content_length_hdr is None: self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required") @@ -400,82 +428,60 @@ class Handler(BaseHTTPRequestHandler): except ValueError: self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") return - if content_length <= 0: - self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length must be positive") + if content_length <= 0 or content_length > MAX_PATCH_JSON_SIZE: + self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") return - self._handle_patch_range(image_id, cfg, range_header, content_length) - return - if content_type != "application/json": - self._send_error_json( - HTTPStatus.UNSUPPORTED_MEDIA_TYPE, - "PATCH requires Content-Type: application/json (for zero/flush) or Range with binary body", - ) - return + body = self.rfile.read(content_length) + if len(body) != content_length: + self._send_error_json(HTTPStatus.BAD_REQUEST, "request body truncated") + return - content_length_hdr = self.headers.get("Content-Length") - if content_length_hdr is None: - self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required") - return - try: - content_length = int(content_length_hdr) - except ValueError: - self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") - return - if content_length <= 0 or content_length > MAX_PATCH_JSON_SIZE: - self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length") - return - - body = self.rfile.read(content_length) - if len(body) != content_length: - self._send_error_json(HTTPStatus.BAD_REQUEST, "request body truncated") - return - - try: - payload = json.loads(body.decode("utf-8")) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - self._send_error_json(HTTPStatus.BAD_REQUEST, f"invalid JSON: {e}") - return - - if not isinstance(payload, dict): - self._send_error_json(HTTPStatus.BAD_REQUEST, "body must be a JSON object") - return - - op = payload.get("op") - if op == "flush": - self._handle_post_flush(image_id, cfg) - return - if op != "zero": - self._send_error_json( - HTTPStatus.BAD_REQUEST, - "unsupported op; only \"zero\" and \"flush\" are supported", - ) - return - - try: - size = int(payload.get("size")) - except (TypeError, ValueError): - self._send_error_json(HTTPStatus.BAD_REQUEST, "missing or invalid \"size\"") - return - if size <= 0: - self._send_error_json(HTTPStatus.BAD_REQUEST, "\"size\" must be positive") - return - - offset = payload.get("offset") - if offset is None: - offset = 0 - else: try: - offset = int(offset) - except (TypeError, ValueError): - self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid \"offset\"") - return - if offset < 0: - self._send_error_json(HTTPStatus.BAD_REQUEST, "\"offset\" must be non-negative") + payload = json.loads(body.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + self._send_error_json(HTTPStatus.BAD_REQUEST, f"invalid JSON: {e}") return - flush = bool(payload.get("flush", False)) - self._handle_patch_zero(image_id, cfg, offset=offset, size=size, flush=flush) + if not isinstance(payload, dict): + self._send_error_json(HTTPStatus.BAD_REQUEST, "body must be a JSON object") + return + + op = payload.get("op") + if op == "flush": + self._handle_post_flush(image_id, cfg) + return + if op != "zero": + self._send_error_json( + HTTPStatus.BAD_REQUEST, + "unsupported op; only \"zero\" and \"flush\" are supported", + ) + return + + try: + size = int(payload.get("size")) + except (TypeError, ValueError): + self._send_error_json(HTTPStatus.BAD_REQUEST, "missing or invalid \"size\"") + return + if size <= 0: + self._send_error_json(HTTPStatus.BAD_REQUEST, "\"size\" must be positive") + return + + offset = payload.get("offset") + if offset is None: + offset = 0 + else: + try: + offset = int(offset) + except (TypeError, ValueError): + self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid \"offset\"") + return + if offset < 0: + self._send_error_json(HTTPStatus.BAD_REQUEST, "\"offset\" must be non-negative") + return + + flush = bool(payload.get("flush", False)) + self._handle_patch_zero(image_id, cfg, offset=offset, size=size, flush=flush) # ------------------------------------------------------------------ # Operation handlers diff --git a/scripts/vm/hypervisor/kvm/imageserver/server.py b/scripts/vm/hypervisor/kvm/imageserver/server.py index 99318bc58fc..1bc42252d4f 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/server.py +++ b/scripts/vm/hypervisor/kvm/imageserver/server.py @@ -22,6 +22,7 @@ import os import socket import ssl import threading +import time from http.server import HTTPServer from socketserver import ThreadingMixIn from typing import Type @@ -33,7 +34,7 @@ except ImportError: pass from .concurrency import ConcurrencyManager -from .config import TransferRegistry +from .config import TransferRegistry, validate_transfer_config from .constants import ( CONTROL_RECV_BUFFER, CONTROL_SOCKET, @@ -65,41 +66,6 @@ def make_handler( return ConfiguredHandler -def _validate_config(obj: dict) -> dict: - """ - Validate and normalize a transfer config dict received over the control - socket. Returns the cleaned config or raises ValueError. - """ - backend = obj.get("backend") - if backend is None: - backend = "nbd" - if not isinstance(backend, str): - raise ValueError("invalid backend type") - backend = backend.lower() - if backend not in ("nbd", "file"): - raise ValueError(f"unsupported backend: {backend}") - - if backend == "file": - file_path = obj.get("file") - if not isinstance(file_path, str) or not file_path.strip(): - raise ValueError("missing/invalid file path for file backend") - return {"backend": "file", "file": file_path.strip()} - - socket_path = obj.get("socket") - export = obj.get("export") - export_bitmap = obj.get("export_bitmap") - if not isinstance(socket_path, str) or not socket_path.strip(): - raise ValueError("missing/invalid socket path for nbd backend") - if export is not None and (not isinstance(export, str) or not export): - raise ValueError("invalid export name") - return { - "backend": "nbd", - "socket": socket_path.strip(), - "export": export, - "export_bitmap": export_bitmap, - } - - def _handle_control_conn(conn: socket.socket, registry: TransferRegistry) -> None: """Handle a single control-socket connection (one JSON request/response).""" try: @@ -122,7 +88,7 @@ def _handle_control_conn(conn: socket.socket, registry: TransferRegistry) -> Non resp = {"status": "error", "message": "missing transfer_id or config"} else: try: - config = _validate_config(raw_config) + config = validate_transfer_config(raw_config) except ValueError as e: resp = {"status": "error", "message": str(e)} else: @@ -153,6 +119,15 @@ def _handle_control_conn(conn: socket.socket, registry: TransferRegistry) -> Non conn.close() +def _idle_sweep_loop(registry: TransferRegistry, interval_s: float = 10.0) -> None: + while True: + time.sleep(interval_s) + try: + registry.sweep_expired_transfers() + except Exception: + logging.exception("idle sweep error") + + def _control_listener(registry: TransferRegistry, sock_path: str) -> None: """Accept loop for the Unix domain control socket (runs in a daemon thread).""" if os.path.exists(sock_path): @@ -221,6 +196,13 @@ def main() -> None: ) ctrl_thread.start() + sweep_thread = threading.Thread( + target=_idle_sweep_loop, + args=(registry,), + daemon=True, + ) + sweep_thread.start() + addr = (args.listen, args.port) httpd = ThreadingHTTPServer(addr, handler_cls) diff --git a/scripts/vm/hypervisor/kvm/imageserver/tests/test_base.py b/scripts/vm/hypervisor/kvm/imageserver/tests/test_base.py index 2ae95d01f4b..c8703f8a108 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/tests/test_base.py +++ b/scripts/vm/hypervisor/kvm/imageserver/tests/test_base.py @@ -374,18 +374,23 @@ def make_tmp_image(data=None, image_size=IMAGE_SIZE) -> str: return path -def make_file_transfer(data=None, image_size=IMAGE_SIZE): +def make_file_transfer(data=None, image_size=IMAGE_SIZE, idle_timeout_seconds=None): """ Create a temp file + register a file-backend transfer. Returns (transfer_id, url, file_path, cleanup_callable). + + If *idle_timeout_seconds* is set, it is sent in the transfer config (for idle expiry tests). """ srv = get_image_server() path = make_tmp_image(data=data, image_size=image_size) transfer_id = f"file-{uuid.uuid4().hex[:8]}" + cfg = {"backend": "file", "file": path} + if idle_timeout_seconds is not None: + cfg["idle_timeout_seconds"] = idle_timeout_seconds resp = srv["send"]({ "action": "register", "transfer_id": transfer_id, - "config": {"backend": "file", "file": path}, + "config": cfg, }) assert resp["status"] == "ok", f"register failed: {resp}" url = f"{srv['base_url']}/images/{transfer_id}" diff --git a/scripts/vm/hypervisor/kvm/imageserver/tests/test_registry_idle.py b/scripts/vm/hypervisor/kvm/imageserver/tests/test_registry_idle.py new file mode 100644 index 00000000000..7fa95941661 --- /dev/null +++ b/scripts/vm/hypervisor/kvm/imageserver/tests/test_registry_idle.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Unit tests for transfer idle timeout (no image server / nbd dependency).""" + +import unittest +from unittest.mock import patch + +from imageserver.config import ( + TransferRegistry, + parse_idle_timeout_seconds, + validate_transfer_config, +) +from imageserver.constants import DEFAULT_IDLE_TIMEOUT_SECONDS + + +class TestParseIdleTimeout(unittest.TestCase): + def test_default_600(self): + self.assertEqual(parse_idle_timeout_seconds({}), DEFAULT_IDLE_TIMEOUT_SECONDS) + + def test_explicit(self): + self.assertEqual( + parse_idle_timeout_seconds({"idle_timeout_seconds": 30}), 30 + ) + + def test_rejects_zero(self): + with self.assertRaises(ValueError): + parse_idle_timeout_seconds({"idle_timeout_seconds": 0}) + + +class TestValidateTransferConfig(unittest.TestCase): + def test_file_merges_idle(self): + c = validate_transfer_config( + {"backend": "file", "file": "/tmp/x", "idle_timeout_seconds": 3} + ) + self.assertEqual(c["idle_timeout_seconds"], 3) + self.assertEqual(c["backend"], "file") + + +class TestRegistryIdleSweep(unittest.TestCase): + def test_sweep_unregisters_after_idle(self): + clock = [0.0] + + def mono(): + return clock[0] + + with patch("imageserver.config.time.monotonic", mono): + r = TransferRegistry() + r.register( + "t1", + validate_transfer_config( + {"backend": "file", "file": "/x", "idle_timeout_seconds": 2} + ), + ) + clock[0] = 5.0 + r.sweep_expired_transfers() + self.assertIsNone(r.get("t1")) + + def test_inflight_prevents_sweep_until_request_ends(self): + clock = [0.0] + + def mono(): + return clock[0] + + with patch("imageserver.config.time.monotonic", mono): + r = TransferRegistry() + r.register( + "t1", + validate_transfer_config( + {"backend": "file", "file": "/x", "idle_timeout_seconds": 2} + ), + ) + clock[0] = 1.0 + ctx = r.request_lifecycle("t1") + ctx.__enter__() + clock[0] = 100.0 + r.sweep_expired_transfers() + self.assertIsNotNone(r.get("t1")) + ctx.__exit__(None, None, None) + clock[0] = 103.0 + r.sweep_expired_transfers() + self.assertIsNone(r.get("t1")) + + +if __name__ == "__main__": + unittest.main() diff --git a/scripts/vm/hypervisor/kvm/imageserver/tests/test_transfer_idle_expiry.py b/scripts/vm/hypervisor/kvm/imageserver/tests/test_transfer_idle_expiry.py new file mode 100644 index 00000000000..0cfbfc40ee9 --- /dev/null +++ b/scripts/vm/hypervisor/kvm/imageserver/tests/test_transfer_idle_expiry.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Integration tests for per-transfer HTTP idle timeout (requires image server deps e.g. nbd).""" + +import time +import urllib.error + +from .test_base import ( + ImageServerTestCase, + http_options, + make_file_transfer, +) + + +class TestTransferIdleExpiry(ImageServerTestCase): + def test_transfer_expires_after_idle(self): + """No HTTP activity after registration: transfer is unregistered after idle_timeout_seconds.""" + _tid, url, _path, cleanup = make_file_transfer(idle_timeout_seconds=2) + try: + time.sleep(3.5) + with self.assertRaises(urllib.error.HTTPError) as ctx: + http_options(url) + self.assertEqual(ctx.exception.code, 404) + st = self.ctrl({"action": "status"}) + self.assertEqual(st.get("status"), "ok") + finally: + cleanup() + + def test_http_activity_resets_idle_deadline(self): + """Completing a request resets the idle timer; transfer stays past a single interval.""" + _tid, url, _path, cleanup = make_file_transfer(idle_timeout_seconds=2) + try: + http_options(url) + time.sleep(1.2) + http_options(url) + time.sleep(1.2) + http_options(url) + time.sleep(1.2) + resp = http_options(url) + self.assertEqual(resp.status, 200) + finally: + cleanup() diff --git a/server/src/main/java/org/apache/cloudstack/backup/KVMBackupExportServiceImpl.java b/server/src/main/java/org/apache/cloudstack/backup/KVMBackupExportServiceImpl.java index 9ddf8099c48..d71f7b66848 100644 --- a/server/src/main/java/org/apache/cloudstack/backup/KVMBackupExportServiceImpl.java +++ b/server/src/main/java/org/apache/cloudstack/backup/KVMBackupExportServiceImpl.java @@ -327,12 +327,18 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup socket = transferId; } + HostVO backupHost = hostDao.findById(backup.getHostId()); + if (backupHost == null) { + throw new CloudRuntimeException("Host not found for backup: " + backupId); + } + int idleTimeoutSec = ImageTransferIdleTimeoutSeconds.valueIn(backupHost.getDataCenterId()); CreateImageTransferCommand transferCmd = new CreateImageTransferCommand( transferId, direction, volume.getUuid(), socket, - backup.getFromCheckpointId()); + backup.getFromCheckpointId(), + idleTimeoutSec); try { CreateImageTransferAnswer answer; @@ -443,6 +449,7 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup Host host = getRandomHostFromStoragePool(storagePool); String volumePath = getVolumePathForFileBasedBackend(volume); + int idleTimeoutSec = ImageTransferIdleTimeoutSeconds.valueIn(host.getDataCenterId()); ImageTransferVO imageTransfer; CreateImageTransferCommand transferCmd; @@ -462,7 +469,8 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup transferId, direction, transferId, - volumePath); + volumePath, + idleTimeoutSec); } else { startNBDServer(transferId, direction, host.getId(), volume.getUuid(), volumePath, null); @@ -483,7 +491,8 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup direction, volume.getUuid(), transferId, - null); + null, + idleTimeoutSec); } CreateImageTransferAnswer transferAnswer; try { @@ -899,7 +908,8 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup @Override public ConfigKey[] getConfigKeys() { return new ConfigKey[]{ - ImageTransferPollingInterval + ImageTransferPollingInterval, + ImageTransferIdleTimeoutSeconds }; } }