diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/ImageServerControlSocket.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/ImageServerControlSocket.java new file mode 100644 index 00000000000..2e9852f7bc1 --- /dev/null +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/ImageServerControlSocket.java @@ -0,0 +1,123 @@ +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. + +package com.cloud.hypervisor.kvm.resource; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.cloud.utils.script.OutputInterpreter; +import com.cloud.utils.script.Script; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +/** + * Communicates with the cloudstack-image-server control socket via socat. + * + * Protocol: newline-delimited JSON over a Unix domain socket. + * Actions: register, unregister, status. + */ +public class ImageServerControlSocket { + private static final Logger LOGGER = LogManager.getLogger(ImageServerControlSocket.class); + static final String CONTROL_SOCKET_PATH = "/var/run/cloudstack/image-server.sock"; + private static final Gson GSON = new GsonBuilder().create(); + + private ImageServerControlSocket() { + } + + /** + * Send a JSON message to the image server control socket and return the + * parsed response, or null on communication failure. + */ + static JsonObject sendMessage(Map message) { + String json = GSON.toJson(message); + Script script = new Script("/bin/bash", LOGGER); + script.add("-c"); + script.add(String.format("echo '%s' | socat -t5 - UNIX-CONNECT:%s", + json.replace("'", "'\\''"), CONTROL_SOCKET_PATH)); + OutputInterpreter.AllLinesParser parser = new OutputInterpreter.AllLinesParser(); + String result = script.execute(parser); + if (result != null) { + LOGGER.error("Control socket communication failed: {}", result); + return null; + } + String output = parser.getLines(); + if (output == null || output.trim().isEmpty()) { + LOGGER.error("Empty response from control socket"); + return null; + } + try { + return JsonParser.parseString(output.trim()).getAsJsonObject(); + } catch (Exception e) { + LOGGER.error("Failed to parse control socket response: {}", output, e); + return null; + } + } + + /** + * Register a transfer config with the image server. + * @return true if the server accepted the registration. + */ + public static boolean registerTransfer(String transferId, Map config) { + Map msg = new HashMap<>(); + msg.put("action", "register"); + msg.put("transfer_id", transferId); + msg.put("config", config); + JsonObject resp = sendMessage(msg); + if (resp == null) { + return false; + } + return "ok".equals(resp.has("status") ? resp.get("status").getAsString() : null); + } + + /** + * Unregister a transfer from the image server. + * @return the number of remaining active transfers, or -1 on error. + */ + public static int unregisterTransfer(String transferId) { + Map msg = new HashMap<>(); + msg.put("action", "unregister"); + msg.put("transfer_id", transferId); + JsonObject resp = sendMessage(msg); + if (resp == null) { + return -1; + } + if (!"ok".equals(resp.has("status") ? resp.get("status").getAsString() : null)) { + return -1; + } + return resp.has("active_transfers") ? resp.get("active_transfers").getAsInt() : -1; + } + + /** + * Check whether the image server control socket is responsive. + * @return true if the server responded with status "ok". + */ + public static boolean isReady() { + Map msg = new HashMap<>(); + msg.put("action", "status"); + JsonObject resp = sendMessage(msg); + if (resp == null) { + return false; + } + return "ok".equals(resp.has("status") ? resp.get("status").getAsString() : null); + } +} diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java index dfba9ad1115..821be05cfb2 100644 --- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java @@ -1100,10 +1100,11 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv throw new ConfigurationException("Unable to find nasbackup.sh"); } - imageServerPath = Script.findScript(kvmScriptsDir, "image_server.py"); - if (imageServerPath == null) { - throw new ConfigurationException("Unable to find image_server.py"); + String imageServerMain = Script.findScript(kvmScriptsDir, "imageserver/__main__.py"); + if (imageServerMain == null) { + throw new ConfigurationException("Unable to find imageserver package"); } + imageServerPath = new File(imageServerMain).getParent(); createTmplPath = Script.findScript(storageScriptsDir, "createtmplt.sh"); if (createTmplPath == null) { diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java index db0918f5c07..71beafe9fa1 100644 --- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java +++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtCreateImageTransferCommandWrapper.java @@ -18,7 +18,6 @@ package com.cloud.hypervisor.kvm.resource.wrapper; import java.io.File; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -26,62 +25,60 @@ import org.apache.cloudstack.backup.CreateImageTransferAnswer; import org.apache.cloudstack.backup.CreateImageTransferCommand; import org.apache.cloudstack.backup.ImageTransfer; import org.apache.cloudstack.storage.resource.IpTablesHelper; -import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.cloud.agent.api.Answer; +import com.cloud.hypervisor.kvm.resource.ImageServerControlSocket; import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource; import com.cloud.resource.CommandWrapper; import com.cloud.resource.ResourceWrapper; import com.cloud.utils.StringUtils; import com.cloud.utils.script.Script; -import com.google.gson.GsonBuilder; @ResourceWrapper(handles = CreateImageTransferCommand.class) public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper { protected Logger logger = LogManager.getLogger(getClass()); private boolean startImageServerIfNotRunning(int imageServerPort, LibvirtComputingResource resource) { - final String imageServerScript = resource.getImageServerPath(); + final String imageServerPackageDir = resource.getImageServerPath(); + final String imageServerParentDir = new File(imageServerPackageDir).getParent(); + final String imageServerModuleName = new File(imageServerPackageDir).getName(); String unitName = "cloudstack-image-server"; Script checkScript = new Script("/bin/bash", logger); checkScript.add("-c"); checkScript.add(String.format("systemctl is-active --quiet %s", unitName)); String checkResult = checkScript.execute(); - if (checkResult == null) { + if (checkResult == null && ImageServerControlSocket.isReady()) { return true; } - String systemdRunCmd = String.format( - "systemd-run --unit=%s --property=Restart=no /usr/bin/python3 %s --listen 0.0.0.0 --port %d", - unitName, imageServerScript, imageServerPort); + if (checkResult != null) { + String systemdRunCmd = String.format( + "systemd-run --unit=%s --property=Restart=no --property=WorkingDirectory=%s /usr/bin/python3 -m %s --listen 0.0.0.0 --port %d", + unitName, imageServerParentDir, imageServerModuleName, imageServerPort); - Script startScript = new Script("/bin/bash", logger); - startScript.add("-c"); - startScript.add(systemdRunCmd); - String startResult = startScript.execute(); + Script startScript = new Script("/bin/bash", logger); + startScript.add("-c"); + startScript.add(systemdRunCmd); + String startResult = startScript.execute(); - if (startResult != null) { - logger.error(String.format("Failed to start the Image server: %s", startResult)); - return false; + if (startResult != null) { + logger.error(String.format("Failed to start the Image server: %s", startResult)); + return false; + } } - // Wait with timeout until the service is up int maxWaitSeconds = 10; int pollIntervalMs = 1000; int maxAttempts = (maxWaitSeconds * 1000) / pollIntervalMs; - boolean serviceActive = false; + boolean serverReady = false; for (int attempt = 0; attempt < maxAttempts; attempt++) { - Script verifyScript = new Script("/bin/bash", logger); - verifyScript.add("-c"); - verifyScript.add(String.format("systemctl is-active --quiet %s", unitName)); - String verifyResult = verifyScript.execute(); - if (verifyResult == null) { - serviceActive = true; - logger.info(String.format("Image server is now active (attempt %d)", attempt + 1)); + if (ImageServerControlSocket.isReady()) { + serverReady = true; + logger.info(String.format("Image server control socket is ready (attempt %d)", attempt + 1)); break; } try { @@ -92,8 +89,8 @@ public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper stream = Files.list(Paths.get("/tmp/imagetransfer"))) { - if (!stream.findAny().isPresent()) { - stopImageServer(); - } - } catch (IOException e) { - logger.warn("Failed to list /tmp/imagetransfer", e); + if (activeTransfers == 0) { + stopImageServer(); } return new Answer(cmd, true, "Image transfer finalized."); diff --git a/scripts/vm/hypervisor/kvm/image_server.py b/scripts/vm/hypervisor/kvm/image_server.py deleted file mode 100644 index c0436b4d207..00000000000 --- a/scripts/vm/hypervisor/kvm/image_server.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env python3 -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -import os -import sys - -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from imageserver.server import main - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/scripts/vm/hypervisor/kvm/imageserver/__init__.py b/scripts/vm/hypervisor/kvm/imageserver/__init__.py index 5e033f5d527..69eec98956a 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/__init__.py +++ b/scripts/vm/hypervisor/kvm/imageserver/__init__.py @@ -18,7 +18,11 @@ """ CloudStack image server — HTTP server backed by NBD over Unix socket or a local file. -Supports two backends (configured per-transfer via JSON config): +Transfer configs are registered/unregistered by the cloudstack-agent via a +Unix domain control socket (default: /var/run/cloudstack/image-server.sock) +and stored in-memory for the lifetime of the server process. + +Supports two backends (configured per-transfer at registration time): - nbd: proxy to an NBD server via Unix socket; supports range reads/writes (GET/PUT/PATCH), extents, zero, flush. - file: read/write a local qcow2/raw file; full PUT only, GET with optional diff --git a/scripts/vm/hypervisor/kvm/imageserver/config.py b/scripts/vm/hypervisor/kvm/imageserver/config.py index cc0107cce9d..3b1fd686f05 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/config.py +++ b/scripts/vm/hypervisor/kvm/imageserver/config.py @@ -15,13 +15,10 @@ # specific language governing permissions and limitations # under the License. -import json import logging import os import threading -from typing import Any, Dict, Optional, Tuple - -from .constants import CFG_DIR +from typing import Any, Dict, Optional def safe_transfer_id(image_id: str) -> Optional[str]: @@ -40,97 +37,48 @@ def safe_transfer_id(image_id: str) -> Optional[str]: return image_id -class TransferConfigLoader: +class TransferRegistry: """ - Loads and caches per-image transfer configuration from JSON files. + Thread-safe in-memory registry for active image transfer configurations. - CloudStack writes a JSON file at / with: - - NBD backend: {"backend": "nbd", "socket": "...", "export": "vda", "export_bitmap": "..."} - - File backend: {"backend": "file", "file": "/path/to/image.qcow2"} + The cloudstack-agent registers/unregisters transfers via the Unix domain + control socket. The HTTP handler looks up configs through get(). """ - def __init__(self, cfg_dir: str = CFG_DIR): - self._cfg_dir = cfg_dir - self._cache: Dict[str, Tuple[float, Dict[str, Any]]] = {} - self._cache_guard = threading.Lock() + def __init__(self) -> None: + self._lock = threading.Lock() + self._transfers: Dict[str, Dict[str, Any]] = {} - @property - def cfg_dir(self) -> str: - return self._cfg_dir + def register(self, transfer_id: str, config: Dict[str, Any]) -> bool: + safe_id = safe_transfer_id(transfer_id) + if safe_id is None: + logging.error("register rejected invalid transfer_id=%r", transfer_id) + return False + with self._lock: + self._transfers[safe_id] = config + logging.info("registered transfer_id=%s active=%d", safe_id, len(self._transfers)) + return True - def load(self, image_id: str) -> Optional[Dict[str, Any]]: - safe_id = safe_transfer_id(image_id) + def unregister(self, transfer_id: str) -> int: + """Remove a transfer and return the number of remaining active transfers.""" + safe_id = safe_transfer_id(transfer_id) + if safe_id is None: + logging.error("unregister rejected invalid transfer_id=%r", transfer_id) + with self._lock: + return len(self._transfers) + with self._lock: + self._transfers.pop(safe_id, None) + remaining = len(self._transfers) + logging.info("unregistered transfer_id=%s active=%d", safe_id, remaining) + return remaining + + def get(self, transfer_id: str) -> Optional[Dict[str, Any]]: + safe_id = safe_transfer_id(transfer_id) if safe_id is None: return None + with self._lock: + return self._transfers.get(safe_id) - cfg_path = os.path.join(self._cfg_dir, safe_id) - try: - st = os.stat(cfg_path) - except FileNotFoundError: - return None - except OSError as e: - logging.error("cfg stat failed image_id=%s err=%r", image_id, e) - return None - - with self._cache_guard: - cached = self._cache.get(safe_id) - if cached is not None: - cached_mtime, cached_cfg = cached - if float(st.st_mtime) == float(cached_mtime): - return cached_cfg - - try: - with open(cfg_path, "rb") as f: - raw = f.read(4096) - except OSError as e: - logging.error("cfg read failed image_id=%s err=%r", image_id, e) - return None - - try: - obj = json.loads(raw.decode("utf-8")) - except Exception as e: - logging.error("cfg parse failed image_id=%s err=%r", image_id, e) - return None - - if not isinstance(obj, dict): - logging.error("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__) - return None - - backend = obj.get("backend") - if backend is None: - backend = "nbd" - if not isinstance(backend, str): - logging.error("cfg invalid backend type image_id=%s", image_id) - return None - backend = backend.lower() - if backend not in ("nbd", "file"): - logging.error("cfg unsupported backend image_id=%s backend=%s", image_id, backend) - return None - - if backend == "file": - file_path = obj.get("file") - if not isinstance(file_path, str) or not file_path.strip(): - logging.error("cfg missing/invalid file path for file backend image_id=%s", image_id) - return None - cfg: Dict[str, Any] = {"backend": "file", "file": file_path.strip()} - else: - socket_path = obj.get("socket") - export = obj.get("export") - export_bitmap = obj.get("export_bitmap") - if not isinstance(socket_path, str) or not socket_path.strip(): - logging.error("cfg missing/invalid socket path for nbd backend image_id=%s", image_id) - return None - socket_path = socket_path.strip() - if export is not None and (not isinstance(export, str) or not export): - logging.error("cfg missing/invalid export image_id=%s", image_id) - return None - cfg = { - "backend": "nbd", - "socket": socket_path, - "export": export, - "export_bitmap": export_bitmap, - } - - with self._cache_guard: - self._cache[safe_id] = (float(st.st_mtime), cfg) - return cfg + def active_count(self) -> int: + with self._lock: + return len(self._transfers) diff --git a/scripts/vm/hypervisor/kvm/imageserver/constants.py b/scripts/vm/hypervisor/kvm/imageserver/constants.py index 6836f579807..4e8d5c86da5 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/constants.py +++ b/scripts/vm/hypervisor/kvm/imageserver/constants.py @@ -27,3 +27,4 @@ MAX_PARALLEL_READS = 8 MAX_PARALLEL_WRITES = 1 CFG_DIR = "/tmp/imagetransfer" +CONTROL_SOCKET = "/var/run/cloudstack/image-server.sock" diff --git a/scripts/vm/hypervisor/kvm/imageserver/handler.py b/scripts/vm/hypervisor/kvm/imageserver/handler.py index 8d894f9b0c5..a689467238b 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/handler.py +++ b/scripts/vm/hypervisor/kvm/imageserver/handler.py @@ -25,7 +25,7 @@ from urllib.parse import parse_qs from .backends import NbdBackend, create_backend from .concurrency import ConcurrencyManager -from .config import TransferConfigLoader +from .config import TransferRegistry from .constants import CHUNK_SIZE, MAX_PARALLEL_READS, MAX_PARALLEL_WRITES from .util import is_fallback_dirty_response, json_bytes, now_s @@ -38,7 +38,7 @@ class Handler(BaseHTTPRequestHandler): All backend I/O is delegated to ImageBackend implementations via the create_backend() factory. - Class-level attributes _concurrency and _config_loader are injected + Class-level attributes _concurrency and _registry are injected by the server at startup (see server.py / make_handler()). """ @@ -46,7 +46,7 @@ class Handler(BaseHTTPRequestHandler): server_protocol = "HTTP/1.1" _concurrency: ConcurrencyManager - _config_loader: TransferConfigLoader + _registry: TransferRegistry _CONTENT_RANGE_RE = re.compile(r"^bytes\s+(\d+)-(\d+)/(?:\*|\d+)$") @@ -197,7 +197,7 @@ class Handler(BaseHTTPRequestHandler): return parse_qs(query, keep_blank_values=True) def _image_cfg(self, image_id: str) -> Optional[Dict[str, Any]]: - return self._config_loader.load(image_id) + return self._registry.get(image_id) # ------------------------------------------------------------------ # HTTP verb dispatchers diff --git a/scripts/vm/hypervisor/kvm/imageserver/server.py b/scripts/vm/hypervisor/kvm/imageserver/server.py index 7e9cc74dcaf..d348bf4950d 100644 --- a/scripts/vm/hypervisor/kvm/imageserver/server.py +++ b/scripts/vm/hypervisor/kvm/imageserver/server.py @@ -16,7 +16,11 @@ # under the License. import argparse +import json import logging +import os +import socket +import threading from http.server import HTTPServer from socketserver import ThreadingMixIn from typing import Type @@ -28,14 +32,14 @@ except ImportError: pass from .concurrency import ConcurrencyManager -from .config import TransferConfigLoader -from .constants import CFG_DIR, MAX_PARALLEL_READS, MAX_PARALLEL_WRITES +from .config import TransferRegistry +from .constants import CONTROL_SOCKET, MAX_PARALLEL_READS, MAX_PARALLEL_WRITES from .handler import Handler def make_handler( concurrency: ConcurrencyManager, - config_loader: TransferConfigLoader, + registry: TransferRegistry, ) -> Type[Handler]: """ Create a Handler subclass with injected dependencies. @@ -46,17 +50,131 @@ def make_handler( class ConfiguredHandler(Handler): _concurrency = concurrency - _config_loader = config_loader + _registry = registry return ConfiguredHandler +def _validate_config(obj: dict) -> dict: + """ + Validate and normalize a transfer config dict received over the control + socket. Returns the cleaned config or raises ValueError. + """ + backend = obj.get("backend") + if backend is None: + backend = "nbd" + if not isinstance(backend, str): + raise ValueError("invalid backend type") + backend = backend.lower() + if backend not in ("nbd", "file"): + raise ValueError(f"unsupported backend: {backend}") + + if backend == "file": + file_path = obj.get("file") + if not isinstance(file_path, str) or not file_path.strip(): + raise ValueError("missing/invalid file path for file backend") + return {"backend": "file", "file": file_path.strip()} + + socket_path = obj.get("socket") + export = obj.get("export") + export_bitmap = obj.get("export_bitmap") + if not isinstance(socket_path, str) or not socket_path.strip(): + raise ValueError("missing/invalid socket path for nbd backend") + if export is not None and (not isinstance(export, str) or not export): + raise ValueError("invalid export name") + return { + "backend": "nbd", + "socket": socket_path.strip(), + "export": export, + "export_bitmap": export_bitmap, + } + + +def _handle_control_conn(conn: socket.socket, registry: TransferRegistry) -> None: + """Handle a single control-socket connection (one JSON request/response).""" + try: + data = b"" + while True: + chunk = conn.recv(4096) + if not chunk: + break + data += chunk + if b"\n" in data: + break + + msg = json.loads(data.strip()) + action = msg.get("action") + + if action == "register": + transfer_id = msg.get("transfer_id") + raw_config = msg.get("config") + if not transfer_id or not isinstance(raw_config, dict): + resp = {"status": "error", "message": "missing transfer_id or config"} + else: + try: + config = _validate_config(raw_config) + except ValueError as e: + resp = {"status": "error", "message": str(e)} + else: + if registry.register(transfer_id, config): + resp = {"status": "ok", "active_transfers": registry.active_count()} + else: + resp = {"status": "error", "message": "invalid transfer_id"} + elif action == "unregister": + transfer_id = msg.get("transfer_id") + if not transfer_id: + resp = {"status": "error", "message": "missing transfer_id"} + else: + remaining = registry.unregister(transfer_id) + resp = {"status": "ok", "active_transfers": remaining} + elif action == "status": + resp = {"status": "ok", "active_transfers": registry.active_count()} + else: + resp = {"status": "error", "message": f"unknown action: {action}"} + + conn.sendall((json.dumps(resp) + "\n").encode("utf-8")) + except Exception as e: + logging.error("control socket error: %r", e) + try: + conn.sendall((json.dumps({"status": "error", "message": str(e)}) + "\n").encode("utf-8")) + except Exception: + pass + finally: + conn.close() + + +def _control_listener(registry: TransferRegistry, sock_path: str) -> None: + """Accept loop for the Unix domain control socket (runs in a daemon thread).""" + if os.path.exists(sock_path): + os.unlink(sock_path) + os.makedirs(os.path.dirname(sock_path), exist_ok=True) + + srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + srv.bind(sock_path) + os.chmod(sock_path, 0o660) + srv.listen(5) + logging.info("control socket listening on %s", sock_path) + + while True: + conn, _ = srv.accept() + threading.Thread( + target=_handle_control_conn, + args=(conn, registry), + daemon=True, + ).start() + + def main() -> None: parser = argparse.ArgumentParser( description="CloudStack image server backed by NBD / local file" ) parser.add_argument("--listen", default="127.0.0.1", help="Address to bind") parser.add_argument("--port", type=int, default=54323, help="Port to listen on") + parser.add_argument( + "--control-socket", + default=CONTROL_SOCKET, + help="Path to the Unix domain control socket", + ) args = parser.parse_args() logging.basicConfig( @@ -64,12 +182,18 @@ def main() -> None: format="%(asctime)s %(levelname)s %(message)s", ) + registry = TransferRegistry() concurrency = ConcurrencyManager(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES) - config_loader = TransferConfigLoader(CFG_DIR) - handler_cls = make_handler(concurrency, config_loader) + handler_cls = make_handler(concurrency, registry) + + ctrl_thread = threading.Thread( + target=_control_listener, + args=(registry, args.control_socket), + daemon=True, + ) + ctrl_thread.start() addr = (args.listen, args.port) httpd = ThreadingHTTPServer(addr, handler_cls) logging.info("listening on http://%s:%d", args.listen, args.port) - logging.info("image configs are read from %s/", config_loader.cfg_dir) httpd.serve_forever()