diff --git a/docs/source/api-reference/drivers/opendal.md b/docs/source/api-reference/drivers/opendal.md index 57b497b1f..0c76e66a0 100644 --- a/docs/source/api-reference/drivers/opendal.md +++ b/docs/source/api-reference/drivers/opendal.md @@ -13,11 +13,9 @@ The Opendal driver is a driver for interacting with storages attached to the exp ```{doctest} >>> from tempfile import NamedTemporaryFile >>> opendal.create_dir("test/directory/") ->>> remote_file = opendal.open("test/directory/file", "wb") ->>> with NamedTemporaryFile() as local_file: -... assert local_file.write(b"hello") == 5 -... remote_file.write(local_file.name) ->>> remote_file.close() +>>> opendal.write_bytes("test/directory/file", b"hello") +>>> assert opendal.hash("test/directory/file", "md5") == "5d41402abc4b2a76b9719d911017c592" +>>> opendal.remove_all("test/") ``` ```{testsetup} * diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py index e541f3a90..6b431ea72 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py @@ -1,14 +1,13 @@ from dataclasses import dataclass -from pathlib import Path -from jumpstarter_driver_opendal.adapter import OpendalAdapter +from jumpstarter_driver_composite.client import CompositeClient +from jumpstarter_driver_opendal.common import PathBuf from opendal import Operator - -from jumpstarter.client import DriverClient +from yarl import URL @dataclass(kw_only=True) -class HttpServerClient(DriverClient): +class HttpServerClient(CompositeClient): """Client for the HTTP server driver""" def start(self): @@ -30,59 +29,6 @@ def stop(self): """ self.call("stop") - def list_files(self) -> list[str]: - """ - List all files in the HTTP server's root directory. 
- - Returns: - list[str]: A list of filenames present in the HTTP server's root directory - """ - return self.call("list_files") - - def put_file(self, filename: str, src_stream): - """ - Upload a file to the HTTP server using a streamed source. - - Args: - filename (str): Name to save the file as on the server. - src_stream: Stream/source to read the file data from. - - Returns: - str: URL of the uploaded file - """ - return self.call("put_file", filename, src_stream) - - def put_local_file(self, filepath: str) -> str: - """ - Upload a file from the local filesystem to the HTTP server. - - Note: This doesn't use HTTP to upload; it streams the file content directly. - - Args: - filepath (str): Path to the local file to upload. - - Returns: - str: Name of the uploaded file - - Example: - >>> client.put_local_file("/path/to/local/file.txt") - """ - absolute = Path(filepath).resolve() - with OpendalAdapter(client=self, operator=Operator("fs", root="/"), path=str(absolute), mode="rb") as handle: - return self.call("put_file", absolute.name, handle) - - def delete_file(self, filename: str) -> str: - """ - Delete a file from the HTTP server. - - Args: - filename (str): Name of the file to delete. - - Returns: - str: Name of the deleted file - """ - return self.call("delete_file", filename) - def get_host(self) -> str: """ Get the host IP address the HTTP server is listening on. @@ -109,3 +55,19 @@ def get_url(self) -> str: str: The base URL of the server """ return self.call("get_url") + + def put_file(self, dst: PathBuf, src: PathBuf, operator: Operator | None = None) -> str: + """ + Upload a file to the HTTP server using an opendal operator as source. + + Args: + dst (PathBuf): Name to save the file as on the server. + src (PathBuf): Name to read the file from opendal operator. + operator (Operator): opendal operator to read the file from, defaults to local fs. 
+ + Returns: + str: URL of the uploaded file + """ + self.storage.write_from_path(dst, src, operator) + + return str(URL(self.get_url()).joinpath(dst)) diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver.py index 7c8c4a6ff..7db8623db 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver.py @@ -1,11 +1,10 @@ import os from dataclasses import dataclass, field -from pathlib import Path from typing import Optional import anyio from aiohttp import web -from anyio.streams.file import FileWriteStream +from jumpstarter_driver_opendal.driver import Opendal from jumpstarter.driver import Driver, export @@ -34,6 +33,8 @@ def __post_init__(self): super().__post_init__() os.makedirs(self.root_dir, exist_ok=True) + + self.children["storage"] = Opendal(scheme="fs", kwargs={"root": self.root_dir}) self.app.router.add_routes( [ web.static("/", self.root_dir), @@ -58,84 +59,6 @@ def client(cls) -> str: """Return the import path of the corresponding client""" return "jumpstarter_driver_http.client.HttpServerClient" - @export - async def put_file(self, filename: str, src_stream) -> str: - """ - Upload a file to the HTTP server. - - Args: - filename (str): Name of the file to upload. - src_stream: Stream of file content. - - Returns: - str: Name of the uploaded file. - - Raises: - HttpServerError: If the target path is invalid. - FileWriteError: If the file upload fails. 
- """ - try: - file_path = os.path.join(self.root_dir, filename) - - if not Path(file_path).resolve().is_relative_to(Path(self.root_dir).resolve()): - raise HttpServerError("Invalid target path") - - async with await FileWriteStream.from_path(file_path) as dst: - async with self.resource(src_stream, timeout=self.timeout) as src: - async for chunk in src: - await dst.send(chunk) - - self.logger.info(f"File '{filename}' written to '{file_path}'") - return f"{self.get_url()}/{filename}" - - except Exception as e: - self.logger.error(f"Failed to upload file '{filename}': {e}") - raise FileWriteError(f"Failed to upload file '{filename}': {e}") from e - - @export - async def delete_file(self, filename: str) -> str: - """ - Delete a file from the HTTP server. - - Args: - filename (str): Name of the file to delete. - - Returns: - str: Name of the deleted file. - - Raises: - HttpServerError: If the file does not exist or deletion fails. - """ - file_path = Path(self.root_dir) / filename - if not file_path.exists(): - raise HttpServerError(f"File '{filename}' does not exist.") - try: - file_path.unlink() - self.logger.info(f"File '{filename}' has been deleted.") - return filename - except Exception as e: - self.logger.error(f"Failed to delete file '{filename}': {e}") - raise HttpServerError(f"Failed to delete file '{filename}': {e}") from e - - @export - def list_files(self) -> list[str]: - """ - List all files in the root directory. - - Returns: - list[str]: List of filenames in the root directory. - - Raises: - HttpServerError: If listing files fails. 
- """ - try: - files = os.listdir(self.root_dir) - files = [f for f in files if os.path.isfile(os.path.join(self.root_dir, f))] - return files - except Exception as e: - self.logger.error(f"Failed to list files: {e}") - raise HttpServerError(f"Failed to list files: {e}") from e - @export async def start(self): """ diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py index b0dc15aec..da75ff96b 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py @@ -1,14 +1,8 @@ -import os -import uuid -from tempfile import TemporaryDirectory - import aiohttp -import anyio import pytest -from anyio import create_memory_object_stream from .driver import HttpServer -from jumpstarter.common.resources import ClientStreamResource +from jumpstarter.common.utils import serve @pytest.fixture @@ -17,44 +11,27 @@ def anyio_backend(): @pytest.fixture -def temp_dir(): - with TemporaryDirectory() as tmpdir: - yield tmpdir - - -@pytest.fixture -async def server(temp_dir): - server = HttpServer(root_dir=temp_dir) - await server.start() - try: - yield server - finally: - await server.stop() +def http(tmp_path): + with serve(HttpServer(root_dir=str(tmp_path))) as client: + client.start() + try: + yield client + finally: + client.stop() @pytest.mark.anyio -async def test_http_server(server): +async def test_http_server(http, tmp_path): filename = "test.txt" test_content = b"test content" - send_stream, receive_stream = create_memory_object_stream(max_buffer_size=1024) - - resource_uuid = uuid.uuid4() - server.resources[resource_uuid] = receive_stream - - resource_handle = ClientStreamResource(uuid=resource_uuid).model_dump(mode="json") - - async def send_data(): - await send_stream.send(test_content) - await send_stream.aclose() + (tmp_path / "src").write_bytes(test_content) - async with 
anyio.create_task_group() as tg: - tg.start_soon(send_data) + uploaded_url = http.put_file(filename, tmp_path / "src") - uploaded_url = await server.put_file(filename, resource_handle) - assert uploaded_url == f"{server.get_url()}/{filename}" + print(http.storage.stat(filename)) - files = server.list_files() + files = list(http.storage.list("/")) assert filename in files async with aiohttp.ClientSession() as session: @@ -63,20 +40,19 @@ async def send_data(): retrieved_content = await response.read() assert retrieved_content == test_content - deleted_filename = await server.delete_file(filename) - assert deleted_filename == filename + http.storage.delete(filename) - files_after_deletion = server.list_files() + files_after_deletion = list(http.storage.list("/")) assert filename not in files_after_deletion -def test_http_server_host_config(temp_dir): +def test_http_server_host_config(tmp_path): custom_host = "192.168.1.1" - server = HttpServer(root_dir=temp_dir, host=custom_host) + server = HttpServer(root_dir=str(tmp_path), host=custom_host) assert server.get_host() == custom_host -def test_http_server_root_directory_creation(temp_dir): - new_dir = os.path.join(temp_dir, "new_http_root") - _ = HttpServer(root_dir=new_dir) - assert os.path.exists(new_dir) +def test_http_server_root_directory_creation(tmp_path): + new_dir = tmp_path / "new_http_root" + _ = HttpServer(root_dir=str(new_dir)) + assert new_dir.exists() diff --git a/packages/jumpstarter-driver-http/pyproject.toml b/packages/jumpstarter-driver-http/pyproject.toml index 1a2afd819..cf3191e25 100644 --- a/packages/jumpstarter-driver-http/pyproject.toml +++ b/packages/jumpstarter-driver-http/pyproject.toml @@ -10,7 +10,9 @@ requires-python = ">=3.12" dependencies = [ "anyio>=4.6.2.post1", "jumpstarter", - "jumpstarter-driver-opendal" + "jumpstarter-driver-composite", + "jumpstarter-driver-opendal", + "yarl>=1.18.3", ] [tool.hatch.version] diff --git 
a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py index 11e3902f4..9b3f23015 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py @@ -1,11 +1,15 @@ from __future__ import annotations from collections.abc import Generator +from contextlib import closing from dataclasses import dataclass +from io import BytesIO from pathlib import Path from uuid import UUID import asyncclick as click +from anyio import EndOfStream +from anyio.abc import ObjectStream from opendal import Operator from pydantic import ConfigDict, validate_call @@ -14,6 +18,26 @@ from jumpstarter.client import DriverClient +@dataclass(kw_only=True) +class BytesIOStream(ObjectStream[bytes]): + buf: BytesIO + + async def send(self, item: bytes): + self.buf.write(item) + + async def receive(self) -> bytes: + item = self.buf.read(65535) + if len(item) == 0: + raise EndOfStream + return item + + async def send_eof(self): + pass + + async def aclose(self): + pass + + @dataclass(kw_only=True) class OpendalFile: """ @@ -29,28 +53,46 @@ def __write(self, handle): def __read(self, handle): return self.client.call("file_read", self.fd, handle) + def __fs_operator_for_path(self, path: PathBuf) -> (PathBuf, Operator): + return Path(path).resolve(), Operator("fs", root="/") + @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) - def write(self, path: PathBuf, operator: Operator | None = None): + def write_from_path(self, path: PathBuf, operator: Operator | None = None): """ Write into remote file with content from local file """ if operator is None: - operator = Operator("fs", root="/") + path, operator = self.__fs_operator_for_path(path) with OpendalAdapter(client=self.client, operator=operator, path=path) as handle: return self.__write(handle) 
@validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) - def read(self, path: PathBuf, operator: Operator | None = None): + def read_into_path(self, path: PathBuf, operator: Operator | None = None): """ Read content from remote file into local file """ if operator is None: - operator = Operator("fs", root="/") + path, operator = self.__fs_operator_for_path(path) with OpendalAdapter(client=self.client, operator=operator, path=path, mode="wb") as handle: return self.__read(handle) + @validate_call(validate_return=True) + def write_bytes(self, data: bytes) -> None: + buf = BytesIO(data) + with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: + with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: + self.__write(handle) + + @validate_call(validate_return=True) + def read_bytes(self) -> bytes: + buf = BytesIO() + with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: + with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: + self.__read(handle) + return buf.getvalue() + @validate_call(validate_return=True) def seek(self, pos: int, whence: int = 0) -> int: """ @@ -113,6 +155,26 @@ def writable(self) -> bool: class OpendalClient(DriverClient): + @validate_call(validate_return=True) + def write_bytes(self, /, path: PathBuf, data: bytes) -> None: + with closing(self.open(path, "wb")) as f: + f.write_bytes(data) + + @validate_call(validate_return=True) + def read_bytes(self, /, path: PathBuf) -> bytes: + with closing(self.open(path, "rb")) as f: + return f.read_bytes() + + @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) + def write_from_path(self, dst: PathBuf, src: PathBuf, operator: Operator | None = None) -> None: + with closing(self.open(dst, "wb")) as f: + f.write_from_path(src, operator) + + @validate_call(validate_return=True, 
config=ConfigDict(arbitrary_types_allowed=True)) + def read_into_path(self, src: PathBuf, dst: PathBuf, operator: Operator | None = None) -> None: + with closing(self.open(src, "rb")) as f: + f.read_into_path(dst, operator) + @validate_call def open(self, /, path: PathBuf, mode: Mode) -> OpendalFile: """ diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/common.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/common.py index d5f68af71..f847e4458 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/common.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/common.py @@ -24,10 +24,10 @@ def __validate(cls, data: Any): return data def is_file(self) -> bool: - return self._is_file + return self.entry_is_file def is_dir(self) -> bool: - return self._is_dir + return self.entry_is_dir class Metadata(BaseModel): diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py index ea8d432c9..485ec8098 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py @@ -1,3 +1,5 @@ +import hashlib +import os from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path from random import randbytes @@ -12,71 +14,125 @@ from jumpstarter.common.utils import serve -def test_drivers_opendal(tmp_path): +@pytest.fixture(scope="function") +def opendal(tmp_path): with serve(Opendal(scheme="fs", kwargs={"root": str(tmp_path)})) as client: - assert not client.capability().presign + yield client - client.create_dir("test_dir/") - client.create_dir("demo_dir/nest_dir/") - assert client.exists("test_dir/") - assert client.exists("demo_dir/nest_dir/") +test_file = "test_file.txt" +test_content = b"hello" - assert client.stat("test_dir/").mode.is_dir - assert 
sorted(client.list("/")) == ["/", "demo_dir/", "test_dir/"] - assert sorted(client.scan("/")) == ["/", "demo_dir/", "demo_dir/nest_dir/", "test_dir/"] +def test_driver_opendal_read_write_bytes(opendal): + opendal.write_bytes(test_file, test_content) - test_file = client.open("test_dir/test_file", "wb") - assert not test_file.closed - assert not test_file.readable() - assert not test_file.seekable() - assert test_file.writable() + assert opendal.read_bytes(test_file) == test_content + assert opendal.hash(test_file, "md5") == hashlib.md5(test_content).hexdigest() + assert opendal.hash(test_file, "sha256") == hashlib.sha256(test_content).hexdigest() - (tmp_path / "src").write_text("hello") - test_file.write(tmp_path / "src") - test_file.close() - assert test_file.closed +def test_driver_opendal_read_write_path(opendal, tmp_path): + src = tmp_path / "src" + dst = tmp_path / "dst" - test_file = client.open("test_dir/test_file", "rb") - assert not test_file.closed - assert test_file.readable() - assert test_file.seekable() - assert not test_file.writable() + src.write_bytes(test_content) - assert test_file.tell() == 0 - assert test_file.seek(2) == 2 + opendal.write_from_path(test_file, src) + opendal.read_into_path(test_file, dst) - assert client.hash("test_dir/test_file", "md5") == "5d41402abc4b2a76b9719d911017c592" - assert ( - client.hash("test_dir/test_file", "sha256") - == "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" - ) + assert dst.read_bytes() == test_content + + +def test_driver_opendal_seek_tell(opendal): + off = -3 + pos = len(test_content) + off + + assert pos >= 0 + + opendal.write_bytes(test_file, test_content) + + file = opendal.open(test_file, "rb") + file.seek(off, os.SEEK_END) + + assert file.tell() == pos + assert file.read_bytes() == test_content[off:] + + file.close() + + +def test_driver_opendal_file_property(opendal): + file = opendal.open(test_file, "wb") + + assert not file.closed + assert not file.readable() + assert not 
file.seekable() + assert file.writable() + + file.close() + + assert file.closed + + file = opendal.open(test_file, "rb") + + assert not file.closed + assert file.readable() + assert file.seekable() + assert not file.writable() + + file.close() - test_file.read(tmp_path / "dst") - assert (tmp_path / "dst").read_text() == "llo" + assert file.closed - assert client.stat("dst").content_length == 3 - test_file.close() - assert test_file.closed +def test_driver_opendal_file_metadata(opendal): + opendal.write_bytes(test_file, test_content) - client.copy("test_dir/test_file", "test_dir/copy_file") - client.rename("test_dir/copy_file", "test_dir/rename_file") - assert not client.exists("test_dir/copy_file") - assert client.exists("test_dir/rename_file") + assert opendal.exists(test_file) + assert opendal.stat(test_file).mode.is_file() - client.delete("test_dir/rename_file") - assert not client.exists("test_dir/rename_file") + opendal.copy(test_file, "copy_of_test_file") - client.remove_all("test_dir/") - assert not client.exists("test_dir/") + assert opendal.exists("copy_of_test_file") + opendal.rename("copy_of_test_file", "renamed_copy_of_test_file") + + assert not opendal.exists("copy_of_test_file") + assert opendal.exists("renamed_copy_of_test_file") + + opendal.delete("renamed_copy_of_test_file") + + assert not opendal.exists("renamed_copy_of_test_file") + + opendal.create_dir("test_dir/") + + assert opendal.exists("test_dir/") + + assert opendal.stat("test_dir/").mode.is_dir() + + opendal.remove_all("test_dir/") + + assert not opendal.exists("test_dir/") + + +def test_driver_opendal_file_list_scan(opendal): + opendal.create_dir("a/b/c/") + opendal.create_dir("d/e/") + + assert sorted(opendal.list("/")) == ["/", "a/", "d/"] + assert sorted(opendal.scan("/")) == ["/", "a/", "a/b/", "a/b/c/", "d/", "d/e/"] + + +def test_driver_opendal_presign(tmp_path): with serve(Opendal(scheme="http", kwargs={"endpoint": "http://invalid.invalid"})) as client: + capability = 
client.capability() + + assert capability.presign_read assert client.presign_read("test", 100) == PresignedRequest( url="http://invalid.invalid/test", method="GET", headers={} ) + + assert capability.presign_stat assert client.presign_stat("test", 100) == PresignedRequest( url="http://invalid.invalid/test", method="HEAD", headers={} ) diff --git a/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py b/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py index e29b648bb..6067415b0 100644 --- a/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py +++ b/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py @@ -21,15 +21,10 @@ def tftp(tmp_path): @pytest.mark.anyio async def test_tftp_file_operations(tftp, tmp_path): - local_filename = "test_src.txt" filename = "test.txt" test_data = b"Hello" - (tmp_path / local_filename).write_bytes(test_data) - - file = tftp.storage.open(filename, "wb") - file.write(str(tmp_path / local_filename)) - file.close() + tftp.storage.write_bytes(filename, test_data) files = list(tftp.storage.list("/")) assert filename in files diff --git a/packages/jumpstarter/jumpstarter/common/grpc.py b/packages/jumpstarter/jumpstarter/common/grpc.py index 382f1e936..8536be200 100644 --- a/packages/jumpstarter/jumpstarter/common/grpc.py +++ b/packages/jumpstarter/jumpstarter/common/grpc.py @@ -35,12 +35,18 @@ def ssl_channel_credentials(target: str, tls_config): def aio_secure_channel(target: str, credentials: grpc.ChannelCredentials): - return grpc.aio.secure_channel(target, credentials, options=( - ("grpc.lb_policy_name", "round_robin"), - ("grpc.keepalive_time_ms", 350000), - ("grpc.keepalive_timeout_ms", 5000), - ("grpc.http2.max_pings_without_data", 5), - ("grpc.keepalive_permit_without_calls", 1))) + return grpc.aio.secure_channel( + target, + credentials, + options=( + ("grpc.lb_policy_name", "round_robin"), + ("grpc.keepalive_time_ms", 350000), + 
("grpc.keepalive_timeout_ms", 5000), + ("grpc.http2.max_pings_without_data", 5), + ("grpc.keepalive_permit_without_calls", 1), + ), + ) + def configure_grpc_env(): # disable informative logs by default, i.e.: diff --git a/uv.lock b/uv.lock index 62742f552..f66dceb8d 100644 --- a/uv.lock +++ b/uv.lock @@ -1151,7 +1151,9 @@ source = { editable = "packages/jumpstarter-driver-http" } dependencies = [ { name = "anyio" }, { name = "jumpstarter" }, + { name = "jumpstarter-driver-composite" }, { name = "jumpstarter-driver-opendal" }, + { name = "yarl" }, ] [package.dev-dependencies] @@ -1165,7 +1167,9 @@ dev = [ requires-dist = [ { name = "anyio", specifier = ">=4.6.2.post1" }, { name = "jumpstarter", editable = "packages/jumpstarter" }, + { name = "jumpstarter-driver-composite", editable = "packages/jumpstarter-driver-composite" }, { name = "jumpstarter-driver-opendal", editable = "packages/jumpstarter-driver-opendal" }, + { name = "yarl", specifier = ">=1.18.3" }, ] [package.metadata.requires-dev]