cloudstack/scripts/vm/hypervisor/kvm/image_server.py

1586 lines
60 KiB
Python

#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
POC "imageio-like" HTTP server backed by NBD over Unix socket or a local file.
Supports two backends (see config payload):
- nbd: proxy to an NBD server via Unix socket (socket path, export, export_bitmap);
supports range reads/writes (GET/PUT/PATCH), extents, zero, flush. PUT accepts
full upload or ranged upload (Content-Range). Concurrent PUTs on the same image
are serialized (blocking).
- file: read/write a local qcow2 (or raw) file path; full PUT only (no range
writes), GET with optional ranges, flush.
How to run
----------
- Install dependency:
dnf install python3-libnbd
or
apt install python3-libnbd
- Run server:
createImageTransfer will start the server as a systemd service 'cloudstack-image-server'
Example curl commands
--------------------
- OPTIONS:
curl -i -X OPTIONS http://127.0.0.1:54323/images/demo
- GET full image:
curl -v http://127.0.0.1:54323/images/demo -o demo.img
- GET a byte range:
curl -v -H "Range: bytes=0-1048575" http://127.0.0.1:54323/images/demo -o first_1MiB.bin
- PUT full image (Content-Length must equal export size exactly). Optional ?flush=y|n:
curl -v -T demo.img http://127.0.0.1:54323/images/demo
curl -v -T demo.img "http://127.0.0.1:54323/images/demo?flush=y"
- PUT ranged (NBD backend only). Content-Range: bytes start-end/* or bytes start-end/size
(server uses only start; length from Content-Length). Optional ?flush=y|n:
curl -v -X PUT -H "Content-Range: bytes 0-1048575/*" -H "Content-Length: 1048576" \
--data-binary @chunk.bin http://127.0.0.1:54323/images/demo
curl -v -X PUT -H "Content-Range: bytes 1048576-2097151/52428800" -H "Content-Length: 1048576" \
--data-binary @chunk2.bin "http://127.0.0.1:54323/images/demo?flush=n"
- GET extents (zero/hole extents from NBD base:allocation):
curl -s http://127.0.0.1:54323/images/demo/extents | jq .
- GET extents with dirty and zero (requires export_bitmap in config):
curl -s "http://127.0.0.1:54323/images/demo/extents?context=dirty" | jq .
- POST flush:
curl -s -X POST http://127.0.0.1:54323/images/demo/flush | jq .
- PATCH zero (zero a byte range; application/json body):
curl -k -X PATCH \
-H "Content-Type: application/json" \
--data-binary '{"op": "zero", "offset": 4096, "size": 8192}' \
http://127.0.0.1:54323/images/demo
Zero at offset 1 GiB, 4096 bytes, no flush:
curl -k -X PATCH \
-H "Content-Type: application/json" \
--data-binary '{"op": "zero", "offset": 1073741824, "size": 4096}' \
http://127.0.0.1:54323/images/demo
Zero entire disk and flush:
curl -k -X PATCH \
-H "Content-Type: application/json" \
--data-binary '{"op": "zero", "size": 107374182400, "flush": true}' \
http://127.0.0.1:54323/images/demo
- PATCH flush (flush data to storage; operates on entire image):
curl -k -X PATCH \
-H "Content-Type: application/json" \
--data-binary '{"op": "flush"}' \
http://127.0.0.1:54323/images/demo
- PATCH range (write binary body at byte range; Range + Content-Length required):
curl -v -X PATCH -H "Range: bytes=0-1048575" --data-binary @chunk.bin \
http://127.0.0.1:54323/images/demo
"""
import argparse
import json
import logging
import os
import re
import socket
import threading
import time
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
try:
    from http.server import ThreadingHTTPServer
except ImportError:
    # Python 3.6: ThreadingHTTPServer was added in 3.7, so recreate it from
    # the same building blocks (ThreadingMixIn + HTTPServer).
    class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
        pass
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import parse_qs
import nbd
# Streaming granularity for GET/PUT body copies.
CHUNK_SIZE = 256 * 1024  # 256 KiB
# NBD base:allocation flags (hole=1, zero=2; hole|zero=3)
_NBD_STATE_HOLE = 1
_NBD_STATE_ZERO = 2
# NBD qemu:dirty-bitmap flags (dirty=1)
_NBD_STATE_DIRTY = 1
# Concurrency limits across ALL images.
MAX_PARALLEL_READS = 8
MAX_PARALLEL_WRITES = 1
_READ_SEM = threading.Semaphore(MAX_PARALLEL_READS)
_WRITE_SEM = threading.Semaphore(MAX_PARALLEL_WRITES)
# In-memory per-image lock: single lock gates both read and write.
_IMAGE_LOCKS: Dict[str, threading.Lock] = {}
_IMAGE_LOCKS_GUARD = threading.Lock()
# Dynamic image_id(transferId) -> backend mapping:
# CloudStack writes a JSON file at /tmp/imagetransfer/<transferId> with:
# - NBD backend: {"backend": "nbd", "socket": "/tmp/imagetransfer/<id>.sock", "export": "vda", "export_bitmap": "..."}
# - File backend: {"backend": "file", "file": "/path/to/image.qcow2"}
#
# This server reads that file on-demand.
_CFG_DIR = "/tmp/imagetransfer"
# Parsed-config cache keyed by transfer id; each value is (mtime, cfg) so a
# rewritten config file invalidates the cached entry (see _load_image_cfg).
_CFG_CACHE: Dict[str, Tuple[float, Dict[str, Any]]] = {}
_CFG_CACHE_GUARD = threading.Lock()
def _json_bytes(obj: Any) -> bytes:
return json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def _merge_dirty_zero_extents(
allocation_extents: List[Tuple[int, int, bool]],
dirty_extents: List[Tuple[int, int, bool]],
size: int,
) -> List[Dict[str, Any]]:
"""
Merge allocation (start, length, zero) and dirty (start, length, dirty) extents
into a single list of {start, length, dirty, zero} with unified boundaries.
"""
boundaries: Set[int] = {0, size}
for start, length, _ in allocation_extents:
boundaries.add(start)
boundaries.add(start + length)
for start, length, _ in dirty_extents:
boundaries.add(start)
boundaries.add(start + length)
sorted_boundaries = sorted(boundaries)
def lookup(
extents: List[Tuple[int, int, bool]], offset: int, default: bool
) -> bool:
for start, length, flag in extents:
if start <= offset < start + length:
return flag
return default
result: List[Dict[str, Any]] = []
for i in range(len(sorted_boundaries) - 1):
a, b = sorted_boundaries[i], sorted_boundaries[i + 1]
if a >= b:
continue
result.append(
{
"start": a,
"length": b - a,
"dirty": lookup(dirty_extents, a, False),
"zero": lookup(allocation_extents, a, False),
}
)
return result
def _is_fallback_dirty_response(extents: List[Dict[str, Any]]) -> bool:
"""True if extents is the single-extent fallback (dirty=false, zero=false)."""
return (
len(extents) == 1
and extents[0].get("dirty") is False
and extents[0].get("zero") is False
)
def _get_image_lock(image_id: str) -> threading.Lock:
    """Return the per-image lock, creating it on first use (guarded by _IMAGE_LOCKS_GUARD)."""
    with _IMAGE_LOCKS_GUARD:
        return _IMAGE_LOCKS.setdefault(image_id, threading.Lock())
def _now_s() -> float:
return time.monotonic()
def _safe_transfer_id(image_id: str) -> Optional[str]:
"""
Only allow a single filename component to avoid path traversal.
We intentionally keep validation simple: reject anything containing '/' or '\\'.
"""
if not image_id:
return None
if image_id != os.path.basename(image_id):
return None
if "/" in image_id or "\\" in image_id:
return None
if image_id in (".", ".."):
return None
return image_id
def _load_image_cfg(image_id: str) -> Optional[Dict[str, Any]]:
    """
    Load and validate the backend config for *image_id* from _CFG_DIR.

    Returns a normalized dict — {"backend": "file", "file": ...} or
    {"backend": "nbd", "socket": ..., "export": ..., "export_bitmap": ...} —
    or None when the id is unsafe, the file is missing/unreadable, or the
    JSON fails validation. Parsed configs are cached keyed on the file's
    mtime, so an unchanged file is not re-read.
    """
    safe_id = _safe_transfer_id(image_id)
    if safe_id is None:
        return None
    cfg_path = os.path.join(_CFG_DIR, safe_id)
    try:
        st = os.stat(cfg_path)
    except FileNotFoundError:
        # Unknown transfer id: not an error worth logging.
        return None
    except OSError as e:
        logging.error("cfg stat failed image_id=%s err=%r", image_id, e)
        return None
    with _CFG_CACHE_GUARD:
        cached = _CFG_CACHE.get(safe_id)
        if cached is not None:
            cached_mtime, cached_cfg = cached
            # Use cached config if the file hasn't changed.
            if float(st.st_mtime) == float(cached_mtime):
                return cached_cfg
    try:
        with open(cfg_path, "rb") as f:
            # Reads at most 4 KiB; configs are expected to be tiny JSON
            # documents (larger files would fail the parse below).
            raw = f.read(4096)
    except OSError as e:
        logging.error("cfg read failed image_id=%s err=%r", image_id, e)
        return None
    try:
        obj = json.loads(raw.decode("utf-8"))
    except Exception as e:
        logging.error("cfg parse failed image_id=%s err=%r", image_id, e)
        return None
    if not isinstance(obj, dict):
        logging.error("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__)
        return None
    backend = obj.get("backend")
    if backend is None:
        # Default to the NBD backend when unspecified.
        backend = "nbd"
    if not isinstance(backend, str):
        logging.error("cfg invalid backend type image_id=%s", image_id)
        return None
    backend = backend.lower()
    if backend not in ("nbd", "file"):
        logging.error("cfg unsupported backend image_id=%s backend=%s", image_id, backend)
        return None
    if backend == "file":
        file_path = obj.get("file")
        if not isinstance(file_path, str) or not file_path.strip():
            logging.error("cfg missing/invalid file path for file backend image_id=%s", image_id)
            return None
        cfg = {"backend": "file", "file": file_path.strip()}
    else:
        socket_path = obj.get("socket")
        export = obj.get("export")
        export_bitmap = obj.get("export_bitmap")
        if not isinstance(socket_path, str) or not socket_path.strip():
            logging.error("cfg missing/invalid socket path for nbd backend image_id=%s", image_id)
            return None
        socket_path = socket_path.strip()
        # export is optional; when present it must be a non-empty string.
        if export is not None and (not isinstance(export, str) or not export):
            logging.error("cfg missing/invalid export image_id=%s", image_id)
            return None
        # NOTE(review): export_bitmap is passed through unvalidated — presumably
        # a string or None; confirm against the writer (CloudStack agent).
        cfg = {
            "backend": "nbd",
            "socket": socket_path,
            "export": export,
            "export_bitmap": export_bitmap,
        }
    with _CFG_CACHE_GUARD:
        _CFG_CACHE[safe_id] = (float(st.st_mtime), cfg)
    return cfg
class _NbdConn:
    """
    Small helper to connect to NBD over a Unix socket.
    Opens a fresh handle per request, per POC requirements.
    """
    def __init__(
        self,
        socket_path: str,
        export: Optional[str],
        need_block_status: bool = False,
        extra_meta_contexts: Optional[List[str]] = None,
    ):
        # Connect the Unix socket ourselves, then hand it to libnbd
        # (_connect_existing_socket) — no qemu-nbd subprocess involved.
        self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._sock.connect(socket_path)
        self._nbd = nbd.NBD()
        # Select export name if supported/needed.
        if export and hasattr(self._nbd, "set_export_name"):
            self._nbd.set_export_name(export)
        # Request meta contexts before connect (for block status / dirty bitmap).
        if need_block_status and hasattr(self._nbd, "add_meta_context"):
            for ctx in ["base:allocation"] + (extra_meta_contexts or []):
                try:
                    self._nbd.add_meta_context(ctx)
                except Exception as e:
                    # Non-fatal: callers fall back to single-extent results.
                    logging.warning("add_meta_context %r failed: %r", ctx, e)
        self._connect_existing_socket(self._sock)

    def _connect_existing_socket(self, sock: socket.socket) -> None:
        # Requirement: attach libnbd to an existing socket / FD (no qemu-nbd).
        # libnbd python API varies slightly by version, so try common options.
        last_err: Optional[BaseException] = None
        if hasattr(self._nbd, "connect_socket"):
            try:
                self._nbd.connect_socket(sock)
                return
            except Exception as e:  # pragma: no cover (depends on binding)
                last_err = e
                # Some bindings take a raw fd instead of a socket object.
                try:
                    self._nbd.connect_socket(sock.fileno())
                    return
                except Exception as e2:  # pragma: no cover
                    last_err = e2
        if hasattr(self._nbd, "connect_fd"):
            try:
                self._nbd.connect_fd(sock.fileno())
                return
            except Exception as e:  # pragma: no cover
                last_err = e
        raise RuntimeError(
            "Unable to connect libnbd using existing socket/fd; "
            f"binding missing connect_socket/connect_fd or call failed: {last_err!r}"
        )

    def size(self) -> int:
        """Export size in bytes, as reported by the NBD handshake."""
        return int(self._nbd.get_size())

    def get_capabilities(self) -> Dict[str, bool]:
        """
        Query NBD export capabilities (read_only, can_flush, can_zero) from the
        server handshake. Returns dict with keys read_only, can_flush, can_zero.
        Uses getattr for binding name variations (is_read_only/get_read_only, etc.).
        """
        # Conservative defaults: assume read-only, no flush, no zero.
        out: Dict[str, bool] = {
            "read_only": True,
            "can_flush": False,
            "can_zero": False,
        }
        for name, keys in [
            ("read_only", ("is_read_only", "get_read_only")),
            ("can_flush", ("can_flush", "get_can_flush")),
            ("can_zero", ("can_zero", "get_can_zero")),
        ]:
            for attr in keys:
                if hasattr(self._nbd, attr):
                    try:
                        val = getattr(self._nbd, attr)()
                        out[name] = bool(val)
                    except Exception:
                        # Keep the conservative default on failure.
                        pass
                    # First existing accessor wins; don't try the alias.
                    break
        return out

    def pread(self, length: int, offset: int) -> bytes:
        # Expected signature: pread(length, offset)
        try:
            return self._nbd.pread(length, offset)
        except TypeError:  # pragma: no cover (binding differences)
            return self._nbd.pread(offset, length)

    def pwrite(self, buf: bytes, offset: int) -> None:
        # Expected signature: pwrite(buf, offset)
        try:
            self._nbd.pwrite(buf, offset)
        except TypeError:  # pragma: no cover (binding differences)
            self._nbd.pwrite(offset, buf)

    def pzero(self, offset: int, size: int) -> None:
        """
        Zero a byte range. Uses NBD WRITE_ZEROES when available (efficient/punch hole),
        otherwise falls back to writing zero bytes via pwrite.
        """
        if size <= 0:
            return
        # Try libnbd pwrite_zeros / zero; argument order varies by binding.
        for name in ("pwrite_zeros", "zero"):
            if not hasattr(self._nbd, name):
                continue
            fn = getattr(self._nbd, name)
            try:
                fn(size, offset)
                return
            except TypeError:
                try:
                    fn(offset, size)
                    return
                except TypeError:
                    pass
        # Fallback: write zeros in chunks.
        remaining = size
        pos = offset
        zero_buf = b"\x00" * min(CHUNK_SIZE, size)
        while remaining > 0:
            chunk = min(len(zero_buf), remaining)
            self.pwrite(zero_buf[:chunk], pos)
            pos += chunk
            remaining -= chunk

    def flush(self) -> None:
        """Flush the export; raises if the binding exposes neither flush nor fsync."""
        if hasattr(self._nbd, "flush"):
            self._nbd.flush()
            return
        if hasattr(self._nbd, "fsync"):
            self._nbd.fsync()
            return
        raise RuntimeError("libnbd binding has no flush/fsync method")

    def get_allocation_extents(self) -> List[Dict[str, Any]]:
        """
        Query base:allocation and return all extents (allocated and hole/zero)
        as [{"start": ..., "length": ..., "zero": bool}, ...].
        Fallback when block status unavailable: one extent with zero=False.
        """
        size = self.size()
        if size == 0:
            return []
        if not hasattr(self._nbd, "block_status") and not hasattr(
            self._nbd, "block_status_64"
        ):
            return [{"start": 0, "length": size, "zero": False}]
        if hasattr(self._nbd, "can_meta_context") and not self._nbd.can_meta_context(
            "base:allocation"
        ):
            return [{"start": 0, "length": size, "zero": False}]
        allocation_extents: List[Dict[str, Any]] = []
        # Query in 64 MiB windows to bound per-call work.
        chunk = min(size, 64 * 1024 * 1024)
        offset = 0

        def extent_cb(*args: Any, **kwargs: Any) -> int:
            # Callback signature: (metacontext, offset, entries, ...) where
            # entries is a flat [length, flags, length, flags, ...] list.
            if len(args) < 3:
                return 0
            metacontext, off, entries = args[0], args[1], args[2]
            if metacontext != "base:allocation" or entries is None:
                return 0
            current = off
            try:
                flat = list(entries)
                for i in range(0, len(flat), 2):
                    if i + 1 >= len(flat):
                        break
                    length = int(flat[i])
                    flags = int(flat[i + 1])
                    # Either hole or zero flag means "reads as zeros".
                    zero = (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0
                    allocation_extents.append(
                        {"start": current, "length": length, "zero": zero}
                    )
                    current += length
            except (TypeError, ValueError, IndexError):
                pass
            return 0

        # Prefer the 64-bit variant when the binding has it.
        block_status_fn = getattr(
            self._nbd, "block_status_64", getattr(self._nbd, "block_status", None)
        )
        if block_status_fn is None:
            return [{"start": 0, "length": size, "zero": False}]
        try:
            while offset < size:
                count = min(chunk, size - offset)
                try:
                    block_status_fn(count, offset, extent_cb)
                except TypeError:
                    # Argument order varies by binding.
                    block_status_fn(offset, count, extent_cb)
                offset += count
        except Exception as e:
            logging.warning("get_allocation_extents block_status failed: %r", e)
            return [{"start": 0, "length": size, "zero": False}]
        if not allocation_extents:
            return [{"start": 0, "length": size, "zero": False}]
        return allocation_extents

    def get_extents_dirty_and_zero(
        self, dirty_bitmap_context: str
    ) -> List[Dict[str, Any]]:
        """
        Query block status for base:allocation and qemu:dirty-bitmap:<context>,
        merge boundaries, and return extents with dirty and zero flags.
        Format: [{"start": ..., "length": ..., "dirty": bool, "zero": bool}, ...].
        """
        size = self.size()
        if size == 0:
            return []
        if not hasattr(self._nbd, "block_status") and not hasattr(
            self._nbd, "block_status_64"
        ):
            return self._fallback_dirty_zero_extents(size)
        if hasattr(self._nbd, "can_meta_context"):
            if not self._nbd.can_meta_context("base:allocation"):
                return self._fallback_dirty_zero_extents(size)
            if not self._nbd.can_meta_context(dirty_bitmap_context):
                logging.warning(
                    "dirty bitmap context %r not negotiated", dirty_bitmap_context
                )
                return self._fallback_dirty_zero_extents(size)
        allocation_extents: List[Tuple[int, int, bool]] = []  # (start, length, zero)
        dirty_extents: List[Tuple[int, int, bool]] = []  # (start, length, dirty)
        chunk = min(size, 64 * 1024 * 1024)
        offset = 0

        def extent_cb(*args: Any, **kwargs: Any) -> int:
            # Shared callback for both meta contexts; dispatches on metacontext.
            if len(args) < 3:
                return 0
            metacontext, off, entries = args[0], args[1], args[2]
            if entries is None or not hasattr(entries, "__iter__"):
                return 0
            current = off
            try:
                flat = list(entries)
                for i in range(0, len(flat), 2):
                    if i + 1 >= len(flat):
                        break
                    length = int(flat[i])
                    flags = int(flat[i + 1])
                    if metacontext == "base:allocation":
                        zero = (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0
                        allocation_extents.append((current, length, zero))
                    elif metacontext == dirty_bitmap_context:
                        dirty = (flags & _NBD_STATE_DIRTY) != 0
                        dirty_extents.append((current, length, dirty))
                    current += length
            except (TypeError, ValueError, IndexError):
                pass
            return 0

        block_status_fn = getattr(
            self._nbd, "block_status_64", getattr(self._nbd, "block_status", None)
        )
        if block_status_fn is None:
            return self._fallback_dirty_zero_extents(size)
        try:
            while offset < size:
                count = min(chunk, size - offset)
                try:
                    block_status_fn(count, offset, extent_cb)
                except TypeError:
                    block_status_fn(offset, count, extent_cb)
                offset += count
        except Exception as e:
            logging.warning("get_extents_dirty_and_zero block_status failed: %r", e)
            return self._fallback_dirty_zero_extents(size)
        return _merge_dirty_zero_extents(allocation_extents, dirty_extents, size)

    def _fallback_dirty_zero_extents(self, size: int) -> List[Dict[str, Any]]:
        """One extent: whole image, dirty=false, zero=false when bitmap unavailable."""
        return [{"start": 0, "length": size, "dirty": False, "zero": False}]

    def close(self) -> None:
        # Best-effort; bindings may differ.
        try:
            if hasattr(self._nbd, "shutdown"):
                self._nbd.shutdown()
        except Exception:
            pass
        try:
            if hasattr(self._nbd, "close"):
                self._nbd.close()
        except Exception:
            pass
        try:
            self._sock.close()
        except Exception:
            pass

    def __enter__(self) -> "_NbdConn":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()
class Handler(BaseHTTPRequestHandler):
    # Reported in the Server response header.
    server_version = "imageio-poc/0.1"
    # NOTE(review): BaseHTTPRequestHandler has no 'server_protocol' attribute;
    # the attribute that switches responses to HTTP/1.1 is 'protocol_version'.
    # As written this assignment has no effect and responses go out as the
    # default HTTP/1.0. Confirm intent before renaming: HTTP/1.1 keep-alive
    # would require every error path to drain the request body first.
    server_protocol = "HTTP/1.1"
    # Accept both "bytes start-end/*" and "bytes start-end/size"; we only use start.
    _CONTENT_RANGE_RE = re.compile(r"^bytes\s+(\d+)-(\d+)/(?:\*|\d+)$")

    # Keep BaseHTTPRequestHandler from printing noisy default logs
    def log_message(self, fmt: str, *args: Any) -> None:
        logging.info("%s - - %s", self.address_string(), fmt % args)
def _send_imageio_headers(
self, allowed_methods: Optional[str] = None
) -> None:
# Include these headers for compatibility with the imageio contract.
if allowed_methods is None:
allowed_methods = "GET, PUT, OPTIONS"
self.send_header("Access-Control-Allow-Methods", allowed_methods)
self.send_header("Accept-Ranges", "bytes")
def _send_json(
self,
status: int,
obj: Any,
allowed_methods: Optional[str] = None,
) -> None:
body = _json_bytes(obj)
self.send_response(status)
self._send_imageio_headers(allowed_methods)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
try:
self.wfile.write(body)
except BrokenPipeError:
pass
def _send_error_json(self, status: int, message: str) -> None:
self._send_json(status, {"error": message})
def _send_range_not_satisfiable(self, size: int) -> None:
# RFC 7233: reply with Content-Range: bytes */<size>
self.send_response(HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
self._send_imageio_headers()
self.send_header("Content-Type", "application/json")
self.send_header("Content-Range", f"bytes */{size}")
body = _json_bytes({"error": "range not satisfiable"})
self.send_header("Content-Length", str(len(body)))
self.end_headers()
try:
self.wfile.write(body)
except BrokenPipeError:
pass
def _parse_single_range(self, range_header: str, size: int) -> Tuple[int, int]:
"""
Parse a single HTTP byte range (RFC 7233) and return (start, end_inclusive).
Supported:
- Range: bytes=START-END
- Range: bytes=START-
- Range: bytes=-SUFFIX
Raises ValueError for invalid headers. Caller handles 416 vs 400.
"""
if size < 0:
raise ValueError("invalid size")
if not range_header:
raise ValueError("empty Range")
if "," in range_header:
raise ValueError("multiple ranges not supported")
prefix = "bytes="
if not range_header.startswith(prefix):
raise ValueError("only bytes ranges supported")
spec = range_header[len(prefix) :].strip()
if "-" not in spec:
raise ValueError("invalid bytes range")
left, right = spec.split("-", 1)
left = left.strip()
right = right.strip()
if left == "":
# Suffix range: last N bytes.
if right == "":
raise ValueError("invalid suffix range")
try:
suffix_len = int(right, 10)
except ValueError as e:
raise ValueError("invalid suffix length") from e
if suffix_len <= 0:
raise ValueError("invalid suffix length")
if size == 0:
# Nothing to serve
raise ValueError("unsatisfiable")
if suffix_len >= size:
return 0, size - 1
return size - suffix_len, size - 1
# START is present
try:
start = int(left, 10)
except ValueError as e:
raise ValueError("invalid range start") from e
if start < 0:
raise ValueError("invalid range start")
if start >= size:
raise ValueError("unsatisfiable")
if right == "":
# START-
return start, size - 1
try:
end = int(right, 10)
except ValueError as e:
raise ValueError("invalid range end") from e
if end < start:
raise ValueError("unsatisfiable")
if end >= size:
end = size - 1
return start, end
def _parse_route(self) -> Tuple[Optional[str], Optional[str]]:
# Returns (image_id, tail) where tail is:
# None => /images/{id}
# "extents" => /images/{id}/extents
# "flush" => /images/{id}/flush
path = self.path.split("?", 1)[0]
parts = [p for p in path.split("/") if p]
if len(parts) < 2 or parts[0] != "images":
return None, None
image_id = parts[1]
tail = parts[2] if len(parts) >= 3 else None
if len(parts) > 3:
return None, None
return image_id, tail
def _parse_content_range(self, header: str) -> Tuple[int, int]:
"""
Parse Content-Range header "bytes start-end/*" or "bytes start-end/size"
and return (start, end_inclusive). Raises ValueError on invalid input.
"""
if not header:
raise ValueError("empty Content-Range")
m = self._CONTENT_RANGE_RE.match(header.strip())
if not m:
raise ValueError("invalid Content-Range")
start_s, end_s = m.groups()
start = int(start_s, 10)
end = int(end_s, 10)
if start < 0 or end < start:
raise ValueError("invalid Content-Range range")
return start, end
def _parse_query(self) -> Dict[str, List[str]]:
"""Parse query string from self.path into a dict of name -> list of values."""
if "?" not in self.path:
return {}
query = self.path.split("?", 1)[1]
return parse_qs(query, keep_blank_values=True)
    def _image_cfg(self, image_id: str) -> Optional[Dict[str, Any]]:
        # Thin delegate so handlers don't call the module function directly.
        return _load_image_cfg(image_id)
def _is_file_backend(self, cfg: Dict[str, Any]) -> bool:
return cfg.get("backend") == "file"
    def do_OPTIONS(self) -> None:
        """
        OPTIONS /images/{id}: report allowed methods, advertised features and
        reader/writer limits. For NBD backends the capabilities are probed
        live from the NBD handshake; on failure the config's read_only flag
        is used as a fallback.
        """
        image_id, tail = self._parse_route()
        if image_id is None or tail is not None:
            self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
            return
        cfg = self._image_cfg(image_id)
        if cfg is None:
            self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
            return
        if self._is_file_backend(cfg):
            # File backend: full PUT only, no range writes; GET with ranges allowed; flush supported.
            allowed_methods = "GET, PUT, POST, OPTIONS"
            features = ["flush"]
            max_writers = MAX_PARALLEL_WRITES
            response = {
                "unix_socket": None,
                "features": features,
                "max_readers": MAX_PARALLEL_READS,
                "max_writers": max_writers,
            }
            self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods)
            return
        # Query NBD backend for capabilities (like nbdinfo); fall back to config.
        read_only = True
        can_flush = False
        can_zero = False
        try:
            with _NbdConn(
                cfg["socket"],
                cfg.get("export"),
            ) as conn:
                caps = conn.get_capabilities()
                read_only = caps["read_only"]
                can_flush = caps["can_flush"]
                can_zero = caps["can_zero"]
        except Exception as e:
            logging.warning("OPTIONS: could not query NBD capabilities: %r", e)
            # Probe failed: trust the config; writable implies flush+zero here.
            read_only = bool(cfg.get("read_only"))
            if not read_only:
                can_flush = True
                can_zero = True
        # Report options for this image from NBD: read-only => no PUT; only advertise supported features.
        if read_only:
            allowed_methods = "GET, OPTIONS"
            features = ["extents"]
            max_writers = 0
        else:
            # PATCH: JSON (zero/flush) and Range+binary (write byte range).
            allowed_methods = "GET, PUT, PATCH, OPTIONS"
            features = ["extents"]
            if can_zero:
                features.append("zero")
            if can_flush:
                features.append("flush")
            max_writers = MAX_PARALLEL_WRITES if not read_only else 0
        response = {
            "unix_socket": None,  # Not used in this implementation
            "features": features,
            "max_readers": MAX_PARALLEL_READS,
            "max_writers": max_writers,
        }
        self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods)
def do_GET(self) -> None:
image_id, tail = self._parse_route()
if image_id is None:
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
return
cfg = self._image_cfg(image_id)
if cfg is None:
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
return
if tail == "extents":
if self._is_file_backend(cfg):
self._send_error_json(
HTTPStatus.BAD_REQUEST, "extents not supported for file backend"
)
return
query = self._parse_query()
context = (query.get("context") or [None])[0]
self._handle_get_extents(image_id, cfg, context=context)
return
if tail is not None:
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
return
range_header = self.headers.get("Range")
self._handle_get_image(image_id, cfg, range_header)
def do_PUT(self) -> None:
image_id, tail = self._parse_route()
if image_id is None or tail is not None:
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
return
cfg = self._image_cfg(image_id)
if cfg is None:
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
return
# For PUT we only support Content-Range (for NBD backend); Range is rejected.
if self.headers.get("Range") is not None:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"Range header not supported for PUT; use Content-Range or PATCH",
)
return
content_length_hdr = self.headers.get("Content-Length")
if content_length_hdr is None:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
return
try:
content_length = int(content_length_hdr)
except ValueError:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
if content_length < 0:
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
return
# Optional flush=y|n query parameter.
query = self._parse_query()
flush_param = (query.get("flush") or ["n"])[0].lower()
flush = flush_param in ("y", "yes", "true", "1")
content_range_hdr = self.headers.get("Content-Range")
if content_range_hdr is not None:
if self._is_file_backend(cfg):
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"Content-Range PUT not supported for file backend; use full PUT",
)
return
self._handle_put_range(image_id, cfg, content_range_hdr, content_length, flush)
return
# No Content-Range: full image PUT.
self._handle_put_image(image_id, cfg, content_length, flush)
def do_POST(self) -> None:
image_id, tail = self._parse_route()
if image_id is None:
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
return
cfg = self._image_cfg(image_id)
if cfg is None:
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
return
if tail == "flush":
self._handle_post_flush(image_id, cfg)
return
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
    def do_PATCH(self) -> None:
        """
        PATCH /images/{id} (NBD backend only). Two request forms:

        - Range header + non-JSON body: write the body bytes at that range.
        - application/json body: {"op": "zero"|"flush", ...} where "zero"
          takes "size" (required), "offset" (default 0) and "flush" (bool).
        """
        image_id, tail = self._parse_route()
        if image_id is None or tail is not None:
            self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
            return
        cfg = self._image_cfg(image_id)
        if cfg is None:
            self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
            return
        if self._is_file_backend(cfg):
            self._send_error_json(
                HTTPStatus.BAD_REQUEST,
                "range writes and PATCH not supported for file backend; use PUT for full upload",
            )
            return
        # Strip any "; charset=..." parameter before comparing the media type.
        content_type = self.headers.get("Content-Type", "").split(";")[0].strip().lower()
        range_header = self.headers.get("Range")
        # Binary PATCH: Range + body writes bytes at that range (e.g. curl -X PATCH -H "Range: bytes=0-1048576" --data-binary @chunk.bin).
        if range_header is not None and content_type != "application/json":
            content_length_hdr = self.headers.get("Content-Length")
            if content_length_hdr is None:
                self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
                return
            try:
                content_length = int(content_length_hdr)
            except ValueError:
                self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
                return
            if content_length <= 0:
                self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length must be positive")
                return
            self._handle_patch_range(image_id, cfg, range_header, content_length)
            return
        # JSON PATCH: application/json with op (zero, flush).
        if content_type != "application/json":
            self._send_error_json(
                HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
                "PATCH requires Content-Type: application/json (for zero/flush) or Range with binary body",
            )
            return
        content_length_hdr = self.headers.get("Content-Length")
        if content_length_hdr is None:
            self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
            return
        try:
            content_length = int(content_length_hdr)
        except ValueError:
            self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
            return
        # JSON control bodies are capped at 64 KiB.
        if content_length <= 0 or content_length > 64 * 1024:
            self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
            return
        body = self.rfile.read(content_length)
        if len(body) != content_length:
            self._send_error_json(HTTPStatus.BAD_REQUEST, "request body truncated")
            return
        try:
            payload = json.loads(body.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            self._send_error_json(HTTPStatus.BAD_REQUEST, f"invalid JSON: {e}")
            return
        if not isinstance(payload, dict):
            self._send_error_json(HTTPStatus.BAD_REQUEST, "body must be a JSON object")
            return
        op = payload.get("op")
        if op == "flush":
            # Flush entire image; offset and size are ignored (per spec).
            self._handle_post_flush(image_id, cfg)
            return
        if op != "zero":
            self._send_error_json(
                HTTPStatus.BAD_REQUEST,
                "unsupported op; only \"zero\" and \"flush\" are supported",
            )
            return
        try:
            size = int(payload.get("size"))
        except (TypeError, ValueError):
            self._send_error_json(HTTPStatus.BAD_REQUEST, "missing or invalid \"size\"")
            return
        if size <= 0:
            self._send_error_json(HTTPStatus.BAD_REQUEST, "\"size\" must be positive")
            return
        offset = payload.get("offset")
        if offset is None:
            # Zero from the start of the image when no offset is given.
            offset = 0
        else:
            try:
                offset = int(offset)
            except (TypeError, ValueError):
                self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid \"offset\"")
                return
        if offset < 0:
            self._send_error_json(HTTPStatus.BAD_REQUEST, "\"offset\" must be non-negative")
            return
        flush = bool(payload.get("flush", False))
        self._handle_patch_zero(image_id, cfg, offset=offset, size=size, flush=flush)
def _handle_get_image(
self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str]
) -> None:
if not _READ_SEM.acquire(blocking=False):
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel reads")
return
start = _now_s()
bytes_sent = 0
try:
logging.info("GET start image_id=%s range=%s", image_id, range_header or "-")
if self._is_file_backend(cfg):
file_path = cfg["file"]
try:
size = os.path.getsize(file_path)
except OSError as e:
logging.error("GET file size error image_id=%s path=%s err=%r", image_id, file_path, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "failed to access file")
return
start_off = 0
end_off_incl = size - 1 if size > 0 else -1
status = HTTPStatus.OK
content_length = size
if range_header is not None:
try:
start_off, end_off_incl = self._parse_single_range(range_header, size)
except ValueError as e:
if str(e) == "unsatisfiable":
self._send_range_not_satisfiable(size)
return
if "unsatisfiable" in str(e):
self._send_range_not_satisfiable(size)
return
self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid Range header")
return
status = HTTPStatus.PARTIAL_CONTENT
content_length = (end_off_incl - start_off) + 1
self.send_response(status)
self._send_imageio_headers()
self.send_header("Content-Type", "application/octet-stream")
self.send_header("Content-Length", str(content_length))
if status == HTTPStatus.PARTIAL_CONTENT:
self.send_header("Content-Range", f"bytes {start_off}-{end_off_incl}/{size}")
self.end_headers()
offset = start_off
end_excl = end_off_incl + 1
with open(file_path, "rb") as f:
f.seek(offset)
while offset < end_excl:
to_read = min(CHUNK_SIZE, end_excl - offset)
data = f.read(to_read)
if not data:
break
try:
self.wfile.write(data)
except BrokenPipeError:
logging.info("GET client disconnected image_id=%s at=%d", image_id, offset)
break
offset += len(data)
bytes_sent += len(data)
else:
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
size = conn.size()
start_off = 0
end_off_incl = size - 1 if size > 0 else -1
status = HTTPStatus.OK
content_length = size
if range_header is not None:
try:
start_off, end_off_incl = self._parse_single_range(range_header, size)
except ValueError as e:
if str(e) == "unsatisfiable":
self._send_range_not_satisfiable(size)
return
if "unsatisfiable" in str(e):
self._send_range_not_satisfiable(size)
return
self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid Range header")
return
status = HTTPStatus.PARTIAL_CONTENT
content_length = (end_off_incl - start_off) + 1
self.send_response(status)
self._send_imageio_headers()
self.send_header("Content-Type", "application/octet-stream")
self.send_header("Content-Length", str(content_length))
if status == HTTPStatus.PARTIAL_CONTENT:
self.send_header("Content-Range", f"bytes {start_off}-{end_off_incl}/{size}")
self.end_headers()
offset = start_off
end_excl = end_off_incl + 1
while offset < end_excl:
to_read = min(CHUNK_SIZE, end_excl - offset)
data = conn.pread(to_read, offset)
if not data:
raise RuntimeError("backend returned empty read")
try:
self.wfile.write(data)
except BrokenPipeError:
logging.info("GET client disconnected image_id=%s at=%d", image_id, offset)
break
offset += len(data)
bytes_sent += len(data)
except Exception as e:
# If headers already sent, we can't return JSON reliably; just log.
logging.error("GET error image_id=%s err=%r", image_id, e)
try:
if not self.wfile.closed:
self.close_connection = True
except Exception:
pass
finally:
_READ_SEM.release()
dur = _now_s() - start
logging.info(
"GET end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_sent, dur
)
def _handle_put_image(
self, image_id: str, cfg: Dict[str, Any], content_length: int, flush: bool
) -> None:
lock = _get_image_lock(image_id)
# Block until we can write this image
lock.acquire()
if not _WRITE_SEM.acquire(blocking=False):
lock.release()
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
start = _now_s()
bytes_written = 0
try:
logging.info("PUT start image_id=%s content_length=%d", image_id, content_length)
if self._is_file_backend(cfg):
file_path = cfg["file"]
remaining = content_length
with open(file_path, "wb") as f:
while remaining > 0:
chunk = self.rfile.read(min(CHUNK_SIZE, remaining))
if not chunk:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
f"request body ended early at {bytes_written} bytes",
)
return
f.write(chunk)
bytes_written += len(chunk)
remaining -= len(chunk)
if flush:
f.flush()
os.fsync(f.fileno())
self._send_json(
HTTPStatus.OK,
{"ok": True, "bytes_written": bytes_written, "flushed": flush},
)
else:
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
offset = 0
remaining = content_length
while remaining > 0:
chunk = self.rfile.read(min(CHUNK_SIZE, remaining))
if not chunk:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
f"request body ended early at {offset} bytes",
)
return
conn.pwrite(chunk, offset)
offset += len(chunk)
remaining -= len(chunk)
bytes_written += len(chunk)
if flush:
conn.flush()
self._send_json(
HTTPStatus.OK,
{"ok": True, "bytes_written": bytes_written, "flushed": flush},
)
except Exception as e:
logging.error("PUT error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
_WRITE_SEM.release()
lock.release()
dur = _now_s() - start
logging.info(
"PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur
)
def _write_range_nbd(
self,
image_id: str,
cfg: Dict[str, Any],
start_off: int,
content_length: int,
) -> Tuple[int, bool]:
"""
Low-level helper: write request body to NBD backend starting at start_off.
The length is taken from Content-Length. Returns (bytes_written, ok).
If ok is False, an error response was already sent.
"""
bytes_written = 0
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
image_size = conn.size()
if start_off >= image_size:
self._send_range_not_satisfiable(image_size)
return 0, False
# Clamp to image size: we do not allow writes beyond end of image.
max_len = image_size - start_off
if content_length > max_len:
self._send_range_not_satisfiable(image_size)
return 0, False
offset = start_off
remaining = content_length
while remaining > 0:
chunk = self.rfile.read(min(CHUNK_SIZE, remaining))
if not chunk:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
f"request body ended early at {bytes_written} bytes",
)
return bytes_written, False
conn.pwrite(chunk, offset)
n = len(chunk)
offset += n
remaining -= n
bytes_written += n
return bytes_written, True
def _handle_get_extents(
self, image_id: str, cfg: Dict[str, Any], context: Optional[str] = None
) -> None:
# context=dirty: return extents with dirty and zero from base:allocation + bitmap.
# Otherwise: return zero/hole extents from base:allocation only.
lock = _get_image_lock(image_id)
if not lock.acquire(blocking=False):
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
return
start = _now_s()
try:
logging.info("EXTENTS start image_id=%s context=%s", image_id, context)
if context == "dirty":
export_bitmap = cfg.get("export_bitmap")
if not export_bitmap:
# Fallback: same structure as zero extents but dirty=true for all ranges
with _NbdConn(
cfg["socket"],
cfg.get("export"),
need_block_status=True,
) as conn:
allocation = conn.get_allocation_extents()
extents = [
{"start": e["start"], "length": e["length"], "dirty": True, "zero": e["zero"]}
for e in allocation
]
else:
dirty_bitmap_ctx = f"qemu:dirty-bitmap:{export_bitmap}"
extra_contexts: List[str] = [dirty_bitmap_ctx]
with _NbdConn(
cfg["socket"],
cfg.get("export"),
need_block_status=True,
extra_meta_contexts=extra_contexts,
) as conn:
extents = conn.get_extents_dirty_and_zero(dirty_bitmap_ctx)
# When bitmap not actually available, same fallback: zero structure + dirty=true
if _is_fallback_dirty_response(extents):
with _NbdConn(
cfg["socket"],
cfg.get("export"),
need_block_status=True,
) as conn:
allocation = conn.get_allocation_extents()
extents = [
{
"start": e["start"],
"length": e["length"],
"dirty": True,
"zero": e["zero"],
}
for e in allocation
]
else:
with _NbdConn(
cfg["socket"],
cfg.get("export"),
need_block_status=True,
) as conn:
extents = conn.get_allocation_extents()
self._send_json(HTTPStatus.OK, extents)
except Exception as e:
logging.error("EXTENTS error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
lock.release()
dur = _now_s() - start
logging.info("EXTENTS end image_id=%s duration_s=%.3f", image_id, dur)
def _handle_post_flush(self, image_id: str, cfg: Dict[str, Any]) -> None:
lock = _get_image_lock(image_id)
if not lock.acquire(blocking=False):
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
return
start = _now_s()
try:
logging.info("FLUSH start image_id=%s", image_id)
if self._is_file_backend(cfg):
file_path = cfg["file"]
with open(file_path, "rb") as f:
f.flush()
os.fsync(f.fileno())
self._send_json(HTTPStatus.OK, {"ok": True})
else:
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
conn.flush()
self._send_json(HTTPStatus.OK, {"ok": True})
except Exception as e:
logging.error("FLUSH error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
lock.release()
dur = _now_s() - start
logging.info("FLUSH end image_id=%s duration_s=%.3f", image_id, dur)
def _handle_patch_zero(
self,
image_id: str,
cfg: Dict[str, Any],
offset: int,
size: int,
flush: bool,
) -> None:
lock = _get_image_lock(image_id)
if not lock.acquire(blocking=False):
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
return
if not _WRITE_SEM.acquire(blocking=False):
lock.release()
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
start = _now_s()
try:
logging.info(
"PATCH zero start image_id=%s offset=%d size=%d flush=%s",
image_id, offset, size, flush,
)
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
image_size = conn.size()
if offset >= image_size:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
"offset must be less than image size",
)
return
zero_size = min(size, image_size - offset)
conn.pzero(offset, zero_size)
if flush:
conn.flush()
self._send_json(HTTPStatus.OK, {"ok": True})
except Exception as e:
logging.error("PATCH zero error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
_WRITE_SEM.release()
lock.release()
dur = _now_s() - start
logging.info("PATCH zero end image_id=%s duration_s=%.3f", image_id, dur)
def _handle_patch_range(
self,
image_id: str,
cfg: Dict[str, Any],
range_header: str,
content_length: int,
) -> None:
"""Write request body to the image at the byte range from Range header."""
lock = _get_image_lock(image_id)
if not lock.acquire(blocking=False):
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
return
if not _WRITE_SEM.acquire(blocking=False):
lock.release()
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
start = _now_s()
bytes_written = 0
try:
logging.info(
"PATCH range start image_id=%s range=%s content_length=%d",
image_id, range_header, content_length,
)
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
image_size = conn.size()
try:
start_off, end_inclusive = self._parse_single_range(
range_header, image_size
)
except ValueError as e:
if "unsatisfiable" in str(e).lower():
self._send_range_not_satisfiable(image_size)
else:
self._send_error_json(
HTTPStatus.BAD_REQUEST, f"invalid Range header: {e}"
)
return
expected_len = end_inclusive - start_off + 1
if content_length != expected_len:
self._send_error_json(
HTTPStatus.BAD_REQUEST,
f"Content-Length ({content_length}) must equal range length ({expected_len})",
)
return
bytes_written, ok = self._write_range_nbd(image_id, cfg, start_off, content_length)
if not ok:
return
self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written})
except Exception as e:
logging.error("PATCH range error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
_WRITE_SEM.release()
lock.release()
dur = _now_s() - start
logging.info(
"PATCH range end image_id=%s bytes=%d duration_s=%.3f",
image_id, bytes_written, dur,
)
def _handle_put_range(
self,
image_id: str,
cfg: Dict[str, Any],
content_range: str,
content_length: int,
flush: bool,
) -> None:
"""Handle PUT with Content-Range: bytes start-end/* for NBD backend."""
lock = _get_image_lock(image_id)
# Block until we can write this image.
lock.acquire()
if not _WRITE_SEM.acquire(blocking=False):
lock.release()
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
return
start = _now_s()
bytes_written = 0
try:
logging.info(
"PUT range start image_id=%s Content-Range=%s content_length=%d flush=%s",
image_id,
content_range,
content_length,
flush,
)
try:
start_off, end_inclusive = self._parse_content_range(content_range)
except ValueError as e:
self._send_error_json(
HTTPStatus.BAD_REQUEST, f"invalid Content-Range header: {e}"
)
return
# Per contract we only use the start byte from Content-Range;
# length comes from Content-Length.
bytes_written, ok = self._write_range_nbd(image_id, cfg, start_off, content_length)
if not ok:
return
if flush:
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
conn.flush()
self._send_json(
HTTPStatus.OK,
{"ok": True, "bytes_written": bytes_written, "flushed": flush},
)
except Exception as e:
logging.error("PUT range error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
_WRITE_SEM.release()
lock.release()
dur = _now_s() - start
logging.info(
"PUT range end image_id=%s bytes=%d duration_s=%.3f flush=%s",
image_id,
bytes_written,
dur,
flush,
)
def main() -> None:
    """Parse CLI options, configure logging, and serve HTTP forever."""
    parser = argparse.ArgumentParser(description="POC imageio-like HTTP server backed by NBD")
    parser.add_argument("--listen", default="127.0.0.1", help="Address to bind")
    parser.add_argument("--port", type=int, default=54323, help="Port to listen on")
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )
    server = ThreadingHTTPServer((args.listen, args.port), Handler)
    logging.info("listening on http://%s:%d", args.listen, args.port)
    logging.info("image configs are read from %s/<transferId>", _CFG_DIR)
    server.serve_forever()


if __name__ == "__main__":
    main()