diff --git a/systemvm/debian/opt/cloud/bin/image_server.py b/systemvm/debian/opt/cloud/bin/image_server.py index 7f3beb328db..a49b2ec605a 100644 --- a/systemvm/debian/opt/cloud/bin/image_server.py +++ b/systemvm/debian/opt/cloud/bin/image_server.py @@ -43,9 +43,12 @@ Example curl commands - PUT full image (Content-Length must equal export size exactly): curl -v -T demo.img http://127.0.0.1:54323/images/demo -- GET extents (POC-level; may return a single allocated extent): +- GET extents (zero/hole extents from NBD base:allocation): curl -s http://127.0.0.1:54323/images/demo/extents | jq . +- GET extents with dirty and zero (requires export_bitmap in config): + curl -s "http://127.0.0.1:54323/images/demo/extents?context=dirty" | jq . + - POST flush: curl -s -X POST http://127.0.0.1:54323/images/demo/flush | jq . """ @@ -61,11 +64,18 @@ import threading import time from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import parse_qs import nbd CHUNK_SIZE = 256 * 1024 # 256 KiB +# NBD base:allocation flags (hole=1, zero=2; hole|zero=3) +_NBD_STATE_HOLE = 1 +_NBD_STATE_ZERO = 2 +# NBD qemu:dirty-bitmap flags (dirty=1) +_NBD_STATE_DIRTY = 1 + # Concurrency limits across ALL images. MAX_PARALLEL_READS = 8 MAX_PARALLEL_WRITES = 1 @@ -80,7 +90,7 @@ _IMAGE_LOCKS_GUARD = threading.Lock() # Dynamic image_id(transferId) -> NBD export mapping: # CloudStack writes a JSON file at /tmp/imagetransfer/ with: -# {"host": "...", "port": 10809, "export": "vda"} +# {"host": "...", "port": 10809, "export": "vda", "export_bitmap":"bitmap1"} # # This server reads that file on-demand. _CFG_DIR = "/tmp/imagetransfer" @@ -92,6 +102,57 @@ def _json_bytes(obj: Any) -> bytes: return json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8") +def _merge_dirty_zero_extents( + allocation_extents: List[Tuple[int, int, bool]], + dirty_extents: List[Tuple[int, int, bool]], + size: int, +) -> List[Dict[str, Any]]: + """ + Merge allocation (start, length, zero) and dirty (start, length, dirty) extents + into a single list of {start, length, dirty, zero} with unified boundaries. + """ + boundaries: set[int] = {0, size} + for start, length, _ in allocation_extents: + boundaries.add(start) + boundaries.add(start + length) + for start, length, _ in dirty_extents: + boundaries.add(start) + boundaries.add(start + length) + sorted_boundaries = sorted(boundaries) + + def lookup( + extents: List[Tuple[int, int, bool]], offset: int, default: bool + ) -> bool: + for start, length, flag in extents: + if start <= offset < start + length: + return flag + return default + + result: List[Dict[str, Any]] = [] + for i in range(len(sorted_boundaries) - 1): + a, b = sorted_boundaries[i], sorted_boundaries[i + 1] + if a >= b: + continue + result.append( + { + "start": a, + "length": b - a, + "dirty": lookup(dirty_extents, a, False), + "zero": lookup(allocation_extents, a, False), + } + ) + return result + + +def _is_fallback_dirty_response(extents: List[Dict[str, Any]]) -> bool: + """True if extents is the single-extent fallback (dirty=false, zero=false).""" + return ( + len(extents) == 1 + and extents[0].get("dirty") is False + and extents[0].get("zero") is False + ) + + def _get_image_lock(image_id: str) -> threading.Lock: with _IMAGE_LOCKS_GUARD: lock = _IMAGE_LOCKS.get(image_id) @@ -132,7 +193,7 @@ def _load_image_cfg(image_id: str) -> Optional[Dict[str, Any]]: except FileNotFoundError: return None except OSError as e: - logging.warning("cfg stat failed image_id=%s err=%r", image_id, e) + logging.error("cfg stat failed image_id=%s err=%r", image_id, e) return None with _CFG_CACHE_GUARD: @@ -147,38 +208,39 @@ def _load_image_cfg(image_id: str) -> Optional[Dict[str, Any]]: with open(cfg_path, "rb") as f: raw = f.read(4096) except OSError as e: - logging.warning("cfg read failed image_id=%s err=%r", image_id, e) + logging.error("cfg read failed image_id=%s err=%r", image_id, e) return None try: obj = json.loads(raw.decode("utf-8")) except Exception as e: - logging.warning("cfg parse failed image_id=%s err=%r", image_id, e) + logging.error("cfg parse failed image_id=%s err=%r", image_id, e) return None if not isinstance(obj, dict): - logging.warning("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__) + logging.error("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__) return None host = obj.get("host") port = obj.get("port") export = obj.get("export") + export_bitmap = obj.get("export_bitmap") if not isinstance(host, str) or not host: - logging.warning("cfg missing/invalid host image_id=%s", image_id) + logging.error("cfg missing/invalid host image_id=%s", image_id) return None try: port_i = int(port) except Exception: - logging.warning("cfg missing/invalid port image_id=%s", image_id) + logging.error("cfg missing/invalid port image_id=%s", image_id) return None if port_i <= 0 or port_i > 65535: - logging.warning("cfg out-of-range port image_id=%s port=%r", image_id, port) + logging.error("cfg out-of-range port image_id=%s port=%r", image_id, port) return None if export is not None and (not isinstance(export, str) or not export): - logging.warning("cfg missing/invalid export image_id=%s", image_id) + logging.error("cfg missing/invalid export image_id=%s", image_id) return None - cfg: Dict[str, Any] = {"host": host, "port": port_i, "export": export} + cfg: Dict[str, Any] = {"host": host, "port": port_i, "export": export, "export_bitmap": export_bitmap} with _CFG_CACHE_GUARD: _CFG_CACHE[safe_id] = (float(st.st_mtime), cfg) @@ -191,7 +253,14 @@ class _NbdConn: Opens a fresh handle per request, per POC requirements. """ - def __init__(self, host: str, port: int, export: Optional[str]): + def __init__( + self, + host: str, + port: int, + export: Optional[str], + need_block_status: bool = False, + extra_meta_contexts: Optional[List[str]] = None, + ): self._sock = socket.create_connection((host, port)) self._nbd = nbd.NBD() @@ -199,6 +268,14 @@ class _NbdConn: if export and hasattr(self._nbd, "set_export_name"): self._nbd.set_export_name(export) + # Request meta contexts before connect (for block status / dirty bitmap). + if need_block_status and hasattr(self._nbd, "add_meta_context"): + for ctx in ["base:allocation"] + (extra_meta_contexts or []): + try: + self._nbd.add_meta_context(ctx) + except Exception as e: + logging.warning("add_meta_context %r failed: %r", ctx, e) + self._connect_existing_socket(self._sock) def _connect_existing_socket(self, sock: socket.socket) -> None: @@ -230,6 +307,32 @@ class _NbdConn: def size(self) -> int: return int(self._nbd.get_size()) + def get_capabilities(self) -> Dict[str, bool]: + """ + Query NBD export capabilities (read_only, can_flush, can_zero) from the + server handshake. Returns dict with keys read_only, can_flush, can_zero. + Uses getattr for binding name variations (is_read_only/get_read_only, etc.). + """ + out: Dict[str, bool] = { + "read_only": True, + "can_flush": False, + "can_zero": False, + } + for name, keys in [ + ("read_only", ("is_read_only", "get_read_only")), + ("can_flush", ("can_flush", "get_can_flush")), + ("can_zero", ("can_zero", "get_can_zero")), + ]: + for attr in keys: + if hasattr(self._nbd, attr): + try: + val = getattr(self._nbd, attr)() + out[name] = bool(val) + except Exception: + pass + break + return out + def pread(self, length: int, offset: int) -> bytes: # Expected signature: pread(length, offset) try: @@ -253,6 +356,235 @@ class _NbdConn: return raise RuntimeError("libnbd binding has no flush/fsync method") + def get_zero_extents(self) -> List[Dict[str, Any]]: + """ + Query NBD block status (base:allocation) and return extents that are + hole or zero in imageio format: [{"start": ..., "length": ..., "zero": true}, ...]. + Returns [] if block status is not supported; fallback to one full-image + zero extent when we have size but block status fails. + """ + size = self.size() + if size == 0: + return [] + + if not hasattr(self._nbd, "block_status") and not hasattr( + self._nbd, "block_status_64" + ): + logging.error("get_zero_extents: no block_status/block_status_64") + return self._fallback_zero_extent(size) + if hasattr(self._nbd, "can_meta_context") and not self._nbd.can_meta_context( + "base:allocation" + ): + logging.error( + "get_zero_extents: server did not negotiate base:allocation" + ) + return self._fallback_zero_extent(size) + + zero_extents: List[Dict[str, Any]] = [] + chunk = min(size, 64 * 1024 * 1024) # 64 MiB + offset = 0 + + def extent_cb(*args: Any, **kwargs: Any) -> int: + # Binding typically passes (metacontext, offset, entries[, nr_entries][, error]). + metacontext = None + off = 0 + entries = None + if len(args) >= 3: + metacontext, off, entries = args[0], args[1], args[2] + else: + for a in args: + if isinstance(a, str): + metacontext = a + elif isinstance(a, int): + off = a + elif a is not None and hasattr(a, "__iter__"): + entries = a + if metacontext != "base:allocation" or entries is None: + return 0 + current = off + try: + flat = list(entries) + for i in range(0, len(flat), 2): + if i + 1 >= len(flat): + break + length = int(flat[i]) + flags = int(flat[i + 1]) + if (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0: + zero_extents.append( + {"start": current, "length": length, "zero": True} + ) + current += length + except (TypeError, ValueError, IndexError): + pass + return 0 + + block_status_fn = getattr( + self._nbd, "block_status_64", getattr(self._nbd, "block_status", None) + ) + if block_status_fn is None: + return self._fallback_zero_extent(size) + + try: + while offset < size: + count = min(chunk, size - offset) + # Try (count, offset, callback) then (offset, count, callback) + try: + block_status_fn(count, offset, extent_cb) + except TypeError: + block_status_fn(offset, count, extent_cb) + offset += count + except Exception as e: + logging.error("get_zero_extents block_status failed: %r", e) + return self._fallback_zero_extent(size) + if not zero_extents: + return self._fallback_zero_extent(size) + return zero_extents + + def _fallback_zero_extent(self, size: int) -> List[Dict[str, Any]]: + """Return one zero extent covering the whole image when block status unavailable.""" + return [{"start": 0, "length": size, "zero": True}] + + def get_allocation_extents(self) -> List[Dict[str, Any]]: + """ + Query base:allocation and return all extents (allocated and hole/zero) + as [{"start": ..., "length": ..., "zero": bool}, ...]. + Fallback when block status unavailable: one extent with zero=False. + """ + size = self.size() + if size == 0: + return [] + if not hasattr(self._nbd, "block_status") and not hasattr( + self._nbd, "block_status_64" + ): + return [{"start": 0, "length": size, "zero": False}] + if hasattr(self._nbd, "can_meta_context") and not self._nbd.can_meta_context( + "base:allocation" + ): + return [{"start": 0, "length": size, "zero": False}] + + allocation_extents: List[Dict[str, Any]] = [] + chunk = min(size, 64 * 1024 * 1024) + offset = 0 + + def extent_cb(*args: Any, **kwargs: Any) -> int: + if len(args) < 3: + return 0 + metacontext, off, entries = args[0], args[1], args[2] + if metacontext != "base:allocation" or entries is None: + return 0 + current = off + try: + flat = list(entries) + for i in range(0, len(flat), 2): + if i + 1 >= len(flat): + break + length = int(flat[i]) + flags = int(flat[i + 1]) + zero = (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0 + allocation_extents.append( + {"start": current, "length": length, "zero": zero} + ) + current += length + except (TypeError, ValueError, IndexError): + pass + return 0 + + block_status_fn = getattr( + self._nbd, "block_status_64", getattr(self._nbd, "block_status", None) + ) + if block_status_fn is None: + return [{"start": 0, "length": size, "zero": False}] + try: + while offset < size: + count = min(chunk, size - offset) + try: + block_status_fn(count, offset, extent_cb) + except TypeError: + block_status_fn(offset, count, extent_cb) + offset += count + except Exception as e: + logging.warning("get_allocation_extents block_status failed: %r", e) + return [{"start": 0, "length": size, "zero": False}] + if not allocation_extents: + return [{"start": 0, "length": size, "zero": False}] + return allocation_extents + + def get_extents_dirty_and_zero( + self, dirty_bitmap_context: str + ) -> List[Dict[str, Any]]: + """ + Query block status for base:allocation and qemu:dirty-bitmap:, + merge boundaries, and return extents with dirty and zero flags. + Format: [{"start": ..., "length": ..., "dirty": bool, "zero": bool}, ...]. + """ + size = self.size() + if size == 0: + return [] + if not hasattr(self._nbd, "block_status") and not hasattr( + self._nbd, "block_status_64" + ): + return self._fallback_dirty_zero_extents(size) + if hasattr(self._nbd, "can_meta_context"): + if not self._nbd.can_meta_context("base:allocation"): + return self._fallback_dirty_zero_extents(size) + if not self._nbd.can_meta_context(dirty_bitmap_context): + logging.warning( + "dirty bitmap context %r not negotiated", dirty_bitmap_context + ) + return self._fallback_dirty_zero_extents(size) + + allocation_extents: List[Tuple[int, int, bool]] = [] # (start, length, zero) + dirty_extents: List[Tuple[int, int, bool]] = [] # (start, length, dirty) + chunk = min(size, 64 * 1024 * 1024) + offset = 0 + + def extent_cb(*args: Any, **kwargs: Any) -> int: + if len(args) < 3: + return 0 + metacontext, off, entries = args[0], args[1], args[2] + if entries is None or not hasattr(entries, "__iter__"): + return 0 + current = off + try: + flat = list(entries) + for i in range(0, len(flat), 2): + if i + 1 >= len(flat): + break + length = int(flat[i]) + flags = int(flat[i + 1]) + if metacontext == "base:allocation": + zero = (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0 + allocation_extents.append((current, length, zero)) + elif metacontext == dirty_bitmap_context: + dirty = (flags & _NBD_STATE_DIRTY) != 0 + dirty_extents.append((current, length, dirty)) + current += length + except (TypeError, ValueError, IndexError): + pass + return 0 + + block_status_fn = getattr( + self._nbd, "block_status_64", getattr(self._nbd, "block_status", None) + ) + if block_status_fn is None: + return self._fallback_dirty_zero_extents(size) + try: + while offset < size: + count = min(chunk, size - offset) + try: + block_status_fn(count, offset, extent_cb) + except TypeError: + block_status_fn(offset, count, extent_cb) + offset += count + except Exception as e: + logging.warning("get_extents_dirty_and_zero block_status failed: %r", e) + return self._fallback_dirty_zero_extents(size) + return _merge_dirty_zero_extents(allocation_extents, dirty_extents, size) + + def _fallback_dirty_zero_extents(self, size: int) -> List[Dict[str, Any]]: + """One extent: whole image, dirty=false, zero=false when bitmap unavailable.""" + return [{"start": 0, "length": size, "dirty": False, "zero": False}] + def close(self) -> None: # Best-effort; bindings may differ. try: @@ -284,15 +616,24 @@ class Handler(BaseHTTPRequestHandler): def log_message(self, fmt: str, *args: Any) -> None: logging.info("%s - - %s", self.address_string(), fmt % args) - def _send_imageio_headers(self) -> None: + def _send_imageio_headers( + self, allowed_methods: Optional[str] = None + ) -> None: # Include these headers for compatibility with the imageio contract. - self.send_header("Access-Control-Allow-Methods", "GET, PUT, OPTIONS") + if allowed_methods is None: + allowed_methods = "GET, PUT, OPTIONS" + self.send_header("Access-Control-Allow-Methods", allowed_methods) self.send_header("Accept-Ranges", "bytes") - def _send_json(self, status: int, obj: Any) -> None: + def _send_json( + self, + status: int, + obj: Any, + allowed_methods: Optional[str] = None, + ) -> None: body = _json_bytes(obj) self.send_response(status) - self._send_imageio_headers() + self._send_imageio_headers(allowed_methods) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() @@ -403,6 +744,13 @@ class Handler(BaseHTTPRequestHandler): return None, None return image_id, tail + def _parse_query(self) -> Dict[str, List[str]]: + """Parse query string from self.path into a dict of name -> list of values.""" + if "?" not in self.path: + return {} + query = self.path.split("?", 1)[1] + return parse_qs(query, keep_blank_values=True) + def _image_cfg(self, image_id: str) -> Optional[Dict[str, Any]]: return _load_image_cfg(image_id) @@ -411,18 +759,50 @@ class Handler(BaseHTTPRequestHandler): if image_id is None or tail is not None: self._send_error_json(HTTPStatus.NOT_FOUND, "not found") return - if self._image_cfg(image_id) is None: + cfg = self._image_cfg(image_id) + if cfg is None: self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id") return - # todo: get capabilities from backend later. this is just for upload to work - features = ["extents", "zero", "flush"] + # Query NBD backend for capabilities (like nbdinfo); fall back to config. + read_only = True + can_flush = False + can_zero = False + try: + with _NbdConn( + cfg["host"], + int(cfg["port"]), + cfg.get("export"), + ) as conn: + caps = conn.get_capabilities() + read_only = caps["read_only"] + can_flush = caps["can_flush"] + can_zero = caps["can_zero"] + except Exception as e: + logging.warning("OPTIONS: could not query NBD capabilities: %r", e) + read_only = bool(cfg.get("read_only")) + if not read_only: + can_flush = True + can_zero = True + # Report options for this image from NBD: read-only => no PUT; only advertise supported features. + if read_only: + allowed_methods = "GET, OPTIONS" + features = ["extents"] + max_writers = 0 + else: + allowed_methods = "GET, PUT, OPTIONS" + features = ["extents"] + if can_zero: + features.append("zero") + if can_flush: + features.append("flush") + max_writers = MAX_PARALLEL_WRITES if not read_only else 0 response = { "unix_socket": None, # Not used in this implementation "features": features, "max_readers": MAX_PARALLEL_READS, - "max_writers": MAX_PARALLEL_WRITES, + "max_writers": max_writers, } - self._send_json(HTTPStatus.OK, response) + self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods) def do_GET(self) -> None: image_id, tail = self._parse_route() @@ -436,7 +816,9 @@ class Handler(BaseHTTPRequestHandler): return if tail == "extents": - self._handle_get_extents(image_id, cfg) + query = self._parse_query() + context = (query.get("context") or [None])[0] + self._handle_get_extents(image_id, cfg, context=context) return if tail is not None: self._send_error_json(HTTPStatus.NOT_FOUND, "not found") @@ -556,7 +938,7 @@ class Handler(BaseHTTPRequestHandler): bytes_sent += len(data) except Exception as e: # If headers already sent, we can't return JSON reliably; just log. - logging.warning("GET error image_id=%s err=%r", image_id, e) + logging.error("GET error image_id=%s err=%r", image_id, e) try: if not self.wfile.closed: self.close_connection = True @@ -604,7 +986,7 @@ class Handler(BaseHTTPRequestHandler): # POC-level: do not auto-flush on PUT; expose explicit /flush endpoint. self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written}) except Exception as e: - logging.warning("PUT error image_id=%s err=%r", image_id, e) + logging.error("PUT error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: _WRITE_SEM.release() @@ -614,10 +996,11 @@ class Handler(BaseHTTPRequestHandler): "PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur ) - def _handle_get_extents(self, image_id: str, cfg: Dict[str, Any]) -> None: - # Keep deterministic and simple (POC): report entire image allocated. - # No per-image lock required by spec, but we still take it to avoid racing - # with a write and to keep behavior consistent. + def _handle_get_extents( + self, image_id: str, cfg: Dict[str, Any], context: Optional[str] = None + ) -> None: + # context=dirty: return extents with dirty and zero from base:allocation + bitmap. + # Otherwise: return zero/hole extents from base:allocation only. lock = _get_image_lock(image_id) if not lock.acquire(blocking=False): self._send_error_json(HTTPStatus.CONFLICT, "image busy") @@ -625,15 +1008,62 @@ class Handler(BaseHTTPRequestHandler): start = _now_s() try: - logging.info("EXTENTS start image_id=%s", image_id) - with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn: - size = conn.size() - self._send_json( - HTTPStatus.OK, - [{"start": 0, "length": size, "allocated": True}], - ) + logging.info("EXTENTS start image_id=%s context=%s", image_id, context) + if context == "dirty": + export_bitmap = cfg.get("export_bitmap") + if not export_bitmap: + # Fallback: same structure as zero extents but dirty=true for all ranges + with _NbdConn( + cfg["host"], + int(cfg["port"]), + cfg.get("export"), + need_block_status=True, + ) as conn: + allocation = conn.get_allocation_extents() + extents = [ + {"start": e["start"], "length": e["length"], "dirty": True, "zero": e["zero"]} + for e in allocation + ] + else: + dirty_bitmap_ctx = f"qemu:dirty-bitmap:{export_bitmap}" + extra_contexts: List[str] = [dirty_bitmap_ctx] + with _NbdConn( + cfg["host"], + int(cfg["port"]), + cfg.get("export"), + need_block_status=True, + extra_meta_contexts=extra_contexts, + ) as conn: + extents = conn.get_extents_dirty_and_zero(dirty_bitmap_ctx) + # When bitmap not actually available, same fallback: zero structure + dirty=true + if _is_fallback_dirty_response(extents): + with _NbdConn( + cfg["host"], + int(cfg["port"]), + cfg.get("export"), + need_block_status=True, + ) as conn: + allocation = conn.get_allocation_extents() + extents = [ + { + "start": e["start"], + "length": e["length"], + "dirty": True, + "zero": e["zero"], + } + for e in allocation + ] + else: + with _NbdConn( + cfg["host"], + int(cfg["port"]), + cfg.get("export"), + need_block_status=True, + ) as conn: + extents = conn.get_zero_extents() + self._send_json(HTTPStatus.OK, extents) except Exception as e: - logging.warning("EXTENTS error image_id=%s err=%r", image_id, e) + logging.error("EXTENTS error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: lock.release() @@ -653,7 +1083,7 @@ class Handler(BaseHTTPRequestHandler): conn.flush() self._send_json(HTTPStatus.OK, {"ok": True}) except Exception as e: - logging.warning("FLUSH error image_id=%s err=%r", image_id, e) + logging.error("FLUSH error image_id=%s err=%r", image_id, e) self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error") finally: lock.release()