-
Notifications
You must be signed in to change notification settings - Fork 18
tftp,http: support url sources #272
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,111 +1,12 @@ | ||
| from dataclasses import dataclass | ||
| from pathlib import Path | ||
|
|
||
| from jumpstarter_driver_opendal.adapter import OpendalAdapter | ||
| from opendal import Operator | ||
|
|
||
| from jumpstarter.client import DriverClient | ||
| from jumpstarter_driver_opendal.client import FileServerClient | ||
|
|
||
|
|
||
| @dataclass(kw_only=True) | ||
| class HttpServerClient(DriverClient): | ||
| class HttpServerClient(FileServerClient): | ||
| """Client for the HTTP server driver""" | ||
|
|
||
| def start(self): | ||
| """ | ||
| Start the HTTP server. | ||
|
|
||
| Initializes and starts the HTTP server if it's not already running. | ||
| The server will listen on the configured host and port. | ||
| """ | ||
| self.call("start") | ||
|
|
||
| def stop(self): | ||
| """ | ||
| Stop the HTTP server. | ||
|
|
||
| Stops the running HTTP server and releases associated resources. | ||
| Raises: | ||
| ServerNotRunning: If the server is not currently running | ||
| """ | ||
| self.call("stop") | ||
|
|
||
| def list_files(self) -> list[str]: | ||
| """ | ||
| List all files in the HTTP server's root directory. | ||
|
|
||
| Returns: | ||
| list[str]: A list of filenames present in the HTTP server's root directory | ||
| """ | ||
| return self.call("list_files") | ||
|
|
||
| def put_file(self, filename: str, src_stream): | ||
| """ | ||
| Upload a file to the HTTP server using a streamed source. | ||
|
|
||
| Args: | ||
| filename (str): Name to save the file as on the server. | ||
| src_stream: Stream/source to read the file data from. | ||
|
|
||
| Returns: | ||
| str: URL of the uploaded file | ||
| """ | ||
| return self.call("put_file", filename, src_stream) | ||
|
|
||
| def put_local_file(self, filepath: str) -> str: | ||
| """ | ||
| Upload a file from the local filesystem to the HTTP server. | ||
|
|
||
| Note: This doesn't use HTTP to upload; it streams the file content directly. | ||
|
|
||
| Args: | ||
| filepath (str): Path to the local file to upload. | ||
|
|
||
| Returns: | ||
| str: Name of the uploaded file | ||
|
|
||
| Example: | ||
| >>> client.put_local_file("/path/to/local/file.txt") | ||
| """ | ||
| absolute = Path(filepath).resolve() | ||
| with OpendalAdapter(client=self, operator=Operator("fs", root="/"), path=str(absolute), mode="rb") as handle: | ||
| return self.call("put_file", absolute.name, handle) | ||
|
|
||
| def delete_file(self, filename: str) -> str: | ||
| """ | ||
| Delete a file from the HTTP server. | ||
|
|
||
| Args: | ||
| filename (str): Name of the file to delete. | ||
|
|
||
| Returns: | ||
| str: Name of the deleted file | ||
| """ | ||
| return self.call("delete_file", filename) | ||
|
|
||
| def get_host(self) -> str: | ||
| """ | ||
| Get the host IP address the HTTP server is listening on. | ||
|
|
||
| Returns: | ||
| str: The IP address or hostname the server is bound to | ||
| """ | ||
| return self.call("get_host") | ||
|
|
||
| def get_port(self) -> int: | ||
| """ | ||
| Get the port number the HTTP server is listening on. | ||
|
|
||
| Returns: | ||
| int: The port number (default is 8080) | ||
| """ | ||
| return self.call("get_port") | ||
|
|
||
| def get_url(self) -> str: | ||
| """ | ||
| Get the base URL of the HTTP server. | ||
|
|
||
| Returns: | ||
| str: The base URL of the server | ||
| """ | ||
| """Get the base URL of the HTTP server""" | ||
| return self.call("get_url") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,6 @@ | ||
| import hashlib | ||
| from pathlib import Path | ||
| from urllib.parse import urlparse | ||
|
|
||
| import asyncclick as click | ||
| from opendal import Operator | ||
|
|
@@ -71,3 +73,147 @@ def write_local_file(file): | |
| self.write_local_file(file) | ||
|
|
||
| return base | ||
|
|
||
| CHUNK_SIZE = 4 * 1024 * 1024 | ||
|
|
||
| class FileServerClient(DriverClient): | ||
| """Base client for file server implementations (HTTP, TFTP, etc)""" | ||
|
|
||
def start(self):
    """Start the file server by issuing the ``start`` driver call."""
    self.call("start")
|
|
||
def stop(self):
    """Stop the file server by issuing the ``stop`` driver call."""
    self.call("stop")
|
|
||
def list_files(self) -> list[str]:
    """Return the result of the ``list_files`` driver call.

    Returns:
        list[str]: Names of the files in the server root directory.
    """
    files = self.call("list_files")
    return files
|
|
||
def compute_checksum(self, filepath: str | Path) -> str:
    """Return the SHA256 hex digest of a local file.

    The file is opened in binary mode and hashed in pieces of at most
    ``CHUNK_SIZE`` bytes, so it is never loaded into memory in one read.

    Args:
        filepath: Path of the local file to hash.

    Returns:
        str: Hexadecimal SHA256 digest of the file contents.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        # A binary read returns b"" at end of file, which ends the iteration.
        for block in iter(lambda: stream.read(CHUNK_SIZE), b""):
            digest.update(block)
    return digest.hexdigest()
|
|
||
def compute_opendal_checksum(self, operator: Operator, path: str) -> str:
    """Return the SHA256 hex digest of a file read through an OpenDAL operator.

    The file is opened with ``operator.open(path, "rb")`` and hashed in
    pieces of at most ``CHUNK_SIZE`` bytes.

    Args:
        operator: OpenDAL operator to read from.
        path: Path of the file within the operator's storage.

    Returns:
        str: Hexadecimal SHA256 digest of the file contents.
    """
    digest = hashlib.sha256()
    with operator.open(path, "rb") as reader:
        while True:
            block = reader.read(CHUNK_SIZE)
            if not block:
                # An empty read marks the end of the data.
                break
            digest.update(block)
    return digest.hexdigest()
|
|
||
def check_file_checksum(self, filename: str, expected_checksum: str) -> bool:
    """Ask the server whether one of its files matches a SHA256 checksum.

    The comparison is delegated to the ``check_file_checksum`` driver call;
    this method does no hashing itself.

    Args:
        filename: Name of the server-side file to check.
        expected_checksum: SHA256 checksum the file is expected to have.

    Returns:
        bool: True if the checksums match, False otherwise.
    """
    matches = self.call("check_file_checksum", filename, expected_checksum)
    return matches
|
|
||
def put_file(self, filename: str, src_stream, checksum: str | None = None):
    """Upload a file to the server, passing a checksum along when one is given.

    With a checksum, the ``put_file`` driver call is first made with the
    checksum as an extra argument. If that call raises ``TypeError`` or
    ``ValueError``, a debug message is logged and the upload is retried
    without the checksum.

    Args:
        filename: Name to save the file as.
        src_stream: Source stream to read the data from.
        checksum: Optional SHA256 checksum for verification.

    Returns:
        Whatever the ``put_file`` driver call returns.
    """
    if checksum is None:
        return self.call("put_file", filename, src_stream)

    try:
        return self.call("put_file", filename, src_stream, checksum)
    except (TypeError, ValueError):
        self.logger.debug("Server does not support checksum verification, falling back to basic upload")

    # Reached only after the checksum-aware call was rejected above.
    return self.call("put_file", filename, src_stream)
|
|
||
| def put_file_from_source(self, source: str, checksum: str | None = None): | ||
| """ | ||
| Upload a file from either a local path or URL to the server. | ||
|
|
||
| Args: | ||
| source (str): Local file path or URL to upload | ||
| checksum (str, optional): SHA256 checksum of the file. If provided, | ||
| will be used for verification | ||
| """ | ||
| self.logger.info(f"Starting upload from source: {source}") | ||
|
|
||
| if source.startswith(('http://', 'https://')): | ||
| parsed_url = urlparse(source) | ||
| operator = Operator( | ||
| 'http', | ||
| root='/', | ||
| endpoint=f"{parsed_url.scheme}://{parsed_url.netloc}" | ||
| ) | ||
| filename = parsed_url.path.split('/')[-1] | ||
| path = parsed_url.path | ||
| if path.startswith('/'): | ||
| path = path[1:] | ||
|
|
||
| if checksum is None: | ||
| self.logger.warning("No checksum provided for remote file - skipping verification") | ||
| else: | ||
| operator = Operator('fs', root='/') | ||
| path = str(Path(source).resolve()) | ||
| filename = Path(path).name | ||
|
|
||
| if checksum is None: | ||
| computed_checksum = self.compute_checksum(source) | ||
| self.logger.info(f"Computed checksum for local file {filename}: {computed_checksum}") | ||
| checksum = computed_checksum | ||
| else: | ||
| computed_checksum = self.compute_checksum(source) | ||
| self.logger.info(f"Provided checksum: {checksum}") | ||
| self.logger.info(f"Computed checksum: {computed_checksum}") | ||
| if computed_checksum != checksum: | ||
| self.logger.warning("Checksum mismatch between provided and computed values") | ||
|
|
||
| if checksum and self.check_file_checksum(filename, checksum): | ||
| self.logger.info(f"Skipping upload of identical file: {filename}") | ||
| return filename | ||
|
|
||
| self.logger.info(f"Opening adapter for {filename}") | ||
| with OpendalAdapter(client=self, operator=operator, path=path, mode="rb") as handle: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I could be wrong, but I suspect this makes the client download the file and then push it to the exporter through the stream. Can we check whether there is a way to have the exporter download it directly and save the extra hop, i.e. via presigned sources?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @NickCao ^
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So it should be OK for the auth-less URLs I'm testing this with, based on:
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, that's the case here. |
||
| self.logger.info(f"Putting file {filename}") | ||
| result = self.put_file(filename, handle, checksum) | ||
| self.logger.info(f"Completed upload of {filename}") | ||
| return result | ||
|
|
||
def delete_file(self, filename: str) -> str:
    """Delete a file from the server via the ``delete_file`` driver call.

    Args:
        filename: Name of the file to delete.

    Returns:
        str: The value returned by the ``delete_file`` driver call.
    """
    deleted = self.call("delete_file", filename)
    return deleted
|
|
||
def get_host(self) -> str:
    """Return the host address the server is listening on.

    The value comes from the ``get_host`` driver call.
    """
    host = self.call("get_host")
    return host
|
|
||
def get_port(self) -> int:
    """Return the port number the server is listening on.

    The value comes from the ``get_port`` driver call.
    """
    port = self.call("get_port")
    return port
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also generalize the driver side common stuff?
@NickCao do you have some ideas around this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works the same way, just factor the file server related methods into a base class.