extents(zero/dirty) and capabilities - working

todo: patch (needed?)
This commit is contained in:
Abhisar Sinha 2026-01-30 23:53:23 +05:30 committed by Abhishek Kumar
parent da62e9a3ed
commit 4173947aa3
1 changed files with 468 additions and 38 deletions

View File

@ -43,9 +43,12 @@ Example curl commands
- PUT full image (Content-Length must equal export size exactly):
curl -v -T demo.img http://127.0.0.1:54323/images/demo
- GET extents (POC-level; may return a single allocated extent):
- GET extents (zero/hole extents from NBD base:allocation):
curl -s http://127.0.0.1:54323/images/demo/extents | jq .
- GET extents with dirty and zero (requires export_bitmap in config):
curl -s "http://127.0.0.1:54323/images/demo/extents?context=dirty" | jq .
- POST flush:
curl -s -X POST http://127.0.0.1:54323/images/demo/flush | jq .
"""
@ -61,11 +64,18 @@ import threading
import time
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any, Dict, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import parse_qs
import nbd
CHUNK_SIZE = 256 * 1024 # 256 KiB
# NBD base:allocation flags (hole=1, zero=2; hole|zero=3)
_NBD_STATE_HOLE = 1
_NBD_STATE_ZERO = 2
# NBD qemu:dirty-bitmap flags (dirty=1)
_NBD_STATE_DIRTY = 1
# Concurrency limits across ALL images.
MAX_PARALLEL_READS = 8
MAX_PARALLEL_WRITES = 1
@ -80,7 +90,7 @@ _IMAGE_LOCKS_GUARD = threading.Lock()
# Dynamic image_id(transferId) -> NBD export mapping:
# CloudStack writes a JSON file at /tmp/imagetransfer/<transferId> with:
# {"host": "...", "port": 10809, "export": "vda"}
# {"host": "...", "port": 10809, "export": "vda", "export_bitmap":"bitmap1"}
#
# This server reads that file on-demand.
_CFG_DIR = "/tmp/imagetransfer"
@ -92,6 +102,57 @@ def _json_bytes(obj: Any) -> bytes:
return json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def _merge_dirty_zero_extents(
allocation_extents: List[Tuple[int, int, bool]],
dirty_extents: List[Tuple[int, int, bool]],
size: int,
) -> List[Dict[str, Any]]:
"""
Merge allocation (start, length, zero) and dirty (start, length, dirty) extents
into a single list of {start, length, dirty, zero} with unified boundaries.
"""
boundaries: set[int] = {0, size}
for start, length, _ in allocation_extents:
boundaries.add(start)
boundaries.add(start + length)
for start, length, _ in dirty_extents:
boundaries.add(start)
boundaries.add(start + length)
sorted_boundaries = sorted(boundaries)
def lookup(
extents: List[Tuple[int, int, bool]], offset: int, default: bool
) -> bool:
for start, length, flag in extents:
if start <= offset < start + length:
return flag
return default
result: List[Dict[str, Any]] = []
for i in range(len(sorted_boundaries) - 1):
a, b = sorted_boundaries[i], sorted_boundaries[i + 1]
if a >= b:
continue
result.append(
{
"start": a,
"length": b - a,
"dirty": lookup(dirty_extents, a, False),
"zero": lookup(allocation_extents, a, False),
}
)
return result
def _is_fallback_dirty_response(extents: List[Dict[str, Any]]) -> bool:
"""True if extents is the single-extent fallback (dirty=false, zero=false)."""
return (
len(extents) == 1
and extents[0].get("dirty") is False
and extents[0].get("zero") is False
)
def _get_image_lock(image_id: str) -> threading.Lock:
with _IMAGE_LOCKS_GUARD:
lock = _IMAGE_LOCKS.get(image_id)
@ -132,7 +193,7 @@ def _load_image_cfg(image_id: str) -> Optional[Dict[str, Any]]:
except FileNotFoundError:
return None
except OSError as e:
logging.warning("cfg stat failed image_id=%s err=%r", image_id, e)
logging.error("cfg stat failed image_id=%s err=%r", image_id, e)
return None
with _CFG_CACHE_GUARD:
@ -147,38 +208,39 @@ def _load_image_cfg(image_id: str) -> Optional[Dict[str, Any]]:
with open(cfg_path, "rb") as f:
raw = f.read(4096)
except OSError as e:
logging.warning("cfg read failed image_id=%s err=%r", image_id, e)
logging.error("cfg read failed image_id=%s err=%r", image_id, e)
return None
try:
obj = json.loads(raw.decode("utf-8"))
except Exception as e:
logging.warning("cfg parse failed image_id=%s err=%r", image_id, e)
logging.error("cfg parse failed image_id=%s err=%r", image_id, e)
return None
if not isinstance(obj, dict):
logging.warning("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__)
logging.error("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__)
return None
host = obj.get("host")
port = obj.get("port")
export = obj.get("export")
export_bitmap = obj.get("export_bitmap")
if not isinstance(host, str) or not host:
logging.warning("cfg missing/invalid host image_id=%s", image_id)
logging.error("cfg missing/invalid host image_id=%s", image_id)
return None
try:
port_i = int(port)
except Exception:
logging.warning("cfg missing/invalid port image_id=%s", image_id)
logging.error("cfg missing/invalid port image_id=%s", image_id)
return None
if port_i <= 0 or port_i > 65535:
logging.warning("cfg out-of-range port image_id=%s port=%r", image_id, port)
logging.error("cfg out-of-range port image_id=%s port=%r", image_id, port)
return None
if export is not None and (not isinstance(export, str) or not export):
logging.warning("cfg missing/invalid export image_id=%s", image_id)
logging.error("cfg missing/invalid export image_id=%s", image_id)
return None
cfg: Dict[str, Any] = {"host": host, "port": port_i, "export": export}
cfg: Dict[str, Any] = {"host": host, "port": port_i, "export": export, "export_bitmap": export_bitmap}
with _CFG_CACHE_GUARD:
_CFG_CACHE[safe_id] = (float(st.st_mtime), cfg)
@ -191,7 +253,14 @@ class _NbdConn:
Opens a fresh handle per request, per POC requirements.
"""
def __init__(self, host: str, port: int, export: Optional[str]):
def __init__(
self,
host: str,
port: int,
export: Optional[str],
need_block_status: bool = False,
extra_meta_contexts: Optional[List[str]] = None,
):
self._sock = socket.create_connection((host, port))
self._nbd = nbd.NBD()
@ -199,6 +268,14 @@ class _NbdConn:
if export and hasattr(self._nbd, "set_export_name"):
self._nbd.set_export_name(export)
# Request meta contexts before connect (for block status / dirty bitmap).
if need_block_status and hasattr(self._nbd, "add_meta_context"):
for ctx in ["base:allocation"] + (extra_meta_contexts or []):
try:
self._nbd.add_meta_context(ctx)
except Exception as e:
logging.warning("add_meta_context %r failed: %r", ctx, e)
self._connect_existing_socket(self._sock)
def _connect_existing_socket(self, sock: socket.socket) -> None:
@ -230,6 +307,32 @@ class _NbdConn:
def size(self) -> int:
return int(self._nbd.get_size())
def get_capabilities(self) -> Dict[str, bool]:
"""
Query NBD export capabilities (read_only, can_flush, can_zero) from the
server handshake. Returns dict with keys read_only, can_flush, can_zero.
Uses getattr for binding name variations (is_read_only/get_read_only, etc.).
"""
out: Dict[str, bool] = {
"read_only": True,
"can_flush": False,
"can_zero": False,
}
for name, keys in [
("read_only", ("is_read_only", "get_read_only")),
("can_flush", ("can_flush", "get_can_flush")),
("can_zero", ("can_zero", "get_can_zero")),
]:
for attr in keys:
if hasattr(self._nbd, attr):
try:
val = getattr(self._nbd, attr)()
out[name] = bool(val)
except Exception:
pass
break
return out
def pread(self, length: int, offset: int) -> bytes:
# Expected signature: pread(length, offset)
try:
@ -253,6 +356,235 @@ class _NbdConn:
return
raise RuntimeError("libnbd binding has no flush/fsync method")
def get_zero_extents(self) -> List[Dict[str, Any]]:
"""
Query NBD block status (base:allocation) and return extents that are
hole or zero in imageio format: [{"start": ..., "length": ..., "zero": true}, ...].
Returns [] if block status is not supported; fallback to one full-image
zero extent when we have size but block status fails.
"""
size = self.size()
if size == 0:
return []
if not hasattr(self._nbd, "block_status") and not hasattr(
self._nbd, "block_status_64"
):
logging.error("get_zero_extents: no block_status/block_status_64")
return self._fallback_zero_extent(size)
if hasattr(self._nbd, "can_meta_context") and not self._nbd.can_meta_context(
"base:allocation"
):
logging.error(
"get_zero_extents: server did not negotiate base:allocation"
)
return self._fallback_zero_extent(size)
zero_extents: List[Dict[str, Any]] = []
chunk = min(size, 64 * 1024 * 1024) # 64 MiB
offset = 0
def extent_cb(*args: Any, **kwargs: Any) -> int:
# Binding typically passes (metacontext, offset, entries[, nr_entries][, error]).
metacontext = None
off = 0
entries = None
if len(args) >= 3:
metacontext, off, entries = args[0], args[1], args[2]
else:
for a in args:
if isinstance(a, str):
metacontext = a
elif isinstance(a, int):
off = a
elif a is not None and hasattr(a, "__iter__"):
entries = a
if metacontext != "base:allocation" or entries is None:
return 0
current = off
try:
flat = list(entries)
for i in range(0, len(flat), 2):
if i + 1 >= len(flat):
break
length = int(flat[i])
flags = int(flat[i + 1])
if (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0:
zero_extents.append(
{"start": current, "length": length, "zero": True}
)
current += length
except (TypeError, ValueError, IndexError):
pass
return 0
block_status_fn = getattr(
self._nbd, "block_status_64", getattr(self._nbd, "block_status", None)
)
if block_status_fn is None:
return self._fallback_zero_extent(size)
try:
while offset < size:
count = min(chunk, size - offset)
# Try (count, offset, callback) then (offset, count, callback)
try:
block_status_fn(count, offset, extent_cb)
except TypeError:
block_status_fn(offset, count, extent_cb)
offset += count
except Exception as e:
logging.error("get_zero_extents block_status failed: %r", e)
return self._fallback_zero_extent(size)
if not zero_extents:
return self._fallback_zero_extent(size)
return zero_extents
def _fallback_zero_extent(self, size: int) -> List[Dict[str, Any]]:
"""Return one zero extent covering the whole image when block status unavailable."""
return [{"start": 0, "length": size, "zero": True}]
def get_allocation_extents(self) -> List[Dict[str, Any]]:
"""
Query base:allocation and return all extents (allocated and hole/zero)
as [{"start": ..., "length": ..., "zero": bool}, ...].
Fallback when block status unavailable: one extent with zero=False.
"""
size = self.size()
if size == 0:
return []
if not hasattr(self._nbd, "block_status") and not hasattr(
self._nbd, "block_status_64"
):
return [{"start": 0, "length": size, "zero": False}]
if hasattr(self._nbd, "can_meta_context") and not self._nbd.can_meta_context(
"base:allocation"
):
return [{"start": 0, "length": size, "zero": False}]
allocation_extents: List[Dict[str, Any]] = []
chunk = min(size, 64 * 1024 * 1024)
offset = 0
def extent_cb(*args: Any, **kwargs: Any) -> int:
if len(args) < 3:
return 0
metacontext, off, entries = args[0], args[1], args[2]
if metacontext != "base:allocation" or entries is None:
return 0
current = off
try:
flat = list(entries)
for i in range(0, len(flat), 2):
if i + 1 >= len(flat):
break
length = int(flat[i])
flags = int(flat[i + 1])
zero = (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0
allocation_extents.append(
{"start": current, "length": length, "zero": zero}
)
current += length
except (TypeError, ValueError, IndexError):
pass
return 0
block_status_fn = getattr(
self._nbd, "block_status_64", getattr(self._nbd, "block_status", None)
)
if block_status_fn is None:
return [{"start": 0, "length": size, "zero": False}]
try:
while offset < size:
count = min(chunk, size - offset)
try:
block_status_fn(count, offset, extent_cb)
except TypeError:
block_status_fn(offset, count, extent_cb)
offset += count
except Exception as e:
logging.warning("get_allocation_extents block_status failed: %r", e)
return [{"start": 0, "length": size, "zero": False}]
if not allocation_extents:
return [{"start": 0, "length": size, "zero": False}]
return allocation_extents
def get_extents_dirty_and_zero(
self, dirty_bitmap_context: str
) -> List[Dict[str, Any]]:
"""
Query block status for base:allocation and qemu:dirty-bitmap:<context>,
merge boundaries, and return extents with dirty and zero flags.
Format: [{"start": ..., "length": ..., "dirty": bool, "zero": bool}, ...].
"""
size = self.size()
if size == 0:
return []
if not hasattr(self._nbd, "block_status") and not hasattr(
self._nbd, "block_status_64"
):
return self._fallback_dirty_zero_extents(size)
if hasattr(self._nbd, "can_meta_context"):
if not self._nbd.can_meta_context("base:allocation"):
return self._fallback_dirty_zero_extents(size)
if not self._nbd.can_meta_context(dirty_bitmap_context):
logging.warning(
"dirty bitmap context %r not negotiated", dirty_bitmap_context
)
return self._fallback_dirty_zero_extents(size)
allocation_extents: List[Tuple[int, int, bool]] = [] # (start, length, zero)
dirty_extents: List[Tuple[int, int, bool]] = [] # (start, length, dirty)
chunk = min(size, 64 * 1024 * 1024)
offset = 0
def extent_cb(*args: Any, **kwargs: Any) -> int:
if len(args) < 3:
return 0
metacontext, off, entries = args[0], args[1], args[2]
if entries is None or not hasattr(entries, "__iter__"):
return 0
current = off
try:
flat = list(entries)
for i in range(0, len(flat), 2):
if i + 1 >= len(flat):
break
length = int(flat[i])
flags = int(flat[i + 1])
if metacontext == "base:allocation":
zero = (flags & (_NBD_STATE_HOLE | _NBD_STATE_ZERO)) != 0
allocation_extents.append((current, length, zero))
elif metacontext == dirty_bitmap_context:
dirty = (flags & _NBD_STATE_DIRTY) != 0
dirty_extents.append((current, length, dirty))
current += length
except (TypeError, ValueError, IndexError):
pass
return 0
block_status_fn = getattr(
self._nbd, "block_status_64", getattr(self._nbd, "block_status", None)
)
if block_status_fn is None:
return self._fallback_dirty_zero_extents(size)
try:
while offset < size:
count = min(chunk, size - offset)
try:
block_status_fn(count, offset, extent_cb)
except TypeError:
block_status_fn(offset, count, extent_cb)
offset += count
except Exception as e:
logging.warning("get_extents_dirty_and_zero block_status failed: %r", e)
return self._fallback_dirty_zero_extents(size)
return _merge_dirty_zero_extents(allocation_extents, dirty_extents, size)
def _fallback_dirty_zero_extents(self, size: int) -> List[Dict[str, Any]]:
"""One extent: whole image, dirty=false, zero=false when bitmap unavailable."""
return [{"start": 0, "length": size, "dirty": False, "zero": False}]
def close(self) -> None:
# Best-effort; bindings may differ.
try:
@ -284,15 +616,24 @@ class Handler(BaseHTTPRequestHandler):
def log_message(self, fmt: str, *args: Any) -> None:
logging.info("%s - - %s", self.address_string(), fmt % args)
def _send_imageio_headers(self) -> None:
def _send_imageio_headers(
self, allowed_methods: Optional[str] = None
) -> None:
# Include these headers for compatibility with the imageio contract.
self.send_header("Access-Control-Allow-Methods", "GET, PUT, OPTIONS")
if allowed_methods is None:
allowed_methods = "GET, PUT, OPTIONS"
self.send_header("Access-Control-Allow-Methods", allowed_methods)
self.send_header("Accept-Ranges", "bytes")
def _send_json(self, status: int, obj: Any) -> None:
def _send_json(
self,
status: int,
obj: Any,
allowed_methods: Optional[str] = None,
) -> None:
body = _json_bytes(obj)
self.send_response(status)
self._send_imageio_headers()
self._send_imageio_headers(allowed_methods)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
@ -403,6 +744,13 @@ class Handler(BaseHTTPRequestHandler):
return None, None
return image_id, tail
def _parse_query(self) -> Dict[str, List[str]]:
"""Parse query string from self.path into a dict of name -> list of values."""
if "?" not in self.path:
return {}
query = self.path.split("?", 1)[1]
return parse_qs(query, keep_blank_values=True)
def _image_cfg(self, image_id: str) -> Optional[Dict[str, Any]]:
return _load_image_cfg(image_id)
@ -411,18 +759,50 @@ class Handler(BaseHTTPRequestHandler):
if image_id is None or tail is not None:
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
return
if self._image_cfg(image_id) is None:
cfg = self._image_cfg(image_id)
if cfg is None:
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
return
# todo: get capabilities from backend later. this is just for upload to work
features = ["extents", "zero", "flush"]
# Query NBD backend for capabilities (like nbdinfo); fall back to config.
read_only = True
can_flush = False
can_zero = False
try:
with _NbdConn(
cfg["host"],
int(cfg["port"]),
cfg.get("export"),
) as conn:
caps = conn.get_capabilities()
read_only = caps["read_only"]
can_flush = caps["can_flush"]
can_zero = caps["can_zero"]
except Exception as e:
logging.warning("OPTIONS: could not query NBD capabilities: %r", e)
read_only = bool(cfg.get("read_only"))
if not read_only:
can_flush = True
can_zero = True
# Report options for this image from NBD: read-only => no PUT; only advertise supported features.
if read_only:
allowed_methods = "GET, OPTIONS"
features = ["extents"]
max_writers = 0
else:
allowed_methods = "GET, PUT, OPTIONS"
features = ["extents"]
if can_zero:
features.append("zero")
if can_flush:
features.append("flush")
max_writers = MAX_PARALLEL_WRITES if not read_only else 0
response = {
"unix_socket": None, # Not used in this implementation
"features": features,
"max_readers": MAX_PARALLEL_READS,
"max_writers": MAX_PARALLEL_WRITES,
"max_writers": max_writers,
}
self._send_json(HTTPStatus.OK, response)
self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods)
def do_GET(self) -> None:
image_id, tail = self._parse_route()
@ -436,7 +816,9 @@ class Handler(BaseHTTPRequestHandler):
return
if tail == "extents":
self._handle_get_extents(image_id, cfg)
query = self._parse_query()
context = (query.get("context") or [None])[0]
self._handle_get_extents(image_id, cfg, context=context)
return
if tail is not None:
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
@ -556,7 +938,7 @@ class Handler(BaseHTTPRequestHandler):
bytes_sent += len(data)
except Exception as e:
# If headers already sent, we can't return JSON reliably; just log.
logging.warning("GET error image_id=%s err=%r", image_id, e)
logging.error("GET error image_id=%s err=%r", image_id, e)
try:
if not self.wfile.closed:
self.close_connection = True
@ -604,7 +986,7 @@ class Handler(BaseHTTPRequestHandler):
# POC-level: do not auto-flush on PUT; expose explicit /flush endpoint.
self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written})
except Exception as e:
logging.warning("PUT error image_id=%s err=%r", image_id, e)
logging.error("PUT error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
_WRITE_SEM.release()
@ -614,10 +996,11 @@ class Handler(BaseHTTPRequestHandler):
"PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur
)
def _handle_get_extents(self, image_id: str, cfg: Dict[str, Any]) -> None:
# Keep deterministic and simple (POC): report entire image allocated.
# No per-image lock required by spec, but we still take it to avoid racing
# with a write and to keep behavior consistent.
def _handle_get_extents(
self, image_id: str, cfg: Dict[str, Any], context: Optional[str] = None
) -> None:
# context=dirty: return extents with dirty and zero from base:allocation + bitmap.
# Otherwise: return zero/hole extents from base:allocation only.
lock = _get_image_lock(image_id)
if not lock.acquire(blocking=False):
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
@ -625,15 +1008,62 @@ class Handler(BaseHTTPRequestHandler):
start = _now_s()
try:
logging.info("EXTENTS start image_id=%s", image_id)
with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn:
size = conn.size()
self._send_json(
HTTPStatus.OK,
[{"start": 0, "length": size, "allocated": True}],
)
logging.info("EXTENTS start image_id=%s context=%s", image_id, context)
if context == "dirty":
export_bitmap = cfg.get("export_bitmap")
if not export_bitmap:
# Fallback: same structure as zero extents but dirty=true for all ranges
with _NbdConn(
cfg["host"],
int(cfg["port"]),
cfg.get("export"),
need_block_status=True,
) as conn:
allocation = conn.get_allocation_extents()
extents = [
{"start": e["start"], "length": e["length"], "dirty": True, "zero": e["zero"]}
for e in allocation
]
else:
dirty_bitmap_ctx = f"qemu:dirty-bitmap:{export_bitmap}"
extra_contexts: List[str] = [dirty_bitmap_ctx]
with _NbdConn(
cfg["host"],
int(cfg["port"]),
cfg.get("export"),
need_block_status=True,
extra_meta_contexts=extra_contexts,
) as conn:
extents = conn.get_extents_dirty_and_zero(dirty_bitmap_ctx)
# When bitmap not actually available, same fallback: zero structure + dirty=true
if _is_fallback_dirty_response(extents):
with _NbdConn(
cfg["host"],
int(cfg["port"]),
cfg.get("export"),
need_block_status=True,
) as conn:
allocation = conn.get_allocation_extents()
extents = [
{
"start": e["start"],
"length": e["length"],
"dirty": True,
"zero": e["zero"],
}
for e in allocation
]
else:
with _NbdConn(
cfg["host"],
int(cfg["port"]),
cfg.get("export"),
need_block_status=True,
) as conn:
extents = conn.get_zero_extents()
self._send_json(HTTPStatus.OK, extents)
except Exception as e:
logging.warning("EXTENTS error image_id=%s err=%r", image_id, e)
logging.error("EXTENTS error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
lock.release()
@ -653,7 +1083,7 @@ class Handler(BaseHTTPRequestHandler):
conn.flush()
self._send_json(HTTPStatus.OK, {"ok": True})
except Exception as e:
logging.warning("FLUSH error image_id=%s err=%r", image_id, e)
logging.error("FLUSH error image_id=%s err=%r", image_id, e)
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
finally:
lock.release()