mirror of https://github.com/apache/cloudstack.git
image server : support for range puts and blocking writes
This commit is contained in:
parent
b68e541b31
commit
824b05ff0c
|
|
@ -20,8 +20,12 @@
|
|||
POC "imageio-like" HTTP server backed by NBD over Unix socket or a local file.
|
||||
|
||||
Supports two backends (see config payload):
|
||||
- nbd: proxy to an NBD server via Unix socket (socket path, export, export_bitmap); supports range reads/writes, extents, zero, flush.
|
||||
- file: read/write a local qcow2 (or raw) file path; full PUT only (no range writes), GET with optional ranges, flush.
|
||||
- nbd: proxy to an NBD server via Unix socket (socket path, export, export_bitmap);
|
||||
supports range reads/writes (GET/PUT/PATCH), extents, zero, flush. PUT accepts
|
||||
full upload or ranged upload (Content-Range). Concurrent PUTs on the same image
|
||||
are serialized (blocking).
|
||||
- file: read/write a local qcow2 (or raw) file path; full PUT only (no range
|
||||
writes), GET with optional ranges, flush.
|
||||
|
||||
How to run
|
||||
----------
|
||||
|
|
@ -44,8 +48,16 @@ Example curl commands
|
|||
- GET a byte range:
|
||||
curl -v -H "Range: bytes=0-1048575" http://127.0.0.1:54323/images/demo -o first_1MiB.bin
|
||||
|
||||
- PUT full image (Content-Length must equal export size exactly):
|
||||
- PUT full image (Content-Length must equal export size exactly). Optional ?flush=y|n:
|
||||
curl -v -T demo.img http://127.0.0.1:54323/images/demo
|
||||
curl -v -T demo.img "http://127.0.0.1:54323/images/demo?flush=y"
|
||||
|
||||
- PUT ranged (NBD backend only). Content-Range: bytes start-end/* or bytes start-end/size
|
||||
(server uses only start; length from Content-Length). Optional ?flush=y|n:
|
||||
curl -v -X PUT -H "Content-Range: bytes 0-1048575/*" -H "Content-Length: 1048576" \
|
||||
--data-binary @chunk.bin http://127.0.0.1:54323/images/demo
|
||||
curl -v -X PUT -H "Content-Range: bytes 1048576-2097151/52428800" -H "Content-Length: 1048576" \
|
||||
--data-binary @chunk2.bin "http://127.0.0.1:54323/images/demo?flush=n"
|
||||
|
||||
- GET extents (zero/hole extents from NBD base:allocation):
|
||||
curl -s http://127.0.0.1:54323/images/demo/extents | jq .
|
||||
|
|
@ -89,6 +101,7 @@ import argparse
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
|
@ -608,6 +621,9 @@ class Handler(BaseHTTPRequestHandler):
|
|||
server_version = "imageio-poc/0.1"
|
||||
server_protocol = "HTTP/1.1"
|
||||
|
||||
# Accept both "bytes start-end/*" and "bytes start-end/size"; we only use start.
|
||||
_CONTENT_RANGE_RE = re.compile(r"^bytes\s+(\d+)-(\d+)/(?:\*|\d+)$")
|
||||
|
||||
# Keep BaseHTTPRequestHandler from printing noisy default logs
|
||||
def log_message(self, fmt: str, *args: Any) -> None:
|
||||
logging.info("%s - - %s", self.address_string(), fmt % args)
|
||||
|
|
@ -740,6 +756,23 @@ class Handler(BaseHTTPRequestHandler):
|
|||
return None, None
|
||||
return image_id, tail
|
||||
|
||||
def _parse_content_range(self, header: str) -> Tuple[int, int]:
|
||||
"""
|
||||
Parse Content-Range header "bytes start-end/*" or "bytes start-end/size"
|
||||
and return (start, end_inclusive). Raises ValueError on invalid input.
|
||||
"""
|
||||
if not header:
|
||||
raise ValueError("empty Content-Range")
|
||||
m = self._CONTENT_RANGE_RE.match(header.strip())
|
||||
if not m:
|
||||
raise ValueError("invalid Content-Range")
|
||||
start_s, end_s = m.groups()
|
||||
start = int(start_s, 10)
|
||||
end = int(end_s, 10)
|
||||
if start < 0 or end < start:
|
||||
raise ValueError("invalid Content-Range range")
|
||||
return start, end
|
||||
|
||||
def _parse_query(self) -> Dict[str, List[str]]:
|
||||
"""Parse query string from self.path into a dict of name -> list of values."""
|
||||
if "?" not in self.path:
|
||||
|
|
@ -855,9 +888,11 @@ class Handler(BaseHTTPRequestHandler):
|
|||
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
|
||||
return
|
||||
|
||||
if self.headers.get("Range") is not None or self.headers.get("Content-Range") is not None:
|
||||
# For PUT we only support Content-Range (for NBD backend); Range is rejected.
|
||||
if self.headers.get("Range") is not None:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST, "Range/Content-Range not supported; full writes only"
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Range header not supported for PUT; use Content-Range or PATCH",
|
||||
)
|
||||
return
|
||||
|
||||
|
|
@ -874,7 +909,24 @@ class Handler(BaseHTTPRequestHandler):
|
|||
self._send_error_json(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
|
||||
return
|
||||
|
||||
self._handle_put_image(image_id, cfg, content_length)
|
||||
# Optional flush=y|n query parameter.
|
||||
query = self._parse_query()
|
||||
flush_param = (query.get("flush") or ["n"])[0].lower()
|
||||
flush = flush_param in ("y", "yes", "true", "1")
|
||||
|
||||
content_range_hdr = self.headers.get("Content-Range")
|
||||
if content_range_hdr is not None:
|
||||
if self._is_file_backend(cfg):
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Content-Range PUT not supported for file backend; use full PUT",
|
||||
)
|
||||
return
|
||||
self._handle_put_range(image_id, cfg, content_range_hdr, content_length, flush)
|
||||
return
|
||||
|
||||
# No Content-Range: full image PUT.
|
||||
self._handle_put_image(image_id, cfg, content_length, flush)
|
||||
|
||||
def do_POST(self) -> None:
|
||||
image_id, tail = self._parse_route()
|
||||
|
|
@ -1004,7 +1056,7 @@ class Handler(BaseHTTPRequestHandler):
|
|||
self._handle_patch_zero(image_id, cfg, offset=offset, size=size, flush=flush)
|
||||
|
||||
def _handle_get_image(
|
||||
self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str]
|
||||
self, image_id: str, cfg: Dict[str, Any], range_header: Optional[str]
|
||||
) -> None:
|
||||
if not _READ_SEM.acquire(blocking=False):
|
||||
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel reads")
|
||||
|
|
@ -1125,11 +1177,12 @@ class Handler(BaseHTTPRequestHandler):
|
|||
"GET end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_sent, dur
|
||||
)
|
||||
|
||||
def _handle_put_image(self, image_id: str, cfg: Dict[str, Any], content_length: int) -> None:
|
||||
def _handle_put_image(
|
||||
self, image_id: str, cfg: Dict[str, Any], content_length: int, flush: bool
|
||||
) -> None:
|
||||
lock = _get_image_lock(image_id)
|
||||
if not lock.acquire(blocking=False):
|
||||
self._send_error_json(HTTPStatus.CONFLICT, "image busy")
|
||||
return
|
||||
# Block until we can write this image
|
||||
lock.acquire()
|
||||
|
||||
if not _WRITE_SEM.acquire(blocking=False):
|
||||
lock.release()
|
||||
|
|
@ -1155,7 +1208,13 @@ class Handler(BaseHTTPRequestHandler):
|
|||
f.write(chunk)
|
||||
bytes_written += len(chunk)
|
||||
remaining -= len(chunk)
|
||||
self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written})
|
||||
if flush:
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
self._send_json(
|
||||
HTTPStatus.OK,
|
||||
{"ok": True, "bytes_written": bytes_written, "flushed": flush},
|
||||
)
|
||||
else:
|
||||
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
|
||||
offset = 0
|
||||
|
|
@ -1173,8 +1232,12 @@ class Handler(BaseHTTPRequestHandler):
|
|||
remaining -= len(chunk)
|
||||
bytes_written += len(chunk)
|
||||
|
||||
# POC-level: do not auto-flush on PUT; expose explicit /flush endpoint.
|
||||
self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written})
|
||||
if flush:
|
||||
conn.flush()
|
||||
self._send_json(
|
||||
HTTPStatus.OK,
|
||||
{"ok": True, "bytes_written": bytes_written, "flushed": flush},
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error("PUT error image_id=%s err=%r", image_id, e)
|
||||
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
|
||||
|
|
@ -1186,6 +1249,49 @@ class Handler(BaseHTTPRequestHandler):
|
|||
"PUT end image_id=%s bytes=%d duration_s=%.3f", image_id, bytes_written, dur
|
||||
)
|
||||
|
||||
def _write_range_nbd(
|
||||
self,
|
||||
image_id: str,
|
||||
cfg: Dict[str, Any],
|
||||
start_off: int,
|
||||
content_length: int,
|
||||
) -> Tuple[int, bool]:
|
||||
"""
|
||||
Low-level helper: write request body to NBD backend starting at start_off.
|
||||
The length is taken from Content-Length. Returns (bytes_written, ok).
|
||||
If ok is False, an error response was already sent.
|
||||
"""
|
||||
bytes_written = 0
|
||||
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
|
||||
image_size = conn.size()
|
||||
if start_off >= image_size:
|
||||
self._send_range_not_satisfiable(image_size)
|
||||
return 0, False
|
||||
|
||||
# Clamp to image size: we do not allow writes beyond end of image.
|
||||
max_len = image_size - start_off
|
||||
if content_length > max_len:
|
||||
self._send_range_not_satisfiable(image_size)
|
||||
return 0, False
|
||||
|
||||
offset = start_off
|
||||
remaining = content_length
|
||||
while remaining > 0:
|
||||
chunk = self.rfile.read(min(CHUNK_SIZE, remaining))
|
||||
if not chunk:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"request body ended early at {bytes_written} bytes",
|
||||
)
|
||||
return bytes_written, False
|
||||
conn.pwrite(chunk, offset)
|
||||
n = len(chunk)
|
||||
offset += n
|
||||
remaining -= n
|
||||
bytes_written += n
|
||||
|
||||
return bytes_written, True
|
||||
|
||||
def _handle_get_extents(
|
||||
self, image_id: str, cfg: Dict[str, Any], context: Optional[str] = None
|
||||
) -> None:
|
||||
|
|
@ -1356,40 +1462,28 @@ class Handler(BaseHTTPRequestHandler):
|
|||
)
|
||||
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
|
||||
image_size = conn.size()
|
||||
try:
|
||||
start_off, end_inclusive = self._parse_single_range(
|
||||
range_header, image_size
|
||||
)
|
||||
except ValueError as e:
|
||||
if "unsatisfiable" in str(e).lower():
|
||||
self._send_range_not_satisfiable(image_size)
|
||||
else:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST, f"invalid Range header: {e}"
|
||||
)
|
||||
return
|
||||
expected_len = end_inclusive - start_off + 1
|
||||
if content_length != expected_len:
|
||||
try:
|
||||
start_off, end_inclusive = self._parse_single_range(
|
||||
range_header, image_size
|
||||
)
|
||||
except ValueError as e:
|
||||
if "unsatisfiable" in str(e).lower():
|
||||
self._send_range_not_satisfiable(image_size)
|
||||
else:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"Content-Length ({content_length}) must equal range length ({expected_len})",
|
||||
HTTPStatus.BAD_REQUEST, f"invalid Range header: {e}"
|
||||
)
|
||||
return
|
||||
offset = start_off
|
||||
remaining = content_length
|
||||
while remaining > 0:
|
||||
chunk = self.rfile.read(min(CHUNK_SIZE, remaining))
|
||||
if not chunk:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"request body ended early at {bytes_written} bytes",
|
||||
)
|
||||
return
|
||||
conn.pwrite(chunk, offset)
|
||||
n = len(chunk)
|
||||
offset += n
|
||||
remaining -= n
|
||||
bytes_written += n
|
||||
return
|
||||
expected_len = end_inclusive - start_off + 1
|
||||
if content_length != expected_len:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"Content-Length ({content_length}) must equal range length ({expected_len})",
|
||||
)
|
||||
return
|
||||
bytes_written, ok = self._write_range_nbd(image_id, cfg, start_off, content_length)
|
||||
if not ok:
|
||||
return
|
||||
self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written})
|
||||
except Exception as e:
|
||||
logging.error("PATCH range error image_id=%s err=%r", image_id, e)
|
||||
|
|
@ -1403,6 +1497,71 @@ class Handler(BaseHTTPRequestHandler):
|
|||
image_id, bytes_written, dur,
|
||||
)
|
||||
|
||||
def _handle_put_range(
|
||||
self,
|
||||
image_id: str,
|
||||
cfg: Dict[str, Any],
|
||||
content_range: str,
|
||||
content_length: int,
|
||||
flush: bool,
|
||||
) -> None:
|
||||
"""Handle PUT with Content-Range: bytes start-end/* for NBD backend."""
|
||||
lock = _get_image_lock(image_id)
|
||||
# Block until we can write this image.
|
||||
lock.acquire()
|
||||
|
||||
if not _WRITE_SEM.acquire(blocking=False):
|
||||
lock.release()
|
||||
self._send_error_json(HTTPStatus.SERVICE_UNAVAILABLE, "too many parallel writes")
|
||||
return
|
||||
|
||||
start = _now_s()
|
||||
bytes_written = 0
|
||||
try:
|
||||
logging.info(
|
||||
"PUT range start image_id=%s Content-Range=%s content_length=%d flush=%s",
|
||||
image_id,
|
||||
content_range,
|
||||
content_length,
|
||||
flush,
|
||||
)
|
||||
try:
|
||||
start_off, end_inclusive = self._parse_content_range(content_range)
|
||||
except ValueError as e:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST, f"invalid Content-Range header: {e}"
|
||||
)
|
||||
return
|
||||
|
||||
# Per contract we only use the start byte from Content-Range;
|
||||
# length comes from Content-Length.
|
||||
bytes_written, ok = self._write_range_nbd(image_id, cfg, start_off, content_length)
|
||||
if not ok:
|
||||
return
|
||||
|
||||
if flush:
|
||||
with _NbdConn(cfg["socket"], cfg.get("export")) as conn:
|
||||
conn.flush()
|
||||
|
||||
self._send_json(
|
||||
HTTPStatus.OK,
|
||||
{"ok": True, "bytes_written": bytes_written, "flushed": flush},
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error("PUT range error image_id=%s err=%r", image_id, e)
|
||||
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
|
||||
finally:
|
||||
_WRITE_SEM.release()
|
||||
lock.release()
|
||||
dur = _now_s() - start
|
||||
logging.info(
|
||||
"PUT range end image_id=%s bytes=%d duration_s=%.3f flush=%s",
|
||||
image_id,
|
||||
bytes_written,
|
||||
dur,
|
||||
flush,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="POC imageio-like HTTP server backed by NBD")
|
||||
|
|
|
|||
Loading…
Reference in New Issue