Add expiry timeouts for idle image transfers

This commit is contained in:
Abhisar Sinha 2026-04-01 12:08:30 +05:30 committed by Abhishek Kumar
parent cdf4684bcf
commit 6f4758d062
11 changed files with 519 additions and 226 deletions

View File

@ -43,6 +43,12 @@ public interface KVMBackupExportService extends Configurable, PluggableService {
"10",
"The image transfer progress polling interval in seconds.", true, ConfigKey.Scope.Global);
ConfigKey<Integer> ImageTransferIdleTimeoutSeconds = new ConfigKey<>("Advanced", Integer.class,
"image.transfer.idle.timeout.seconds",
"600",
"Seconds since last completed HTTP request to an image transfer before the image server unregisters it (idle timeout).",
true, ConfigKey.Scope.Zone);
ConfigKey<Boolean> ExposeKVMBackupExportServiceApis = new ConfigKey<>("Advanced", Boolean.class,
"expose.kvm.backup.export.service.apis",
"false",

View File

@ -27,25 +27,27 @@ public class CreateImageTransferCommand extends Command {
private String checkpointId;
private String file;
private ImageTransfer.Backend backend;
private int idleTimeoutSeconds;
public CreateImageTransferCommand() {
}
private CreateImageTransferCommand(String transferId, String direction, String socket) {
private CreateImageTransferCommand(String transferId, String direction, String socket, int idleTimeoutSeconds) {
this.transferId = transferId;
this.direction = direction;
this.socket = socket;
this.idleTimeoutSeconds = idleTimeoutSeconds;
}
public CreateImageTransferCommand(String transferId, String direction, String exportName, String socket, String checkpointId) {
this(transferId, direction, socket);
public CreateImageTransferCommand(String transferId, String direction, String exportName, String socket, String checkpointId, int idleTimeoutSeconds) {
this(transferId, direction, socket, idleTimeoutSeconds);
this.backend = ImageTransfer.Backend.nbd;
this.exportName = exportName;
this.checkpointId = checkpointId;
}
public CreateImageTransferCommand(String transferId, String direction, String socket, String file) {
this(transferId, direction, socket);
public CreateImageTransferCommand(String transferId, String direction, String socket, String file, int idleTimeoutSeconds) {
this(transferId, direction, socket, idleTimeoutSeconds);
if (direction == ImageTransfer.Direction.download.toString()) {
throw new IllegalArgumentException("File backend is only supported for upload");
}
@ -85,4 +87,8 @@ public class CreateImageTransferCommand extends Command {
/**
 * Returns the checkpoint id supplied at construction (nbd backend only);
 * may be {@code null} for transfers that do not start from a checkpoint.
 */
public String getCheckpointId() {
    return checkpointId;
}
/**
 * Returns the idle timeout in seconds: how long the image server waits after
 * the last completed HTTP request before unregistering this transfer.
 */
public int getIdleTimeoutSeconds() {
    return idleTimeoutSeconds;
}
}

View File

@ -131,6 +131,7 @@ public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper<Cre
final Map<String, Object> payload = new HashMap<>();
payload.put("backend", backend.toString());
payload.put("idle_timeout_seconds", cmd.getIdleTimeoutSeconds());
if (backend == ImageTransfer.Backend.file) {
final String filePath = cmd.getFile();

View File

@ -18,7 +18,60 @@
import logging
import os
import threading
from typing import Any, Dict, Optional
import time
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional
from .constants import DEFAULT_IDLE_TIMEOUT_SECONDS
def parse_idle_timeout_seconds(obj: dict) -> int:
    """Return the validated idle timeout (seconds) from a transfer config.

    Seconds of idle time (no completed HTTP requests) before the transfer is
    unregistered. Falls back to DEFAULT_IDLE_TIMEOUT_SECONDS when the key is
    absent.

    :param obj: raw transfer config dict received over the control socket.
    :raises ValueError: if the value is present but not a positive integer.
    """
    if "idle_timeout_seconds" not in obj:
        # Only touch the module default when the key is genuinely absent
        # (obj.get(key, default) would evaluate the default eagerly).
        return DEFAULT_IDLE_TIMEOUT_SECONDS
    v = obj["idle_timeout_seconds"]
    # bool is a subclass of int; True/False are not meaningful timeouts.
    if isinstance(v, bool) or not isinstance(v, int):
        raise ValueError("idle_timeout_seconds must be an integer")
    if v < 1:
        # Previously a zero/negative value was silently stretched to a week,
        # which contradicts the documented contract (and the unit tests that
        # expect rejection). Fail loudly instead.
        raise ValueError("idle_timeout_seconds must be >= 1")
    return v
def validate_transfer_config(obj: dict) -> dict:
    """
    Validate and normalize a transfer config dict received over the control
    socket. Returns the cleaned config or raises ValueError.
    """
    # Validate the idle timeout first so a bad timeout is reported even when
    # the backend section is also malformed.
    idle_sec = parse_idle_timeout_seconds(obj)

    raw_backend = obj.get("backend")
    if raw_backend is None:
        raw_backend = "nbd"
    if not isinstance(raw_backend, str):
        raise ValueError("invalid backend type")
    kind = raw_backend.lower()
    if kind not in ("nbd", "file"):
        raise ValueError(f"unsupported backend: {kind}")

    if kind == "file":
        path = obj.get("file")
        if not isinstance(path, str) or not path.strip():
            raise ValueError("missing/invalid file path for file backend")
        return {
            "backend": "file",
            "file": path.strip(),
            "idle_timeout_seconds": idle_sec,
        }

    # nbd backend: a socket path is mandatory, export name optional.
    sock = obj.get("socket")
    if not isinstance(sock, str) or not sock.strip():
        raise ValueError("missing/invalid socket path for nbd backend")
    export = obj.get("export")
    if export is not None and (not isinstance(export, str) or not export):
        raise ValueError("invalid export name")
    return {
        "backend": "nbd",
        "socket": sock.strip(),
        "export": export,
        "export_bitmap": obj.get("export_bitmap"),
        "idle_timeout_seconds": idle_sec,
    }
def safe_transfer_id(image_id: str) -> Optional[str]:
@ -43,11 +96,17 @@ class TransferRegistry:
The cloudstack-agent registers/unregisters transfers via the Unix domain
control socket. The HTTP handler looks up configs through get().
Each transfer may specify idle_timeout_seconds (default DEFAULT_IDLE_TIMEOUT_SECONDS).
After no in-flight HTTP requests have completed for that idle period, the transfer
is removed (same effect as unregister).
"""
def __init__(self) -> None:
self._lock = threading.Lock()
self._transfers: Dict[str, Dict[str, Any]] = {}
self._last_activity: Dict[str, float] = {}
self._inflight: Dict[str, int] = {}
def register(self, transfer_id: str, config: Dict[str, Any]) -> bool:
safe_id = safe_transfer_id(transfer_id)
@ -56,6 +115,8 @@ class TransferRegistry:
return False
with self._lock:
self._transfers[safe_id] = config
self._last_activity[safe_id] = time.monotonic()
self._inflight.pop(safe_id, None)
logging.info("registered transfer_id=%s active=%d", safe_id, len(self._transfers))
return True
@ -68,6 +129,8 @@ class TransferRegistry:
return len(self._transfers)
with self._lock:
self._transfers.pop(safe_id, None)
self._last_activity.pop(safe_id, None)
self._inflight.pop(safe_id, None)
remaining = len(self._transfers)
logging.info("unregistered transfer_id=%s active=%d", safe_id, remaining)
return remaining
@ -82,3 +145,56 @@ class TransferRegistry:
def active_count(self) -> int:
with self._lock:
return len(self._transfers)
@contextmanager
def request_lifecycle(self, transfer_id: str) -> Iterator[None]:
    """
    Track an HTTP request for idle-timeout purposes.

    Expiry is based on time since the last request *completed* (all in-flight
    work for this transfer_id finished). Transfers with active requests are
    never expired.

    :param transfer_id: raw transfer id from the request path; ids that fail
        safe_transfer_id() or are not registered pass through untracked.
    """
    safe_id = safe_transfer_id(transfer_id)
    if safe_id is None:
        # Malformed id: nothing to track; the HTTP handler rejects it anyway.
        yield
        return
    tracked = False
    with self._lock:
        if safe_id in self._transfers:
            self._inflight[safe_id] = self._inflight.get(safe_id, 0) + 1
            tracked = True
    if not tracked:
        # Unknown (or already expired) transfer. Yield OUTSIDE the lock:
        # the previous code yielded while still holding self._lock, which
        # serialized every other request and blocked register/unregister
        # and the idle sweeper for the full duration of this HTTP request.
        yield
        return
    try:
        yield
    finally:
        now = time.monotonic()
        with self._lock:
            count = self._inflight.get(safe_id, 1) - 1
            if count <= 0:
                self._inflight.pop(safe_id, None)
                if safe_id in self._transfers:
                    # Last in-flight request finished: restart the idle clock.
                    self._last_activity[safe_id] = now
            else:
                self._inflight[safe_id] = count
def sweep_expired_transfers(self) -> None:
    """Unregister transfers idle past their idle_timeout_seconds.

    A transfer is expired only when it has no in-flight HTTP requests and
    the time since its last completed request meets or exceeds its
    configured idle timeout.
    """
    now = time.monotonic()
    with self._lock:
        # Iterate over a snapshot so removal cannot disturb iteration.
        for tid, cfg in list(self._transfers.items()):
            if self._inflight.get(tid, 0) > 0:
                continue  # active requests pin the transfer
            limit = int(cfg.get("idle_timeout_seconds", DEFAULT_IDLE_TIMEOUT_SECONDS))
            # Missing last-activity entry counts as "just now" (never expired).
            idle_for = now - self._last_activity.get(tid, now)
            if idle_for < limit:
                continue
            self._transfers.pop(tid, None)
            self._last_activity.pop(tid, None)
            self._inflight.pop(tid, None)
            logging.info(
                "idle expiry: unregistered transfer_id=%s active=%d",
                tid,
                len(self._transfers),
            )

View File

@ -36,6 +36,10 @@ CONTROL_SOCKET_BACKLOG = 32
CONTROL_SOCKET_PERMISSIONS = 0o660
CONTROL_RECV_BUFFER = 4096
# Transfer idle timeout (seconds). A transfer is expired when no in-flight HTTP
# requests have completed for this duration.
DEFAULT_IDLE_TIMEOUT_SECONDS = 600
# Maximum size of a JSON body in a PATCH request (zero / flush ops)
MAX_PATCH_JSON_SIZE = 64 * 1024 # 64 KiB

View File

@ -213,57 +213,58 @@ class Handler(BaseHTTPRequestHandler):
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
return
backend = create_backend(cfg)
try:
if not backend.supports_extents:
allowed_methods = "GET, PUT, POST, OPTIONS"
features = ["flush"]
with self._registry.request_lifecycle(image_id):
backend = create_backend(cfg)
try:
if not backend.supports_extents:
allowed_methods = "GET, PUT, POST, OPTIONS"
features = ["flush"]
response = {
"unix_socket": None,
"features": features,
"max_readers": MAX_PARALLEL_READS,
"max_writers": MAX_PARALLEL_WRITES,
}
self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods)
return
read_only = True
can_flush = False
can_zero = False
try:
caps = backend.get_capabilities()
read_only = caps["read_only"]
can_flush = caps["can_flush"]
can_zero = caps["can_zero"]
except Exception as e:
logging.warning("OPTIONS: could not query backend capabilities: %r", e)
read_only = bool(cfg.get("read_only"))
if not read_only:
can_flush = True
can_zero = True
if read_only:
allowed_methods = "GET, OPTIONS"
features = ["extents"]
max_writers = 0
else:
allowed_methods = "GET, PUT, PATCH, OPTIONS"
features = ["extents"]
if can_zero:
features.append("zero")
if can_flush:
features.append("flush")
max_writers = MAX_PARALLEL_WRITES
response = {
"unix_socket": None,
"features": features,
"max_readers": MAX_PARALLEL_READS,
"max_writers": MAX_PARALLEL_WRITES,
"max_writers": max_writers,
}
self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods)
return
read_only = True
can_flush = False
can_zero = False
try:
caps = backend.get_capabilities()
read_only = caps["read_only"]
can_flush = caps["can_flush"]
can_zero = caps["can_zero"]
except Exception as e:
logging.warning("OPTIONS: could not query backend capabilities: %r", e)
read_only = bool(cfg.get("read_only"))
if not read_only:
can_flush = True
can_zero = True
if read_only:
allowed_methods = "GET, OPTIONS"
features = ["extents"]
max_writers = 0
else:
allowed_methods = "GET, PUT, PATCH, OPTIONS"
features = ["extents"]
if can_zero:
features.append("zero")
if can_flush:
features.append("flush")
max_writers = MAX_PARALLEL_WRITES
response = {
"unix_socket": None,
"features": features,
"max_readers": MAX_PARALLEL_READS,
"max_writers": max_writers,
}
self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods)
finally:
backend.close()
finally:
backend.close()
def do_GET(self) -> None:
image_id, tail = self._parse_route()
@ -277,25 +278,27 @@ class Handler(BaseHTTPRequestHandler):
return
if tail == "extents":
backend = create_backend(cfg)
try:
if not backend.supports_extents:
self._send_error_json(
HTTPStatus.BAD_REQUEST, "extents not supported for file backend"
)
return
finally:
backend.close()
query = self._parse_query()
context = (query.get("context") or [None])[0]
self._handle_get_extents(image_id, cfg, context=context)
with self._registry.request_lifecycle(image_id):
backend = create_backend(cfg)
try:
if not backend.supports_extents:
self._send_error_json(
HTTPStatus.BAD_REQUEST, "extents not supported for file backend"
)
return
finally:
backend.close()
query = self._parse_query()
context = (query.get("context") or [None])[0]
self._handle_get_extents(image_id, cfg, context=context)
return
if tail is not None:
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
return
range_header = self.headers.get("Range")
self._handle_get_image(image_id, cfg, range_header)
with self._registry.request_lifecycle(image_id):
self._handle_get_image(image_id, cfg, range_header)
def do_PUT(self) -> None:
image_id, tail = self._parse_route()
@ -308,46 +311,47 @@ class Handler(BaseHTTPRequestHandler):
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
return
if self.headers.get("Range") is not None:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"Range header not supported for PUT; use Content-Range or PATCH",
)
return
with self._registry.request_lifecycle(image_id):
if self.headers.get("Range") is not None:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"Range header not supported for PUT; use Content-Range or PATCH",
)
return
content_length_hdr = self.headers.get("Content-Length")
if content_length_hdr is None:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
return
try:
content_length = int(content_length_hdr)
except ValueError:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
if content_length < 0:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
query = self._parse_query()
flush_param = (query.get("flush") or ["n"])[0].lower()
flush = flush_param in ("y", "yes", "true", "1")
content_range_hdr = self.headers.get("Content-Range")
if content_range_hdr is not None:
backend = create_backend(cfg)
content_length_hdr = self.headers.get("Content-Length")
if content_length_hdr is None:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
return
try:
if not backend.supports_range_write:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"Content-Range PUT not supported for file backend; use full PUT",
)
return
finally:
backend.close()
self._handle_put_range(image_id, cfg, content_range_hdr, content_length, flush)
return
content_length = int(content_length_hdr)
except ValueError:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
if content_length < 0:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
self._handle_put_image(image_id, cfg, content_length, flush)
query = self._parse_query()
flush_param = (query.get("flush") or ["n"])[0].lower()
flush = flush_param in ("y", "yes", "true", "1")
content_range_hdr = self.headers.get("Content-Range")
if content_range_hdr is not None:
backend = create_backend(cfg)
try:
if not backend.supports_range_write:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"Content-Range PUT not supported for file backend; use full PUT",
)
return
finally:
backend.close()
self._handle_put_range(image_id, cfg, content_range_hdr, content_length, flush)
return
self._handle_put_image(image_id, cfg, content_length, flush)
def do_POST(self) -> None:
image_id, tail = self._parse_route()
@ -361,7 +365,8 @@ class Handler(BaseHTTPRequestHandler):
return
if tail == "flush":
self._handle_post_flush(image_id, cfg)
with self._registry.request_lifecycle(image_id):
self._handle_post_flush(image_id, cfg)
return
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
@ -376,21 +381,44 @@ class Handler(BaseHTTPRequestHandler):
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
return
backend = create_backend(cfg)
try:
if not backend.supports_range_write:
with self._registry.request_lifecycle(image_id):
backend = create_backend(cfg)
try:
if not backend.supports_range_write:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"range writes and PATCH not supported for file backend; use PUT for full upload",
)
return
finally:
backend.close()
content_type = self.headers.get("Content-Type", "").split(";")[0].strip().lower()
range_header = self.headers.get("Range")
if range_header is not None and content_type != "application/json":
content_length_hdr = self.headers.get("Content-Length")
if content_length_hdr is None:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
return
try:
content_length = int(content_length_hdr)
except ValueError:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
if content_length <= 0:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length must be positive")
return
self._handle_patch_range(image_id, cfg, range_header, content_length)
return
if content_type != "application/json":
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"range writes and PATCH not supported for file backend; use PUT for full upload",
HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
"PATCH requires Content-Type: application/json (for zero/flush) or Range with binary body",
)
return
finally:
backend.close()
content_type = self.headers.get("Content-Type", "").split(";")[0].strip().lower()
range_header = self.headers.get("Range")
if range_header is not None and content_type != "application/json":
content_length_hdr = self.headers.get("Content-Length")
if content_length_hdr is None:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
@ -400,82 +428,60 @@ class Handler(BaseHTTPRequestHandler):
except ValueError:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
if content_length <= 0:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length must be positive")
if content_length <= 0 or content_length > MAX_PATCH_JSON_SIZE:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
self._handle_patch_range(image_id, cfg, range_header, content_length)
return
if content_type != "application/json":
self._send_error_json(
HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
"PATCH requires Content-Type: application/json (for zero/flush) or Range with binary body",
)
return
body = self.rfile.read(content_length)
if len(body) != content_length:
self._send_error_json(HTTPStatus.BAD_REQUEST, "request body truncated")
return
content_length_hdr = self.headers.get("Content-Length")
if content_length_hdr is None:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
return
try:
content_length = int(content_length_hdr)
except ValueError:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
if content_length <= 0 or content_length > MAX_PATCH_JSON_SIZE:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
body = self.rfile.read(content_length)
if len(body) != content_length:
self._send_error_json(HTTPStatus.BAD_REQUEST, "request body truncated")
return
try:
payload = json.loads(body.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
self._send_error_json(HTTPStatus.BAD_REQUEST, f"invalid JSON: {e}")
return
if not isinstance(payload, dict):
self._send_error_json(HTTPStatus.BAD_REQUEST, "body must be a JSON object")
return
op = payload.get("op")
if op == "flush":
self._handle_post_flush(image_id, cfg)
return
if op != "zero":
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"unsupported op; only \"zero\" and \"flush\" are supported",
)
return
try:
size = int(payload.get("size"))
except (TypeError, ValueError):
self._send_error_json(HTTPStatus.BAD_REQUEST, "missing or invalid \"size\"")
return
if size <= 0:
self._send_error_json(HTTPStatus.BAD_REQUEST, "\"size\" must be positive")
return
offset = payload.get("offset")
if offset is None:
offset = 0
else:
try:
offset = int(offset)
except (TypeError, ValueError):
self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid \"offset\"")
return
if offset < 0:
self._send_error_json(HTTPStatus.BAD_REQUEST, "\"offset\" must be non-negative")
payload = json.loads(body.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
self._send_error_json(HTTPStatus.BAD_REQUEST, f"invalid JSON: {e}")
return
flush = bool(payload.get("flush", False))
self._handle_patch_zero(image_id, cfg, offset=offset, size=size, flush=flush)
if not isinstance(payload, dict):
self._send_error_json(HTTPStatus.BAD_REQUEST, "body must be a JSON object")
return
op = payload.get("op")
if op == "flush":
self._handle_post_flush(image_id, cfg)
return
if op != "zero":
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"unsupported op; only \"zero\" and \"flush\" are supported",
)
return
try:
size = int(payload.get("size"))
except (TypeError, ValueError):
self._send_error_json(HTTPStatus.BAD_REQUEST, "missing or invalid \"size\"")
return
if size <= 0:
self._send_error_json(HTTPStatus.BAD_REQUEST, "\"size\" must be positive")
return
offset = payload.get("offset")
if offset is None:
offset = 0
else:
try:
offset = int(offset)
except (TypeError, ValueError):
self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid \"offset\"")
return
if offset < 0:
self._send_error_json(HTTPStatus.BAD_REQUEST, "\"offset\" must be non-negative")
return
flush = bool(payload.get("flush", False))
self._handle_patch_zero(image_id, cfg, offset=offset, size=size, flush=flush)
# ------------------------------------------------------------------
# Operation handlers

View File

@ -22,6 +22,7 @@ import os
import socket
import ssl
import threading
import time
from http.server import HTTPServer
from socketserver import ThreadingMixIn
from typing import Type
@ -33,7 +34,7 @@ except ImportError:
pass
from .concurrency import ConcurrencyManager
from .config import TransferRegistry
from .config import TransferRegistry, validate_transfer_config
from .constants import (
CONTROL_RECV_BUFFER,
CONTROL_SOCKET,
@ -65,41 +66,6 @@ def make_handler(
return ConfiguredHandler
def _validate_config(obj: dict) -> dict:
"""
Validate and normalize a transfer config dict received over the control
socket. Returns the cleaned config or raises ValueError.
"""
backend = obj.get("backend")
if backend is None:
backend = "nbd"
if not isinstance(backend, str):
raise ValueError("invalid backend type")
backend = backend.lower()
if backend not in ("nbd", "file"):
raise ValueError(f"unsupported backend: {backend}")
if backend == "file":
file_path = obj.get("file")
if not isinstance(file_path, str) or not file_path.strip():
raise ValueError("missing/invalid file path for file backend")
return {"backend": "file", "file": file_path.strip()}
socket_path = obj.get("socket")
export = obj.get("export")
export_bitmap = obj.get("export_bitmap")
if not isinstance(socket_path, str) or not socket_path.strip():
raise ValueError("missing/invalid socket path for nbd backend")
if export is not None and (not isinstance(export, str) or not export):
raise ValueError("invalid export name")
return {
"backend": "nbd",
"socket": socket_path.strip(),
"export": export,
"export_bitmap": export_bitmap,
}
def _handle_control_conn(conn: socket.socket, registry: TransferRegistry) -> None:
"""Handle a single control-socket connection (one JSON request/response)."""
try:
@ -122,7 +88,7 @@ def _handle_control_conn(conn: socket.socket, registry: TransferRegistry) -> Non
resp = {"status": "error", "message": "missing transfer_id or config"}
else:
try:
config = _validate_config(raw_config)
config = validate_transfer_config(raw_config)
except ValueError as e:
resp = {"status": "error", "message": str(e)}
else:
@ -153,6 +119,15 @@ def _handle_control_conn(conn: socket.socket, registry: TransferRegistry) -> Non
conn.close()
def _idle_sweep_loop(registry: TransferRegistry, interval_s: float = 10.0) -> None:
    """Daemon loop: periodically expire idle transfers in *registry*.

    Sleeps *interval_s* seconds between sweeps; sweep failures are logged
    and never terminate the loop.
    """
    sweep = registry.sweep_expired_transfers
    while True:
        time.sleep(interval_s)
        try:
            sweep()
        except Exception:
            logging.exception("idle sweep error")
def _control_listener(registry: TransferRegistry, sock_path: str) -> None:
"""Accept loop for the Unix domain control socket (runs in a daemon thread)."""
if os.path.exists(sock_path):
@ -221,6 +196,13 @@ def main() -> None:
)
ctrl_thread.start()
sweep_thread = threading.Thread(
target=_idle_sweep_loop,
args=(registry,),
daemon=True,
)
sweep_thread.start()
addr = (args.listen, args.port)
httpd = ThreadingHTTPServer(addr, handler_cls)

View File

@ -374,18 +374,23 @@ def make_tmp_image(data=None, image_size=IMAGE_SIZE) -> str:
return path
def make_file_transfer(data=None, image_size=IMAGE_SIZE):
def make_file_transfer(data=None, image_size=IMAGE_SIZE, idle_timeout_seconds=None):
"""
Create a temp file + register a file-backend transfer.
Returns (transfer_id, url, file_path, cleanup_callable).
If *idle_timeout_seconds* is set, it is sent in the transfer config (for idle expiry tests).
"""
srv = get_image_server()
path = make_tmp_image(data=data, image_size=image_size)
transfer_id = f"file-{uuid.uuid4().hex[:8]}"
cfg = {"backend": "file", "file": path}
if idle_timeout_seconds is not None:
cfg["idle_timeout_seconds"] = idle_timeout_seconds
resp = srv["send"]({
"action": "register",
"transfer_id": transfer_id,
"config": {"backend": "file", "file": path},
"config": cfg,
})
assert resp["status"] == "ok", f"register failed: {resp}"
url = f"{srv['base_url']}/images/{transfer_id}"

View File

@ -0,0 +1,100 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Unit tests for transfer idle timeout (no image server / nbd dependency)."""
import unittest
from unittest.mock import patch
from imageserver.config import (
TransferRegistry,
parse_idle_timeout_seconds,
validate_transfer_config,
)
from imageserver.constants import DEFAULT_IDLE_TIMEOUT_SECONDS
class TestParseIdleTimeout(unittest.TestCase):
    """Unit tests for parse_idle_timeout_seconds."""

    def test_default_600(self):
        # A config with no key falls back to the module default.
        self.assertEqual(parse_idle_timeout_seconds({}), DEFAULT_IDLE_TIMEOUT_SECONDS)

    def test_explicit(self):
        cfg = {"idle_timeout_seconds": 30}
        self.assertEqual(parse_idle_timeout_seconds(cfg), 30)

    def test_rejects_zero(self):
        cfg = {"idle_timeout_seconds": 0}
        with self.assertRaises(ValueError):
            parse_idle_timeout_seconds(cfg)
class TestValidateTransferConfig(unittest.TestCase):
    """Unit tests for idle-timeout handling in validate_transfer_config."""

    def test_file_merges_idle(self):
        raw = {"backend": "file", "file": "/tmp/x", "idle_timeout_seconds": 3}
        cleaned = validate_transfer_config(raw)
        self.assertEqual(cleaned["backend"], "file")
        self.assertEqual(cleaned["idle_timeout_seconds"], 3)
class TestRegistryIdleSweep(unittest.TestCase):
    """Exercise TransferRegistry idle expiry with a patched monotonic clock."""

    def test_sweep_unregisters_after_idle(self):
        # Fake monotonic clock controlled by the test (list so the closure
        # can mutate it).
        clock = [0.0]

        def mono():
            return clock[0]

        with patch("imageserver.config.time.monotonic", mono):
            r = TransferRegistry()
            r.register(
                "t1",
                validate_transfer_config(
                    {"backend": "file", "file": "/x", "idle_timeout_seconds": 2}
                ),
            )
            # Advance well past the 2s idle timeout with no HTTP activity.
            clock[0] = 5.0
            r.sweep_expired_transfers()
            self.assertIsNone(r.get("t1"))

    def test_inflight_prevents_sweep_until_request_ends(self):
        clock = [0.0]

        def mono():
            return clock[0]

        with patch("imageserver.config.time.monotonic", mono):
            r = TransferRegistry()
            r.register(
                "t1",
                validate_transfer_config(
                    {"backend": "file", "file": "/x", "idle_timeout_seconds": 2}
                ),
            )
            clock[0] = 1.0
            # Enter the lifecycle manually so the request stays "in flight"
            # across the sweep below.
            ctx = r.request_lifecycle("t1")
            ctx.__enter__()
            clock[0] = 100.0
            # In-flight request pins the transfer even far past the timeout.
            r.sweep_expired_transfers()
            self.assertIsNotNone(r.get("t1"))
            ctx.__exit__(None, None, None)
            # Completion at t=100 restarts the idle clock; expired by t=103.
            clock[0] = 103.0
            r.sweep_expired_transfers()
            self.assertIsNone(r.get("t1"))
if __name__ == "__main__":
    # Allow running this test module directly (outside a test runner).
    unittest.main()

View File

@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Integration tests for per-transfer HTTP idle timeout (requires image server deps e.g. nbd)."""
import time
import urllib.error
from .test_base import (
ImageServerTestCase,
http_options,
make_file_transfer,
)
class TestTransferIdleExpiry(ImageServerTestCase):
    """End-to-end idle-timeout behaviour against a running image server."""

    def test_transfer_expires_after_idle(self):
        """No HTTP activity after registration: transfer is unregistered after idle_timeout_seconds."""
        _tid, url, _path, cleanup = make_file_transfer(idle_timeout_seconds=2)
        try:
            # Sleep past the 2s timeout. NOTE(review): this assumes the
            # server's sweep interval is short enough to fire within the
            # extra 1.5s of slack — confirm the test server's configuration.
            time.sleep(3.5)
            with self.assertRaises(urllib.error.HTTPError) as ctx:
                http_options(url)
            self.assertEqual(ctx.exception.code, 404)
            # The control socket must still respond after the expiry.
            st = self.ctrl({"action": "status"})
            self.assertEqual(st.get("status"), "ok")
        finally:
            cleanup()

    def test_http_activity_resets_idle_deadline(self):
        """Completing a request resets the idle timer; transfer stays past a single interval."""
        _tid, url, _path, cleanup = make_file_transfer(idle_timeout_seconds=2)
        try:
            # Keep poking the transfer at sub-timeout intervals (1.2s < 2s),
            # so total elapsed time exceeds one timeout but each gap does not.
            http_options(url)
            time.sleep(1.2)
            http_options(url)
            time.sleep(1.2)
            http_options(url)
            time.sleep(1.2)
            resp = http_options(url)
            self.assertEqual(resp.status, 200)
        finally:
            cleanup()

View File

@ -327,12 +327,18 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup
socket = transferId;
}
HostVO backupHost = hostDao.findById(backup.getHostId());
if (backupHost == null) {
throw new CloudRuntimeException("Host not found for backup: " + backupId);
}
int idleTimeoutSec = ImageTransferIdleTimeoutSeconds.valueIn(backupHost.getDataCenterId());
CreateImageTransferCommand transferCmd = new CreateImageTransferCommand(
transferId,
direction,
volume.getUuid(),
socket,
backup.getFromCheckpointId());
backup.getFromCheckpointId(),
idleTimeoutSec);
try {
CreateImageTransferAnswer answer;
@ -443,6 +449,7 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup
Host host = getRandomHostFromStoragePool(storagePool);
String volumePath = getVolumePathForFileBasedBackend(volume);
int idleTimeoutSec = ImageTransferIdleTimeoutSeconds.valueIn(host.getDataCenterId());
ImageTransferVO imageTransfer;
CreateImageTransferCommand transferCmd;
@ -462,7 +469,8 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup
transferId,
direction,
transferId,
volumePath);
volumePath,
idleTimeoutSec);
} else {
startNBDServer(transferId, direction, host.getId(), volume.getUuid(), volumePath, null);
@ -483,7 +491,8 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup
direction,
volume.getUuid(),
transferId,
null);
null,
idleTimeoutSec);
}
CreateImageTransferAnswer transferAnswer;
try {
@ -899,7 +908,8 @@ public class KVMBackupExportServiceImpl extends ManagerBase implements KVMBackup
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey[]{
ImageTransferPollingInterval
ImageTransferPollingInterval,
ImageTransferIdleTimeoutSeconds
};
}
}