mirror of https://github.com/apache/cloudstack.git
Patch (zero, data) + Flush support in image_server.py
This commit is contained in:
parent
4173947aa3
commit
91a081beec
|
|
@ -27,7 +27,7 @@ How to run
|
|||
apt install python3-libnbd
|
||||
|
||||
- Run server:
|
||||
python image_server.py --listen 0.0.0.0 --port 54323
|
||||
createImageTransfer will start the server as a systemd service 'cloudstack-image-server'
|
||||
|
||||
Example curl commands
|
||||
--------------------
|
||||
|
|
@ -51,6 +51,34 @@ Example curl commands
|
|||
|
||||
- POST flush:
|
||||
curl -s -X POST http://127.0.0.1:54323/images/demo/flush | jq .
|
||||
|
||||
- PATCH zero (zero a byte range; application/json body):
|
||||
curl -k -X PATCH \
|
||||
-H "Content-Type: application/json" \
|
||||
--data-binary '{"op": "zero", "offset": 4096, "size": 8192}' \
|
||||
http://127.0.0.1:54323/images/demo
|
||||
|
||||
Zero at offset 1 GiB, 4096 bytes, no flush:
|
||||
curl -k -X PATCH \
|
||||
-H "Content-Type: application/json" \
|
||||
--data-binary '{"op": "zero", "offset": 1073741824, "size": 4096}' \
|
||||
http://127.0.0.1:54323/images/demo
|
||||
|
||||
Zero entire disk and flush:
|
||||
curl -k -X PATCH \
|
||||
-H "Content-Type: application/json" \
|
||||
--data-binary '{"op": "zero", "size": 107374182400, "flush": true}' \
|
||||
http://127.0.0.1:54323/images/demo
|
||||
|
||||
- PATCH flush (flush data to storage; operates on entire image):
|
||||
curl -k -X PATCH \
|
||||
-H "Content-Type: application/json" \
|
||||
--data-binary '{"op": "flush"}' \
|
||||
http://127.0.0.1:54323/images/demo
|
||||
|
||||
- PATCH range (write binary body at byte range; Range + Content-Length required):
|
||||
curl -v -X PATCH -H "Range: bytes=0-1048576" --data-binary @chunk.bin \
|
||||
http://127.0.0.1:54323/images/demo
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
|
@ -347,6 +375,37 @@ class _NbdConn:
|
|||
except TypeError: # pragma: no cover (binding differences)
|
||||
self._nbd.pwrite(offset, buf)
|
||||
|
||||
def pzero(self, offset: int, size: int) -> None:
|
||||
"""
|
||||
Zero a byte range. Uses NBD WRITE_ZEROES when available (efficient/punch hole),
|
||||
otherwise falls back to writing zero bytes via pwrite.
|
||||
"""
|
||||
if size <= 0:
|
||||
return
|
||||
# Try libnbd pwrite_zeros / zero; argument order varies by binding.
|
||||
for name in ("pwrite_zeros", "zero"):
|
||||
if not hasattr(self._nbd, name):
|
||||
continue
|
||||
fn = getattr(self._nbd, name)
|
||||
try:
|
||||
fn(size, offset)
|
||||
return
|
||||
except TypeError:
|
||||
try:
|
||||
fn(offset, size)
|
||||
return
|
||||
except TypeError:
|
||||
pass
|
||||
# Fallback: write zeros in chunks.
|
||||
remaining = size
|
||||
pos = offset
|
||||
zero_buf = b"\x00" * min(CHUNK_SIZE, size)
|
||||
while remaining > 0:
|
||||
chunk = min(len(zero_buf), remaining)
|
||||
self.pwrite(zero_buf[:chunk], pos)
|
||||
pos += chunk
|
||||
remaining -= chunk
|
||||
|
||||
def flush(self) -> None:
|
||||
if hasattr(self._nbd, "flush"):
|
||||
self._nbd.flush()
|
||||
|
|
@ -789,7 +848,8 @@ class Handler(BaseHTTPRequestHandler):
|
|||
features = ["extents"]
|
||||
max_writers = 0
|
||||
else:
|
||||
allowed_methods = "GET, PUT, OPTIONS"
|
||||
# PATCH: JSON (zero/flush) and Range+binary (write byte range).
|
||||
allowed_methods = "GET, PUT, PATCH, OPTIONS"
|
||||
features = ["extents"]
|
||||
if can_zero:
|
||||
features.append("zero")
|
||||
|
|
@ -875,6 +935,111 @@ class Handler(BaseHTTPRequestHandler):
|
|||
return
|
||||
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
|
||||
|
||||
def do_PATCH(self) -> None:
|
||||
image_id, tail = self._parse_route()
|
||||
if image_id is None or tail is not None:
|
||||
self._send_error_json(HTTPStatus.NOT_FOUND, "not found")
|
||||
return
|
||||
|
||||
cfg = self._image_cfg(image_id)
|
||||
if cfg is None:
|
||||
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
|
||||
return
|
||||
|
||||
content_type = self.headers.get("Content-Type", "").split(";")[0].strip().lower()
|
||||
range_header = self.headers.get("Range")
|
||||
|
||||
# Binary PATCH: Range + body writes bytes at that range (e.g. curl -X PATCH -H "Range: bytes=0-1048576" --data-binary @chunk.bin).
|
||||
if range_header is not None and content_type != "application/json":
|
||||
content_length_hdr = self.headers.get("Content-Length")
|
||||
if content_length_hdr is None:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
|
||||
return
|
||||
try:
|
||||
content_length = int(content_length_hdr)
|
||||
except ValueError:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
|
||||
return
|
||||
if content_length <= 0:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length must be positive")
|
||||
return
|
||||
self._handle_patch_range(image_id, cfg, range_header, content_length)
|
||||
return
|
||||
|
||||
# JSON PATCH: application/json with op (zero, flush).
|
||||
if content_type != "application/json":
|
||||
self._send_error_json(
|
||||
HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
|
||||
"PATCH requires Content-Type: application/json (for zero/flush) or Range with binary body",
|
||||
)
|
||||
return
|
||||
|
||||
content_length_hdr = self.headers.get("Content-Length")
|
||||
if content_length_hdr is None:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "Content-Length required")
|
||||
return
|
||||
try:
|
||||
content_length = int(content_length_hdr)
|
||||
except ValueError:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
|
||||
return
|
||||
if content_length <= 0 or content_length > 64 * 1024:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
|
||||
return
|
||||
|
||||
body = self.rfile.read(content_length)
|
||||
if len(body) != content_length:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "request body truncated")
|
||||
return
|
||||
|
||||
try:
|
||||
payload = json.loads(body.decode("utf-8"))
|
||||
except (json.JSONDecodeError, UnicodeDecodeError) as e:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, f"invalid JSON: {e}")
|
||||
return
|
||||
|
||||
if not isinstance(payload, dict):
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "body must be a JSON object")
|
||||
return
|
||||
|
||||
op = payload.get("op")
|
||||
if op == "flush":
|
||||
# Flush entire image; offset and size are ignored (per spec).
|
||||
self._handle_post_flush(image_id, cfg)
|
||||
return
|
||||
if op != "zero":
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"unsupported op; only \"zero\" and \"flush\" are supported",
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
size = int(payload.get("size"))
|
||||
except (TypeError, ValueError):
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "missing or invalid \"size\"")
|
||||
return
|
||||
if size <= 0:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "\"size\" must be positive")
|
||||
return
|
||||
|
||||
offset = payload.get("offset")
|
||||
if offset is None:
|
||||
offset = 0
|
||||
else:
|
||||
try:
|
||||
offset = int(offset)
|
||||
except (TypeError, ValueError):
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid \"offset\"")
|
||||
return
|
||||
if offset < 0:
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "\"offset\" must be non-negative")
|
||||
return
|
||||
|
||||
flush = bool(payload.get("flush", False))
|
||||
|
||||
self._handle_patch_zero(image_id, cfg, offset=offset, size=size, flush=flush)
|
||||
|
||||
def _handle_get_image(
|
||||
self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str]
|
||||
) -> None:
|
||||
|
|
@ -1090,6 +1255,126 @@ class Handler(BaseHTTPRequestHandler):
|
|||
dur = _now_s() - start
|
||||
logging.info("FLUSH end image_id=%s duration_s=%.3f", image_id, dur)
|
||||
|
||||
def _handle_patch_zero(
|
||||
self,
|
||||
image_id: str,
|
||||
cfg: Dict[str, Any],
|
||||
offset: int,
|
||||
size: int,
|
||||
flush: bool,
|
||||
) -> None:
|
||||
lock = _get_image_lock(image_id)
|
||||
if not lock.acquire(blocking=False):
|
||||
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
|
||||
return
|
||||
|
||||
if not _WRITE_SEM.acquire(blocking=False):
|
||||
lock.release()
|
||||
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
|
||||
return
|
||||
|
||||
start = _now_s()
|
||||
try:
|
||||
logging.info(
|
||||
"PATCH zero start image_id=%s offset=%d size=%d flush=%s",
|
||||
image_id, offset, size, flush,
|
||||
)
|
||||
with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn:
|
||||
image_size = conn.size()
|
||||
if offset >= image_size:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"offset must be less than image size",
|
||||
)
|
||||
return
|
||||
zero_size = min(size, image_size - offset)
|
||||
conn.pzero(offset, zero_size)
|
||||
if flush:
|
||||
conn.flush()
|
||||
self._send_json(HTTPStatus.OK, {"ok": True})
|
||||
except Exception as e:
|
||||
logging.error("PATCH zero error image_id=%s err=%r", image_id, e)
|
||||
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
|
||||
finally:
|
||||
_WRITE_SEM.release()
|
||||
lock.release()
|
||||
dur = _now_s() - start
|
||||
logging.info("PATCH zero end image_id=%s duration_s=%.3f", image_id, dur)
|
||||
|
||||
def _handle_patch_range(
|
||||
self,
|
||||
image_id: str,
|
||||
cfg: Dict[str, Any],
|
||||
range_header: str,
|
||||
content_length: int,
|
||||
) -> None:
|
||||
"""Write request body to the image at the byte range from Range header."""
|
||||
lock = _get_image_lock(image_id)
|
||||
if not lock.acquire(blocking=False):
|
||||
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
|
||||
return
|
||||
|
||||
if not _WRITE_SEM.acquire(blocking=False):
|
||||
lock.release()
|
||||
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
|
||||
return
|
||||
|
||||
start = _now_s()
|
||||
bytes_written = 0
|
||||
try:
|
||||
logging.info(
|
||||
"PATCH range start image_id=%s range=%s content_length=%d",
|
||||
image_id, range_header, content_length,
|
||||
)
|
||||
with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn:
|
||||
image_size = conn.size()
|
||||
try:
|
||||
start_off, end_inclusive = self._parse_single_range(
|
||||
range_header, image_size
|
||||
)
|
||||
except ValueError as e:
|
||||
if "unsatisfiable" in str(e).lower():
|
||||
self._send_range_not_satisfiable(image_size)
|
||||
else:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST, f"invalid Range header: {e}"
|
||||
)
|
||||
return
|
||||
expected_len = end_inclusive - start_off + 1
|
||||
if content_length != expected_len:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"Content-Length ({content_length}) must equal range length ({expected_len})",
|
||||
)
|
||||
return
|
||||
offset = start_off
|
||||
remaining = content_length
|
||||
while remaining > 0:
|
||||
chunk = self.rfile.read(min(CHUNK_SIZE, remaining))
|
||||
if not chunk:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"request body ended early at {bytes_written} bytes",
|
||||
)
|
||||
return
|
||||
conn.pwrite(chunk, offset)
|
||||
n = len(chunk)
|
||||
offset += n
|
||||
remaining -= n
|
||||
bytes_written += n
|
||||
self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written})
|
||||
except Exception as e:
|
||||
logging.error("PATCH range error image_id=%s err=%r", image_id, e)
|
||||
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
|
||||
finally:
|
||||
_WRITE_SEM.release()
|
||||
lock.release()
|
||||
dur = _now_s() - start
|
||||
logging.info(
|
||||
"PATCH range end image_id=%s bytes=%d duration_s=%.3f",
|
||||
image_id, bytes_written, dur,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="POC imageio-like HTTP server backed by NBD")
|
||||
|
|
|
|||
Loading…
Reference in New Issue