Agent communication with Image server via unix socket

This commit is contained in:
Abhisar Sinha 2026-03-23 19:40:04 +05:30 committed by Abhishek Kumar
parent 3e7268e457
commit 81fc6d5da6
10 changed files with 346 additions and 193 deletions

View File

@ -0,0 +1,123 @@
//Licensed to the Apache Software Foundation (ASF) under one
//or more contributor license agreements. See the NOTICE file
//distributed with this work for additional information
//regarding copyright ownership. The ASF licenses this file
//to you under the Apache License, Version 2.0 (the
//"License"); you may not use this file except in compliance
//the License. You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing,
//software distributed under the License is distributed on an
//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
//KIND, either express or implied. See the License for the
//specific language governing permissions and limitations
//under the License.
package com.cloud.hypervisor.kvm.resource;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.cloud.utils.script.OutputInterpreter;
import com.cloud.utils.script.Script;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
/**
* Communicates with the cloudstack-image-server control socket via socat.
*
* Protocol: newline-delimited JSON over a Unix domain socket.
* Actions: register, unregister, status.
*/
public class ImageServerControlSocket {
private static final Logger LOGGER = LogManager.getLogger(ImageServerControlSocket.class);
static final String CONTROL_SOCKET_PATH = "/var/run/cloudstack/image-server.sock";
private static final Gson GSON = new GsonBuilder().create();
private ImageServerControlSocket() {
}
/**
* Send a JSON message to the image server control socket and return the
* parsed response, or null on communication failure.
*/
static JsonObject sendMessage(Map<String, Object> message) {
String json = GSON.toJson(message);
Script script = new Script("/bin/bash", LOGGER);
script.add("-c");
script.add(String.format("echo '%s' | socat -t5 - UNIX-CONNECT:%s",
json.replace("'", "'\\''"), CONTROL_SOCKET_PATH));
OutputInterpreter.AllLinesParser parser = new OutputInterpreter.AllLinesParser();
String result = script.execute(parser);
if (result != null) {
LOGGER.error("Control socket communication failed: {}", result);
return null;
}
String output = parser.getLines();
if (output == null || output.trim().isEmpty()) {
LOGGER.error("Empty response from control socket");
return null;
}
try {
return JsonParser.parseString(output.trim()).getAsJsonObject();
} catch (Exception e) {
LOGGER.error("Failed to parse control socket response: {}", output, e);
return null;
}
}
/**
* Register a transfer config with the image server.
* @return true if the server accepted the registration.
*/
public static boolean registerTransfer(String transferId, Map<String, Object> config) {
Map<String, Object> msg = new HashMap<>();
msg.put("action", "register");
msg.put("transfer_id", transferId);
msg.put("config", config);
JsonObject resp = sendMessage(msg);
if (resp == null) {
return false;
}
return "ok".equals(resp.has("status") ? resp.get("status").getAsString() : null);
}
/**
* Unregister a transfer from the image server.
* @return the number of remaining active transfers, or -1 on error.
*/
public static int unregisterTransfer(String transferId) {
Map<String, Object> msg = new HashMap<>();
msg.put("action", "unregister");
msg.put("transfer_id", transferId);
JsonObject resp = sendMessage(msg);
if (resp == null) {
return -1;
}
if (!"ok".equals(resp.has("status") ? resp.get("status").getAsString() : null)) {
return -1;
}
return resp.has("active_transfers") ? resp.get("active_transfers").getAsInt() : -1;
}
/**
* Check whether the image server control socket is responsive.
* @return true if the server responded with status "ok".
*/
public static boolean isReady() {
Map<String, Object> msg = new HashMap<>();
msg.put("action", "status");
JsonObject resp = sendMessage(msg);
if (resp == null) {
return false;
}
return "ok".equals(resp.has("status") ? resp.get("status").getAsString() : null);
}
}

View File

@ -1100,10 +1100,11 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
throw new ConfigurationException("Unable to find nasbackup.sh");
}
imageServerPath = Script.findScript(kvmScriptsDir, "image_server.py");
if (imageServerPath == null) {
throw new ConfigurationException("Unable to find image_server.py");
String imageServerMain = Script.findScript(kvmScriptsDir, "imageserver/__main__.py");
if (imageServerMain == null) {
throw new ConfigurationException("Unable to find imageserver package");
}
imageServerPath = new File(imageServerMain).getParent();
createTmplPath = Script.findScript(storageScriptsDir, "createtmplt.sh");
if (createTmplPath == null) {

View File

@ -18,7 +18,6 @@
package com.cloud.hypervisor.kvm.resource.wrapper;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -26,62 +25,60 @@ import org.apache.cloudstack.backup.CreateImageTransferAnswer;
import org.apache.cloudstack.backup.CreateImageTransferCommand;
import org.apache.cloudstack.backup.ImageTransfer;
import org.apache.cloudstack.storage.resource.IpTablesHelper;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.cloud.agent.api.Answer;
import com.cloud.hypervisor.kvm.resource.ImageServerControlSocket;
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
import com.cloud.resource.CommandWrapper;
import com.cloud.resource.ResourceWrapper;
import com.cloud.utils.StringUtils;
import com.cloud.utils.script.Script;
import com.google.gson.GsonBuilder;
@ResourceWrapper(handles = CreateImageTransferCommand.class)
public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper<CreateImageTransferCommand, Answer, LibvirtComputingResource> {
protected Logger logger = LogManager.getLogger(getClass());
private boolean startImageServerIfNotRunning(int imageServerPort, LibvirtComputingResource resource) {
final String imageServerScript = resource.getImageServerPath();
final String imageServerPackageDir = resource.getImageServerPath();
final String imageServerParentDir = new File(imageServerPackageDir).getParent();
final String imageServerModuleName = new File(imageServerPackageDir).getName();
String unitName = "cloudstack-image-server";
Script checkScript = new Script("/bin/bash", logger);
checkScript.add("-c");
checkScript.add(String.format("systemctl is-active --quiet %s", unitName));
String checkResult = checkScript.execute();
if (checkResult == null) {
if (checkResult == null && ImageServerControlSocket.isReady()) {
return true;
}
String systemdRunCmd = String.format(
"systemd-run --unit=%s --property=Restart=no /usr/bin/python3 %s --listen 0.0.0.0 --port %d",
unitName, imageServerScript, imageServerPort);
if (checkResult != null) {
String systemdRunCmd = String.format(
"systemd-run --unit=%s --property=Restart=no --property=WorkingDirectory=%s /usr/bin/python3 -m %s --listen 0.0.0.0 --port %d",
unitName, imageServerParentDir, imageServerModuleName, imageServerPort);
Script startScript = new Script("/bin/bash", logger);
startScript.add("-c");
startScript.add(systemdRunCmd);
String startResult = startScript.execute();
Script startScript = new Script("/bin/bash", logger);
startScript.add("-c");
startScript.add(systemdRunCmd);
String startResult = startScript.execute();
if (startResult != null) {
logger.error(String.format("Failed to start the Image server: %s", startResult));
return false;
if (startResult != null) {
logger.error(String.format("Failed to start the Image server: %s", startResult));
return false;
}
}
// Wait with timeout until the service is up
int maxWaitSeconds = 10;
int pollIntervalMs = 1000;
int maxAttempts = (maxWaitSeconds * 1000) / pollIntervalMs;
boolean serviceActive = false;
boolean serverReady = false;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
Script verifyScript = new Script("/bin/bash", logger);
verifyScript.add("-c");
verifyScript.add(String.format("systemctl is-active --quiet %s", unitName));
String verifyResult = verifyScript.execute();
if (verifyResult == null) {
serviceActive = true;
logger.info(String.format("Image server is now active (attempt %d)", attempt + 1));
if (ImageServerControlSocket.isReady()) {
serverReady = true;
logger.info(String.format("Image server control socket is ready (attempt %d)", attempt + 1));
break;
}
try {
@ -92,8 +89,8 @@ public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper<Cre
}
}
if (!serviceActive) {
logger.error(String.format("Image server failed to start within %d seconds", maxWaitSeconds));
if (!serverReady) {
logger.error(String.format("Image server control socket not ready within %d seconds", maxWaitSeconds));
return false;
}
@ -138,22 +135,14 @@ public class LibvirtCreateImageTransferCommandWrapper extends CommandWrapper<Cre
}
}
try {
final String json = new GsonBuilder().create().toJson(payload);
File dir = new File("/tmp/imagetransfer");
if (!dir.exists()) {
dir.mkdirs();
}
final File transferFile = new File("/tmp/imagetransfer", transferId);
FileUtils.writeStringToFile(transferFile, json, "UTF-8");
} catch (IOException e) {
logger.warn("Failed to prepare image transfer on KVM host", e);
return new CreateImageTransferAnswer(cmd, false, "Failed to prepare image transfer on KVM host: " + e.getMessage());
final int imageServerPort = 54323;
if (!startImageServerIfNotRunning(imageServerPort, resource)) {
return new CreateImageTransferAnswer(cmd, false, "Failed to start image server.");
}
final int imageServerPort = 54323;
startImageServerIfNotRunning(imageServerPort, resource);
if (!ImageServerControlSocket.registerTransfer(transferId, payload)) {
return new CreateImageTransferAnswer(cmd, false, "Failed to register transfer with image server.");
}
final String transferUrl = String.format("http://%s:%d/images/%s", resource.getPrivateIp(), imageServerPort, transferId);
return new CreateImageTransferAnswer(cmd, true, "Image transfer prepared on KVM host.", transferId, transferUrl);

View File

@ -17,18 +17,12 @@
package com.cloud.hypervisor.kvm.resource.wrapper;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;
import org.apache.cloudstack.backup.FinalizeImageTransferCommand;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.cloud.agent.api.Answer;
import com.cloud.hypervisor.kvm.resource.ImageServerControlSocket;
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
import com.cloud.resource.CommandWrapper;
import com.cloud.resource.ResourceWrapper;
@ -54,9 +48,8 @@ public class LibvirtFinalizeImageTransferCommandWrapper extends CommandWrapper<F
checkScript.add(String.format("systemctl is-active --quiet %s", unitName));
String checkResult = checkScript.execute();
if (checkResult != null) {
logger.info(String.format("Image server not running, resetting failed state"));
logger.info("Image server not running, resetting failed state");
resetService(unitName);
// Still try to remove firewall rule in case it exists
removeFirewallRule(imageServerPort);
return true;
}
@ -66,7 +59,7 @@ public class LibvirtFinalizeImageTransferCommandWrapper extends CommandWrapper<F
stopScript.add(String.format("systemctl stop %s", unitName));
stopScript.execute();
resetService(unitName);
logger.info(String.format("Image server %s stopped", unitName));
logger.info("Image server {} stopped", unitName);
removeFirewallRule(imageServerPort);
@ -80,9 +73,9 @@ public class LibvirtFinalizeImageTransferCommandWrapper extends CommandWrapper<F
removeScript.add(String.format("iptables -D INPUT %s || true", rule));
String result = removeScript.execute();
if (result != null && !result.isEmpty() && !result.contains("iptables: Bad rule")) {
logger.debug(String.format("Firewall rule removal result for port %d: %s", port, result));
logger.debug("Firewall rule removal result for port {}: {}", port, result);
} else {
logger.info(String.format("Firewall rule removed for port %d (or did not exist)", port));
logger.info("Firewall rule removed for port {} (or did not exist)", port);
}
}
@ -92,17 +85,15 @@ public class LibvirtFinalizeImageTransferCommandWrapper extends CommandWrapper<F
return new Answer(cmd, false, "transferId is empty.");
}
final File transferFile = new File("/tmp/imagetransfer", transferId);
if (transferFile.exists() && !transferFile.delete()) {
return new Answer(cmd, false, "Failed to delete transfer config file: " + transferFile.getAbsolutePath());
int activeTransfers = ImageServerControlSocket.unregisterTransfer(transferId);
if (activeTransfers < 0) {
logger.warn("Could not reach image server to unregister transfer {}; assuming server is down", transferId);
stopImageServer();
return new Answer(cmd, true, "Image transfer finalized (server unreachable, forced stop).");
}
try (Stream<Path> stream = Files.list(Paths.get("/tmp/imagetransfer"))) {
if (!stream.findAny().isPresent()) {
stopImageServer();
}
} catch (IOException e) {
logger.warn("Failed to list /tmp/imagetransfer", e);
if (activeTransfers == 0) {
stopImageServer();
}
return new Answer(cmd, true, "Image transfer finalized.");

View File

@ -1,28 +0,0 @@
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from imageserver.server import main
if __name__ == "__main__":
main()

View File

@ -18,7 +18,11 @@
"""
CloudStack image server HTTP server backed by NBD over Unix socket or a local file.
Supports two backends (configured per-transfer via JSON config):
Transfer configs are registered/unregistered by the cloudstack-agent via a
Unix domain control socket (default: /var/run/cloudstack/image-server.sock)
and stored in-memory for the lifetime of the server process.
Supports two backends (configured per-transfer at registration time):
- nbd: proxy to an NBD server via Unix socket; supports range reads/writes
(GET/PUT/PATCH), extents, zero, flush.
- file: read/write a local qcow2/raw file; full PUT only, GET with optional

View File

@ -15,13 +15,10 @@
# specific language governing permissions and limitations
# under the License.
import json
import logging
import os
import threading
from typing import Any, Dict, Optional, Tuple
from .constants import CFG_DIR
from typing import Any, Dict, Optional
def safe_transfer_id(image_id: str) -> Optional[str]:
@ -40,97 +37,48 @@ def safe_transfer_id(image_id: str) -> Optional[str]:
return image_id
class TransferConfigLoader:
class TransferRegistry:
"""
Loads and caches per-image transfer configuration from JSON files.
Thread-safe in-memory registry for active image transfer configurations.
CloudStack writes a JSON file at <cfg_dir>/<transferId> with:
- NBD backend: {"backend": "nbd", "socket": "...", "export": "vda", "export_bitmap": "..."}
- File backend: {"backend": "file", "file": "/path/to/image.qcow2"}
The cloudstack-agent registers/unregisters transfers via the Unix domain
control socket. The HTTP handler looks up configs through get().
"""
def __init__(self, cfg_dir: str = CFG_DIR):
self._cfg_dir = cfg_dir
self._cache: Dict[str, Tuple[float, Dict[str, Any]]] = {}
self._cache_guard = threading.Lock()
def __init__(self) -> None:
self._lock = threading.Lock()
self._transfers: Dict[str, Dict[str, Any]] = {}
@property
def cfg_dir(self) -> str:
return self._cfg_dir
def register(self, transfer_id: str, config: Dict[str, Any]) -> bool:
safe_id = safe_transfer_id(transfer_id)
if safe_id is None:
logging.error("register rejected invalid transfer_id=%r", transfer_id)
return False
with self._lock:
self._transfers[safe_id] = config
logging.info("registered transfer_id=%s active=%d", safe_id, len(self._transfers))
return True
def load(self, image_id: str) -> Optional[Dict[str, Any]]:
safe_id = safe_transfer_id(image_id)
def unregister(self, transfer_id: str) -> int:
"""Remove a transfer and return the number of remaining active transfers."""
safe_id = safe_transfer_id(transfer_id)
if safe_id is None:
logging.error("unregister rejected invalid transfer_id=%r", transfer_id)
with self._lock:
return len(self._transfers)
with self._lock:
self._transfers.pop(safe_id, None)
remaining = len(self._transfers)
logging.info("unregistered transfer_id=%s active=%d", safe_id, remaining)
return remaining
def get(self, transfer_id: str) -> Optional[Dict[str, Any]]:
safe_id = safe_transfer_id(transfer_id)
if safe_id is None:
return None
with self._lock:
return self._transfers.get(safe_id)
cfg_path = os.path.join(self._cfg_dir, safe_id)
try:
st = os.stat(cfg_path)
except FileNotFoundError:
return None
except OSError as e:
logging.error("cfg stat failed image_id=%s err=%r", image_id, e)
return None
with self._cache_guard:
cached = self._cache.get(safe_id)
if cached is not None:
cached_mtime, cached_cfg = cached
if float(st.st_mtime) == float(cached_mtime):
return cached_cfg
try:
with open(cfg_path, "rb") as f:
raw = f.read(4096)
except OSError as e:
logging.error("cfg read failed image_id=%s err=%r", image_id, e)
return None
try:
obj = json.loads(raw.decode("utf-8"))
except Exception as e:
logging.error("cfg parse failed image_id=%s err=%r", image_id, e)
return None
if not isinstance(obj, dict):
logging.error("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__)
return None
backend = obj.get("backend")
if backend is None:
backend = "nbd"
if not isinstance(backend, str):
logging.error("cfg invalid backend type image_id=%s", image_id)
return None
backend = backend.lower()
if backend not in ("nbd", "file"):
logging.error("cfg unsupported backend image_id=%s backend=%s", image_id, backend)
return None
if backend == "file":
file_path = obj.get("file")
if not isinstance(file_path, str) or not file_path.strip():
logging.error("cfg missing/invalid file path for file backend image_id=%s", image_id)
return None
cfg: Dict[str, Any] = {"backend": "file", "file": file_path.strip()}
else:
socket_path = obj.get("socket")
export = obj.get("export")
export_bitmap = obj.get("export_bitmap")
if not isinstance(socket_path, str) or not socket_path.strip():
logging.error("cfg missing/invalid socket path for nbd backend image_id=%s", image_id)
return None
socket_path = socket_path.strip()
if export is not None and (not isinstance(export, str) or not export):
logging.error("cfg missing/invalid export image_id=%s", image_id)
return None
cfg = {
"backend": "nbd",
"socket": socket_path,
"export": export,
"export_bitmap": export_bitmap,
}
with self._cache_guard:
self._cache[safe_id] = (float(st.st_mtime), cfg)
return cfg
def active_count(self) -> int:
with self._lock:
return len(self._transfers)

View File

@ -27,3 +27,4 @@ MAX_PARALLEL_READS = 8
MAX_PARALLEL_WRITES = 1
CFG_DIR = "/tmp/imagetransfer"
CONTROL_SOCKET = "/var/run/cloudstack/image-server.sock"

View File

@ -25,7 +25,7 @@ from urllib.parse import parse_qs
from .backends import NbdBackend, create_backend
from .concurrency import ConcurrencyManager
from .config import TransferConfigLoader
from .config import TransferRegistry
from .constants import CHUNK_SIZE, MAX_PARALLEL_READS, MAX_PARALLEL_WRITES
from .util import is_fallback_dirty_response, json_bytes, now_s
@ -38,7 +38,7 @@ class Handler(BaseHTTPRequestHandler):
All backend I/O is delegated to ImageBackend implementations via the
create_backend() factory.
Class-level attributes _concurrency and _config_loader are injected
Class-level attributes _concurrency and _registry are injected
by the server at startup (see server.py / make_handler()).
"""
@ -46,7 +46,7 @@ class Handler(BaseHTTPRequestHandler):
server_protocol = "HTTP/1.1"
_concurrency: ConcurrencyManager
_config_loader: TransferConfigLoader
_registry: TransferRegistry
_CONTENT_RANGE_RE = re.compile(r"^bytes\s+(\d+)-(\d+)/(?:\*|\d+)$")
@ -197,7 +197,7 @@ class Handler(BaseHTTPRequestHandler):
return parse_qs(query, keep_blank_values=True)
def _image_cfg(self, image_id: str) -> Optional[Dict[str, Any]]:
return self._config_loader.load(image_id)
return self._registry.get(image_id)
# ------------------------------------------------------------------
# HTTP verb dispatchers

View File

@ -16,7 +16,11 @@
# under the License.
import argparse
import json
import logging
import os
import socket
import threading
from http.server import HTTPServer
from socketserver import ThreadingMixIn
from typing import Type
@ -28,14 +32,14 @@ except ImportError:
pass
from .concurrency import ConcurrencyManager
from .config import TransferConfigLoader
from .constants import CFG_DIR, MAX_PARALLEL_READS, MAX_PARALLEL_WRITES
from .config import TransferRegistry
from .constants import CONTROL_SOCKET, MAX_PARALLEL_READS, MAX_PARALLEL_WRITES
from .handler import Handler
def make_handler(
concurrency: ConcurrencyManager,
config_loader: TransferConfigLoader,
registry: TransferRegistry,
) -> Type[Handler]:
"""
Create a Handler subclass with injected dependencies.
@ -46,17 +50,131 @@ def make_handler(
class ConfiguredHandler(Handler):
_concurrency = concurrency
_config_loader = config_loader
_registry = registry
return ConfiguredHandler
def _validate_config(obj: dict) -> dict:
"""
Validate and normalize a transfer config dict received over the control
socket. Returns the cleaned config or raises ValueError.
"""
backend = obj.get("backend")
if backend is None:
backend = "nbd"
if not isinstance(backend, str):
raise ValueError("invalid backend type")
backend = backend.lower()
if backend not in ("nbd", "file"):
raise ValueError(f"unsupported backend: {backend}")
if backend == "file":
file_path = obj.get("file")
if not isinstance(file_path, str) or not file_path.strip():
raise ValueError("missing/invalid file path for file backend")
return {"backend": "file", "file": file_path.strip()}
socket_path = obj.get("socket")
export = obj.get("export")
export_bitmap = obj.get("export_bitmap")
if not isinstance(socket_path, str) or not socket_path.strip():
raise ValueError("missing/invalid socket path for nbd backend")
if export is not None and (not isinstance(export, str) or not export):
raise ValueError("invalid export name")
return {
"backend": "nbd",
"socket": socket_path.strip(),
"export": export,
"export_bitmap": export_bitmap,
}
def _handle_control_conn(conn: socket.socket, registry: TransferRegistry) -> None:
"""Handle a single control-socket connection (one JSON request/response)."""
try:
data = b""
while True:
chunk = conn.recv(4096)
if not chunk:
break
data += chunk
if b"\n" in data:
break
msg = json.loads(data.strip())
action = msg.get("action")
if action == "register":
transfer_id = msg.get("transfer_id")
raw_config = msg.get("config")
if not transfer_id or not isinstance(raw_config, dict):
resp = {"status": "error", "message": "missing transfer_id or config"}
else:
try:
config = _validate_config(raw_config)
except ValueError as e:
resp = {"status": "error", "message": str(e)}
else:
if registry.register(transfer_id, config):
resp = {"status": "ok", "active_transfers": registry.active_count()}
else:
resp = {"status": "error", "message": "invalid transfer_id"}
elif action == "unregister":
transfer_id = msg.get("transfer_id")
if not transfer_id:
resp = {"status": "error", "message": "missing transfer_id"}
else:
remaining = registry.unregister(transfer_id)
resp = {"status": "ok", "active_transfers": remaining}
elif action == "status":
resp = {"status": "ok", "active_transfers": registry.active_count()}
else:
resp = {"status": "error", "message": f"unknown action: {action}"}
conn.sendall((json.dumps(resp) + "\n").encode("utf-8"))
except Exception as e:
logging.error("control socket error: %r", e)
try:
conn.sendall((json.dumps({"status": "error", "message": str(e)}) + "\n").encode("utf-8"))
except Exception:
pass
finally:
conn.close()
def _control_listener(registry: TransferRegistry, sock_path: str) -> None:
"""Accept loop for the Unix domain control socket (runs in a daemon thread)."""
if os.path.exists(sock_path):
os.unlink(sock_path)
os.makedirs(os.path.dirname(sock_path), exist_ok=True)
srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
srv.bind(sock_path)
os.chmod(sock_path, 0o660)
srv.listen(5)
logging.info("control socket listening on %s", sock_path)
while True:
conn, _ = srv.accept()
threading.Thread(
target=_handle_control_conn,
args=(conn, registry),
daemon=True,
).start()
def main() -> None:
parser = argparse.ArgumentParser(
description="CloudStack image server backed by NBD / local file"
)
parser.add_argument("--listen", default="127.0.0.1", help="Address to bind")
parser.add_argument("--port", type=int, default=54323, help="Port to listen on")
parser.add_argument(
"--control-socket",
default=CONTROL_SOCKET,
help="Path to the Unix domain control socket",
)
args = parser.parse_args()
logging.basicConfig(
@ -64,12 +182,18 @@ def main() -> None:
format="%(asctime)s %(levelname)s %(message)s",
)
registry = TransferRegistry()
concurrency = ConcurrencyManager(MAX_PARALLEL_READS, MAX_PARALLEL_WRITES)
config_loader = TransferConfigLoader(CFG_DIR)
handler_cls = make_handler(concurrency, config_loader)
handler_cls = make_handler(concurrency, registry)
ctrl_thread = threading.Thread(
target=_control_listener,
args=(registry, args.control_socket),
daemon=True,
)
ctrl_thread.start()
addr = (args.listen, args.port)
httpd = ThreadingHTTPServer(addr, handler_cls)
logging.info("listening on http://%s:%d", args.listen, args.port)
logging.info("image configs are read from %s/<transferId>", config_loader.cfg_dir)
httpd.serve_forever()