mirror of https://github.com/apache/cloudstack.git
Image server support for file backend (qcow2 upload)
This commit is contained in:
parent
586134d392
commit
6ca1c9b31f
|
|
@ -57,6 +57,7 @@ import javax.naming.ConfigurationException;
|
|||
import org.apache.cloudstack.backup.CreateImageTransferAnswer;
|
||||
import org.apache.cloudstack.backup.CreateImageTransferCommand;
|
||||
import org.apache.cloudstack.backup.FinalizeImageTransferCommand;
|
||||
import org.apache.cloudstack.backup.ImageTransfer;
|
||||
import org.apache.cloudstack.framework.security.keystore.KeystoreManager;
|
||||
import org.apache.cloudstack.storage.NfsMountManagerImpl.PathParser;
|
||||
import org.apache.cloudstack.storage.command.CopyCmdAnswer;
|
||||
|
|
@ -3839,10 +3840,8 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
}
|
||||
|
||||
final String transferId = cmd.getTransferId();
|
||||
|
||||
final String hostIp = cmd.getHostIpAddress();
|
||||
final String exportName = cmd.getExportName();
|
||||
final int nbdPort = cmd.getNbdPort();
|
||||
final ImageTransfer.Backend backend = cmd.getBackend();
|
||||
|
||||
if (StringUtils.isBlank(transferId)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "transferId is empty.");
|
||||
|
|
@ -3850,18 +3849,25 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
if (StringUtils.isBlank(hostIp)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "hostIpAddress is empty.");
|
||||
}
|
||||
if (StringUtils.isBlank(exportName)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "exportName is empty.");
|
||||
}
|
||||
if (nbdPort <= 0) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "Invalid nbdPort: " + nbdPort);
|
||||
}
|
||||
|
||||
final int imageServerPort = 54323;
|
||||
final Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("backend", backend.toString());
|
||||
|
||||
try {
|
||||
// 1) Write /tmp/<transferId> with NBD endpoint details.
|
||||
final Map<String, Object> payload = new HashMap<>();
|
||||
if (backend == ImageTransfer.Backend.file) {
|
||||
final String filePath = cmd.getFile();
|
||||
if (StringUtils.isBlank(filePath)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "file path is empty for file backend.");
|
||||
}
|
||||
payload.put("file", filePath);
|
||||
} else {
|
||||
final String exportName = cmd.getExportName();
|
||||
final int nbdPort = cmd.getNbdPort();
|
||||
if (StringUtils.isBlank(exportName)) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "exportName is empty.");
|
||||
}
|
||||
if (nbdPort <= 0) {
|
||||
return new CreateImageTransferAnswer(cmd, false, "Invalid nbdPort: " + nbdPort);
|
||||
}
|
||||
payload.put("host", hostIp);
|
||||
payload.put("port", nbdPort);
|
||||
payload.put("export", exportName);
|
||||
|
|
@ -3869,7 +3875,9 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
if (checkpointId != null) {
|
||||
payload.put("export_bitmap", exportName + "-" + checkpointId.substring(0, 4));
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
final String json = new GsonBuilder().create().toJson(payload);
|
||||
File dir = new File("/tmp/imagetransfer");
|
||||
if (!dir.exists()) {
|
||||
|
|
@ -3883,6 +3891,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
return new CreateImageTransferAnswer(cmd, false, "Failed to prepare image transfer on SSVM: " + e.getMessage());
|
||||
}
|
||||
|
||||
final int imageServerPort = 54323;
|
||||
startImageServerIfNotRunning(imageServerPort);
|
||||
|
||||
final String transferUrl = String.format("http://%s:%d/images/%s", _publicIp, imageServerPort, transferId);
|
||||
|
|
|
|||
|
|
@ -17,7 +17,11 @@
|
|||
# under the License.
|
||||
|
||||
"""
|
||||
POC "imageio-like" HTTP server backed by NBD over TCP.
|
||||
POC "imageio-like" HTTP server backed by NBD over TCP or a local file.
|
||||
|
||||
Supports two backends (see config payload):
|
||||
- nbd: proxy to an NBD server (port, export, export_bitmap); supports range reads/writes, extents, zero, flush.
|
||||
- file: read/write a local qcow2 (or raw) file path; full PUT only (no range writes), GET with optional ranges, flush.
|
||||
|
||||
How to run
|
||||
----------
|
||||
|
|
@ -116,9 +120,10 @@ _IMAGE_LOCKS: Dict[str, threading.Lock] = {}
|
|||
_IMAGE_LOCKS_GUARD = threading.Lock()
|
||||
|
||||
|
||||
# Dynamic image_id(transferId) -> NBD export mapping:
|
||||
# Dynamic image_id(transferId) -> backend mapping:
|
||||
# CloudStack writes a JSON file at /tmp/imagetransfer/<transferId> with:
|
||||
# {"host": "...", "port": 10809, "export": "vda", "export_bitmap":"bitmap1"}
|
||||
# - NBD backend: {"backend": "nbd", "host": "...", "port": 10809, "export": "vda", "export_bitmap": "..."}
|
||||
# - File backend: {"backend": "file", "file": "/path/to/image.qcow2"}
|
||||
#
|
||||
# This server reads that file on-demand.
|
||||
_CFG_DIR = "/tmp/imagetransfer"
|
||||
|
|
@ -249,26 +254,49 @@ def _load_image_cfg(image_id: str) -> Optional[Dict[str, Any]]:
|
|||
logging.error("cfg invalid type image_id=%s type=%s", image_id, type(obj).__name__)
|
||||
return None
|
||||
|
||||
host = obj.get("host")
|
||||
port = obj.get("port")
|
||||
export = obj.get("export")
|
||||
export_bitmap = obj.get("export_bitmap")
|
||||
if not isinstance(host, str) or not host:
|
||||
logging.error("cfg missing/invalid host image_id=%s", image_id)
|
||||
backend = obj.get("backend")
|
||||
if backend is None:
|
||||
backend = "nbd"
|
||||
if not isinstance(backend, str):
|
||||
logging.error("cfg invalid backend type image_id=%s", image_id)
|
||||
return None
|
||||
try:
|
||||
port_i = int(port)
|
||||
except Exception:
|
||||
logging.error("cfg missing/invalid port image_id=%s", image_id)
|
||||
return None
|
||||
if port_i <= 0 or port_i > 65535:
|
||||
logging.error("cfg out-of-range port image_id=%s port=%r", image_id, port)
|
||||
return None
|
||||
if export is not None and (not isinstance(export, str) or not export):
|
||||
logging.error("cfg missing/invalid export image_id=%s", image_id)
|
||||
backend = backend.lower()
|
||||
if backend not in ("nbd", "file"):
|
||||
logging.error("cfg unsupported backend image_id=%s backend=%s", image_id, backend)
|
||||
return None
|
||||
|
||||
cfg: Dict[str, Any] = {"host": host, "port": port_i, "export": export, "export_bitmap": export_bitmap}
|
||||
if backend == "file":
|
||||
file_path = obj.get("file")
|
||||
if not isinstance(file_path, str) or not file_path.strip():
|
||||
logging.error("cfg missing/invalid file path for file backend image_id=%s", image_id)
|
||||
return None
|
||||
cfg = {"backend": "file", "file": file_path.strip()}
|
||||
else:
|
||||
host = obj.get("host")
|
||||
port = obj.get("port")
|
||||
export = obj.get("export")
|
||||
export_bitmap = obj.get("export_bitmap")
|
||||
if not isinstance(host, str) or not host:
|
||||
logging.error("cfg missing/invalid host image_id=%s", image_id)
|
||||
return None
|
||||
try:
|
||||
port_i = int(port)
|
||||
except Exception:
|
||||
logging.error("cfg missing/invalid port image_id=%s", image_id)
|
||||
return None
|
||||
if port_i <= 0 or port_i > 65535:
|
||||
logging.error("cfg out-of-range port image_id=%s port=%r", image_id, port)
|
||||
return None
|
||||
if export is not None and (not isinstance(export, str) or not export):
|
||||
logging.error("cfg missing/invalid export image_id=%s", image_id)
|
||||
return None
|
||||
cfg = {
|
||||
"backend": "nbd",
|
||||
"host": host,
|
||||
"port": port_i,
|
||||
"export": export,
|
||||
"export_bitmap": export_bitmap,
|
||||
}
|
||||
|
||||
with _CFG_CACHE_GUARD:
|
||||
_CFG_CACHE[safe_id] = (float(st.st_mtime), cfg)
|
||||
|
|
@ -813,6 +841,9 @@ class Handler(BaseHTTPRequestHandler):
|
|||
def _image_cfg(self, image_id: str) -> Optional[Dict[str, Any]]:
|
||||
return _load_image_cfg(image_id)
|
||||
|
||||
def _is_file_backend(self, cfg: Dict[str, Any]) -> bool:
|
||||
return cfg.get("backend") == "file"
|
||||
|
||||
def do_OPTIONS(self) -> None:
|
||||
image_id, tail = self._parse_route()
|
||||
if image_id is None or tail is not None:
|
||||
|
|
@ -822,6 +853,19 @@ class Handler(BaseHTTPRequestHandler):
|
|||
if cfg is None:
|
||||
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
|
||||
return
|
||||
if self._is_file_backend(cfg):
|
||||
# File backend: full PUT only, no range writes; GET with ranges allowed; flush supported.
|
||||
allowed_methods = "GET, PUT, POST, OPTIONS"
|
||||
features = ["flush"]
|
||||
max_writers = MAX_PARALLEL_WRITES
|
||||
response = {
|
||||
"unix_socket": None,
|
||||
"features": features,
|
||||
"max_readers": MAX_PARALLEL_READS,
|
||||
"max_writers": max_writers,
|
||||
}
|
||||
self._send_json(HTTPStatus.OK, response, allowed_methods=allowed_methods)
|
||||
return
|
||||
# Query NBD backend for capabilities (like nbdinfo); fall back to config.
|
||||
read_only = True
|
||||
can_flush = False
|
||||
|
|
@ -876,6 +920,11 @@ class Handler(BaseHTTPRequestHandler):
|
|||
return
|
||||
|
||||
if tail == "extents":
|
||||
if self._is_file_backend(cfg):
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST, "extents not supported for file backend"
|
||||
)
|
||||
return
|
||||
query = self._parse_query()
|
||||
context = (query.get("context") or [None])[0]
|
||||
self._handle_get_extents(image_id, cfg, context=context)
|
||||
|
|
@ -945,6 +994,12 @@ class Handler(BaseHTTPRequestHandler):
|
|||
if cfg is None:
|
||||
self._send_error_json(HTTPStatus.NOT_FOUND, "unknown image_id")
|
||||
return
|
||||
if self._is_file_backend(cfg):
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"range writes and PATCH not supported for file backend; use PUT for full upload",
|
||||
)
|
||||
return
|
||||
|
||||
content_type = self.headers.get("Content-Type", "").split(";")[0].strip().lower()
|
||||
range_header = self.headers.get("Range")
|
||||
|
|
@ -1057,9 +1112,14 @@ class Handler(BaseHTTPRequestHandler):
|
|||
bytes_sent = 0
|
||||
try:
|
||||
logging.info("GET start image_id=%s range=%s", image_id, range_header or "-")
|
||||
with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn:
|
||||
size = conn.size()
|
||||
|
||||
if self._is_file_backend(cfg):
|
||||
file_path = cfg["file"]
|
||||
try:
|
||||
size = os.path.getsize(file_path)
|
||||
except OSError as e:
|
||||
logging.error("GET file size error image_id=%s path=%s err=%r", image_id, file_path, e)
|
||||
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "failed to access file")
|
||||
return
|
||||
start_off = 0
|
||||
end_off_incl = size - 1 if size > 0 else -1
|
||||
status = HTTPStatus.OK
|
||||
|
|
@ -1089,18 +1149,65 @@ class Handler(BaseHTTPRequestHandler):
|
|||
|
||||
offset = start_off
|
||||
end_excl = end_off_incl + 1
|
||||
while offset < end_excl:
|
||||
to_read = min(CHUNK_SIZE, end_excl - offset)
|
||||
data = conn.pread(to_read, offset)
|
||||
if not data:
|
||||
raise RuntimeError("backend returned empty read")
|
||||
try:
|
||||
self.wfile.write(data)
|
||||
except BrokenPipeError:
|
||||
logging.info("GET client disconnected image_id=%s at=%d", image_id, offset)
|
||||
break
|
||||
offset += len(data)
|
||||
bytes_sent += len(data)
|
||||
with open(file_path, "rb") as f:
|
||||
f.seek(offset)
|
||||
while offset < end_excl:
|
||||
to_read = min(CHUNK_SIZE, end_excl - offset)
|
||||
data = f.read(to_read)
|
||||
if not data:
|
||||
break
|
||||
try:
|
||||
self.wfile.write(data)
|
||||
except BrokenPipeError:
|
||||
logging.info("GET client disconnected image_id=%s at=%d", image_id, offset)
|
||||
break
|
||||
offset += len(data)
|
||||
bytes_sent += len(data)
|
||||
else:
|
||||
with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn:
|
||||
size = conn.size()
|
||||
|
||||
start_off = 0
|
||||
end_off_incl = size - 1 if size > 0 else -1
|
||||
status = HTTPStatus.OK
|
||||
content_length = size
|
||||
if range_header is not None:
|
||||
try:
|
||||
start_off, end_off_incl = self._parse_single_range(range_header, size)
|
||||
except ValueError as e:
|
||||
if str(e) == "unsatisfiable":
|
||||
self._send_range_not_satisfiable(size)
|
||||
return
|
||||
if "unsatisfiable" in str(e):
|
||||
self._send_range_not_satisfiable(size)
|
||||
return
|
||||
self._send_error_json(HTTPStatus.BAD_REQUEST, "invalid Range header")
|
||||
return
|
||||
status = HTTPStatus.PARTIAL_CONTENT
|
||||
content_length = (end_off_incl - start_off) + 1
|
||||
|
||||
self.send_response(status)
|
||||
self._send_imageio_headers()
|
||||
self.send_header("Content-Type", "application/octet-stream")
|
||||
self.send_header("Content-Length", str(content_length))
|
||||
if status == HTTPStatus.PARTIAL_CONTENT:
|
||||
self.send_header("Content-Range", f"bytes {start_off}-{end_off_incl}/{size}")
|
||||
self.end_headers()
|
||||
|
||||
offset = start_off
|
||||
end_excl = end_off_incl + 1
|
||||
while offset < end_excl:
|
||||
to_read = min(CHUNK_SIZE, end_excl - offset)
|
||||
data = conn.pread(to_read, offset)
|
||||
if not data:
|
||||
raise RuntimeError("backend returned empty read")
|
||||
try:
|
||||
self.wfile.write(data)
|
||||
except BrokenPipeError:
|
||||
logging.info("GET client disconnected image_id=%s at=%d", image_id, offset)
|
||||
break
|
||||
offset += len(data)
|
||||
bytes_sent += len(data)
|
||||
except Exception as e:
|
||||
# If headers already sent, we can't return JSON reliably; just log.
|
||||
logging.error("GET error image_id=%s err=%r", image_id, e)
|
||||
|
|
@ -1132,24 +1239,41 @@ class Handler(BaseHTTPRequestHandler):
|
|||
bytes_written = 0
|
||||
try:
|
||||
logging.info("PUT start image_id=%s content_length=%d", image_id, content_length)
|
||||
with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn:
|
||||
offset = 0
|
||||
if self._is_file_backend(cfg):
|
||||
file_path = cfg["file"]
|
||||
remaining = content_length
|
||||
while remaining > 0:
|
||||
chunk = self.rfile.read(min(CHUNK_SIZE, remaining))
|
||||
if not chunk:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"request body ended early at {offset} bytes",
|
||||
)
|
||||
return
|
||||
conn.pwrite(chunk, offset)
|
||||
offset += len(chunk)
|
||||
remaining -= len(chunk)
|
||||
bytes_written += len(chunk)
|
||||
|
||||
# POC-level: do not auto-flush on PUT; expose explicit /flush endpoint.
|
||||
with open(file_path, "wb") as f:
|
||||
while remaining > 0:
|
||||
chunk = self.rfile.read(min(CHUNK_SIZE, remaining))
|
||||
if not chunk:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"request body ended early at {bytes_written} bytes",
|
||||
)
|
||||
return
|
||||
f.write(chunk)
|
||||
bytes_written += len(chunk)
|
||||
remaining -= len(chunk)
|
||||
self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written})
|
||||
else:
|
||||
with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn:
|
||||
offset = 0
|
||||
remaining = content_length
|
||||
while remaining > 0:
|
||||
chunk = self.rfile.read(min(CHUNK_SIZE, remaining))
|
||||
if not chunk:
|
||||
self._send_error_json(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
f"request body ended early at {offset} bytes",
|
||||
)
|
||||
return
|
||||
conn.pwrite(chunk, offset)
|
||||
offset += len(chunk)
|
||||
remaining -= len(chunk)
|
||||
bytes_written += len(chunk)
|
||||
|
||||
# POC-level: do not auto-flush on PUT; expose explicit /flush endpoint.
|
||||
self._send_json(HTTPStatus.OK, {"ok": True, "bytes_written": bytes_written})
|
||||
except Exception as e:
|
||||
logging.error("PUT error image_id=%s err=%r", image_id, e)
|
||||
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
|
||||
|
|
@ -1244,9 +1368,16 @@ class Handler(BaseHTTPRequestHandler):
|
|||
start = _now_s()
|
||||
try:
|
||||
logging.info("FLUSH start image_id=%s", image_id)
|
||||
with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn:
|
||||
conn.flush()
|
||||
self._send_json(HTTPStatus.OK, {"ok": True})
|
||||
if self._is_file_backend(cfg):
|
||||
file_path = cfg["file"]
|
||||
with open(file_path, "rb") as f:
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
self._send_json(HTTPStatus.OK, {"ok": True})
|
||||
else:
|
||||
with _NbdConn(cfg["host"], int(cfg["port"]), cfg.get("export")) as conn:
|
||||
conn.flush()
|
||||
self._send_json(HTTPStatus.OK, {"ok": True})
|
||||
except Exception as e:
|
||||
logging.error("FLUSH error image_id=%s err=%r", image_id, e)
|
||||
self._send_error_json(HTTPStatus.INTERNAL_SERVER_ERROR, "backend error")
|
||||
|
|
|
|||
Loading…
Reference in New Issue