diff --git a/.gitignore b/.gitignore
index 22ef5a7a..53b80e86 100644
--- a/.gitignore
+++ b/.gitignore
@@ -65,3 +65,6 @@ CHANGELOG.temp
 # vi / vim swp files
 *.swp
+
+# AI reviews
+reviews/
diff --git a/AGENTS.md b/AGENTS.md
new file mode 100644
index 00000000..296746b1
--- /dev/null
+++ b/AGENTS.md
@@ -0,0 +1,46 @@
+# Instructions for AI tools on pytroll/trollmoves
+
+Purpose
+- Short guide for AI sessions to help with builds, tests, architecture, and repository-specific conventions.
+
+Build, test, and lint commands
+- Ask the user which Python to use
+- Install (editable): pip install -e .
+- Install all extras: pip install -e .[all]
+- Build sdist/wheel: python setup.py sdist bdist_wheel
+- Run full test suite: pytest
+- Run a single test: pytest path/to/test_file.py::test_function (e.g. pytest trollmoves/tests/test_example.py::test_xyz)
+- Run tests matching a name pattern: pytest -k <pattern>
+- Lint: flake8 . (configured via setup.cfg; max-line-length=120)
+
+High-level architecture
+- Core package: `trollmoves/` contains modules for Server, Client, Mirror, Dispatcher, Fetcher and Movers:
+  - server.py: watches directories and publishes announcements (Posttroll).
+  - client.py: subscribes and requests transfers from Server.
+  - mirror.py: bridge between internal/external networks.
+  - dispatcher.py: pushes local files to configured destinations.
+  - fetcher.py / s3downloader.py: fetch files from sources (S3, etc.).
+  - movers.py: implementations for transfer protocols (FileMover, ScpMover, SftpMover, S3Mover, FtpMover).
+- Messaging: Posttroll is used to announce and request transfers; systems integrate by publishing/subscribing to Posttroll topics.
+- Entry points & scripts: console script `pytroll-fetcher` and bin scripts (move_it_*.py, dispatcher.py, s3downloader.py) provide CLI access.
+- Versioning: versioneer is used; version file is `trollmoves/version.py`, tag prefix `v`.
+
+Key conventions and repo specifics
+- Extras groups: defined in setup.py under extras_require (e.g., 's3', 'server', 'remote_fs', 'fetcher', 'all'). Use these to install optional movers/protocol dependencies.
+- S3Mover path semantics: trailing slash on destination means “keep original filename”; no trailing slash means the destination's last path segment is the new filename (illustrated in the sketch below).
+- Linting: setup.cfg contains flake8 rules (max-line-length 120) and ignores (RST303, W504).
+- Tests: pytest is required; tests live under `trollmoves/tests` (or inside the package as a tests package). tests_require lists pytest-reraise and pytest-bdd, needed when running behavior tests.
+- Coverage/packaging: versioneer and version.py are excluded from coverage; packaging scripts and console entry points are defined in setup.py.
+
+Files of interest for AI tools
+- README.md: project overview and mover details (used for architecture cues).
+- setup.py / setup.cfg: install, extras, flake8, versioneer config.
+- trollmoves/movers.py: look here for protocol-specific logic and S3 behavior.
+- bin/* and entry_points: show the CLI surface and expected runtime scripts.
+
+When using AI tools in this repo
+- Prefer locating behavior across multiple files: transfers are composed from Posttroll messages (server/client) plus mover implementations.
+- If changing protocol behavior, update movers.py and add integration tests exercising the server/client flow.
+- Respect extras in setup.py when suggesting dependency additions: put optional deps in the correct extras group.
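A minimal sketch of the S3Mover trailing-slash rule described in the conventions list above. The helper `resolve_s3_destination` is hypothetical (it does not exist in the repository); it only encodes the documented rule:

```python
import os


def resolve_s3_destination(origin, destination):
    """Hypothetical helper encoding the documented S3Mover path rule."""
    if destination.endswith("/"):
        # Trailing slash: keep the original filename.
        return destination + os.path.basename(origin)
    # No trailing slash: the last path segment is the new filename.
    return destination


assert resolve_s3_destination("/data/file.nc", "s3://bucket/dir/") == "s3://bucket/dir/file.nc"
assert resolve_s3_destination("/data/file.nc", "s3://bucket/dir/renamed.nc") == "s3://bucket/dir/renamed.nc"
```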
+
+
diff --git a/README.md b/README.md
index 66ea3860..9106b1b4 100644
--- a/README.md
+++ b/README.md
@@ -39,6 +39,14 @@ the messages it publishes, along with a json representation of a `fsspec`
 filesystem. From there, processes accepting these (eg `trollflow2`) will be
 able to use `fsspec` to read and process the remote files.
 
+#### Using temporary initial filenames
+
+To avoid exposing partially-uploaded files, movers can be configured to upload
+first to a temporary name that is renamed/activated only after the transfer
+completes. See [the mover section](#using-temporary-initial-filenames-in-transfers)
+and `examples/move_it_server.ini` for details and examples.
+
+
 ### Trollmoves Client
 
 Trollmoves Client is configured to subscribe to a specific topic, and to make requests
@@ -95,6 +103,7 @@ Required libraries:
 In addition, the required packages for the transfer protocol(s) to be used. See
 the mover documentation below for more details.
 
+
 ## Individual movers
 
 The individual movers can be used via the above listed processes, or used directly
@@ -146,6 +155,40 @@ analogous to moving a file from one directory to a new destination changing the
 filename. The new destination filename will be the last part of the provided
 destination following the last slash ('/').
 
+### Using temporary initial filenames in transfers
+
+It is possible to first transfer the files to temporary filenames and
+rename them after the transfer. This can be helpful to avoid premature
+reads if the consumer does not use Posttroll messaging. These options
+are passed via the mover's connection_parameters or attrs dictionary.
+
+- use_tmp_on_transfer: boolean (default: False)
+  If true, movers will upload to a temporary destination (see tmp_prefix)
+  and finalize the transfer by renaming/moving the tmp object to the final
+  name after successful transfer.
+- tmp_prefix: string (default: ".")
+  Prefix to use for temporary filenames (e.g. ".filename").
+
+S3-specific options
+- s3_use_multipart: boolean (default: False)
+  When True and boto3 is available, S3Mover will perform a multipart upload
+  directly to the final key and issue CompleteMultipartUpload to make the
+  object visible atomically.
+- s3_use_copy: boolean (default: False)
+  If multipart uploads are not used, enabling this will finalize an upload
+  performed to a tmp key by performing a server-side copy (CopyObject) to
+  the final key and deleting the temporary key. This is compatible with
+  s3fs or boto3 backends but requires additional permissions.
+- s3_multipart_chunksize: integer (default: 8388608)
+  Chunk size (bytes) used for multipart uploads when boto3 multipart is used.
+
+Behavior notes
+- Multipart uploads (preferred) avoid the extra server-side copy step but
+  require boto3 and appropriate permissions. Copy+delete is provided as a
+  fallback for S3-compatible endpoints that do not support multipart.
+- The tmp_prefix and use_tmp_on_transfer options are intentionally opt-in to
+  preserve existing behavior by default.
+
 ## s3downloader
diff --git a/examples/dispatch.yaml b/examples/dispatch.yaml
index b8d7a77a..dfb3f9af 100644
--- a/examples/dispatch.yaml
+++ b/examples/dispatch.yaml
@@ -60,6 +60,12 @@ target-s3-example1:
     secret: "my-super-secret-key"
     key: "my-access-key"
    use_ssl: true
+
+    # Optional atomic transfer options for S3 mover:
+    # s3_use_multipart: True         # Prefer multipart upload (boto3 required)
+    # s3_use_copy: False             # If multipart not used, perform copy+delete to finalize tmp key
+    # tmp_prefix: "."                # Prefix used for temporary keys (e.g.
".") + # s3_multipart_chunksize: 8388608 # Chunk size in bytes for multipart uploads aliases: platform_name: Suomi-NPP: npp diff --git a/examples/move_it_server.ini b/examples/move_it_server.ini index 10577780..e818c39e 100644 --- a/examples/move_it_server.ini +++ b/examples/move_it_server.ini @@ -9,6 +9,13 @@ # Put this in connection_parameters dictionary by adding the prefix connection_parameters__ssh_key_filename = /home/username/.ssh/id_rsa +# Optional global defaults for atomic transfers (passed into connection_parameters) +# connection_parameters__use_tmp_on_transfer = False +# connection_parameters__tmp_prefix = . +# connection_parameters__s3_use_multipart = True +# connection_parameters__s3_use_copy = False +# connection_parameters__s3_multipart_chunksize = 8388608 + # Set watchdog polling timeout (interval) in seconds. # Only effective if "-w" commandline argument is given # watchdog_timeout = 2.0 diff --git a/pyproject.toml b/pyproject.toml index 983e52c1..0633d6c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -98,3 +98,8 @@ convention = "google" [tool.ruff.lint.mccabe] max-complexity = 10 + +[tool.pytest.ini_options] +markers = [ + "slow: marks tests as slow (deselect with '-m \"not slow\"')", +] diff --git a/trollmoves/_mover_utils.py b/trollmoves/_mover_utils.py new file mode 100644 index 00000000..102c0532 --- /dev/null +++ b/trollmoves/_mover_utils.py @@ -0,0 +1,141 @@ +"""Helper utilities for movers to reduce duplication. + +Contains functions to ensure local and remote directories exist. Designed to be small +and dependency-free; supports FTP-like objects (ftplib.FTP) and SFTP-like +(paramiko SFTPClient) objects by duck-typing. +""" + +import ftplib +import os + + +def ensure_local_dir(path): + """Ensure local directory exists for given path. + + If path is a file path, its directory is created. If path is a directory, + it is created. No exception is raised if it already exists. + """ + if not path: + return + dirname = path + if os.path.isfile(path) or os.path.splitext(path)[1]: + dirname = os.path.dirname(path) or "." + os.makedirs(dirname, exist_ok=True) + + +def _ensure_remote_dirs_ftp(connection, parts): + """Handle FTP-like directory creation (internal helper). + + Implements fast path optimization followed by fallback loop. + For FTP connections: try single cwd(full_path) first; if fails, + iterate through parts creating missing directories as needed. + """ + path = "/" + "/".join(parts) + + # Fast path: if the full path already exists, a single cwd is sufficient + try: + connection.cwd(path) + return + except (ftplib.Error, OSError): + pass + + # Build path from root, creating missing segments and only cd'ing when needed + current = "" + for part in parts: + current = current + "/" + part + try: + connection.cwd(current) + except (ftplib.Error, OSError): + # try to create the directory; accept failures silently and proceed + try: + connection.mkd(current) + except (ftplib.Error, OSError): + try: + connection.mkd(part) + except (ftplib.Error, OSError): + pass + # after creating, change into it + try: + connection.cwd(current) + except (ftplib.Error, OSError): + try: + connection.cwd(part) + except (ftplib.Error, OSError): + pass + + +def _ensure_remote_dirs_sftp(connection, parts): + """Handle SFTP-like directory creation (internal helper). + + For SFTP connections: iterate through path parts, stat each path segment + and create with mkdir if it doesn't exist. Silently ignore all errors. 
+ """ + current = "" + for part in parts: + current = current + "/" + part + try: + connection.stat(current) + except OSError: + try: + connection.mkdir(current) + except OSError: + try: + connection.mkdir(part) + except OSError: + pass + + +def ensure_remote_dirs(connection, path): + """Ensure directories exist on a remote connection. + + Supports FTP-like objects (with cwd() and mkd()) and SFTP-like objects + (with stat() and mkdir()). The function is iterative (no recursion). + + Behavior mirrors previous recursive helper: try a single cwd(path) first; if + that succeeds, return with only one cwd call. If it fails, create missing + directories and cd into the final path as needed. + """ + if not path or path == "/": + return + parts = [p for p in path.split("/") if p] + if not parts: + return + + # FTP-like API + if hasattr(connection, "cwd") and hasattr(connection, "mkd"): + _ensure_remote_dirs_ftp(connection, parts) + return + + # SFTP-like API (paramiko.SFTPClient) + if hasattr(connection, "stat") and hasattr(connection, "mkdir"): + _ensure_remote_dirs_sftp(connection, parts) + return + + raise TypeError("Unsupported connection type for ensure_remote_dirs") + + +def ensure_final_directory_for_rename(sftp_connection, final_destination_path): + """Ensure final directory exists for a rename operation on SFTP. + + Used in finalize_atomic_transfer operations to ensure the target directory + exists before renaming a temporary file to its final location. Attempts to + stat each path segment; creates with mkdir if it doesn't exist. Silently + ignores all errors to match existing SFTP behavior in movers. + """ + final_dir = os.path.dirname(final_destination_path) + if not final_dir: + return + + parts = final_dir.split("/") + path = "" + for p in parts: + if not p: + continue + path = path + "/" + p + try: + sftp_connection.stat(path) + except OSError: + try: + sftp_connection.mkdir(path) + except OSError: + pass diff --git a/trollmoves/movers.py b/trollmoves/movers.py index da411aa7..43c577d9 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -8,7 +8,7 @@ import sys import time import traceback -from ftplib import FTP, all_errors, error_perm +from ftplib import FTP, all_errors from threading import Event, Lock, Thread, current_thread from urllib.parse import urlparse @@ -16,16 +16,26 @@ from s3fs import S3FileSystem except ImportError: S3FileSystem = None +try: + import boto3 +except ImportError: + boto3 = None from trollmoves.utils import clean_url +from ._mover_utils import ensure_final_directory_for_rename, ensure_remote_dirs + S3_ALLOWED_SETTINGS = ["anon", "endpoint_url", "key", "secret", "token", "use_ssl", "s3_additional_kwargs", "client_kwargs", "requester_pays", "default_block_size", "default_fill_cache", "default_cache_type", "version_aware", "cache_regions", "asynchronous", "config_kwargs", "kwargs", "session", "max_concurrency", "fixed_upload_size", - "profile"] + # allow our atomic-transfer and multipart options to pass through sanitize + "s3_use_multipart", "s3_use_copy", "tmp_prefix", "s3_multipart_chunksize"] + +# Keys consumed by S3Mover logic; must not be forwarded to S3FileSystem or boto3 client +_S3_MOVER_INTERNAL_KEYS = frozenset({"s3_use_multipart", "s3_use_copy", "tmp_prefix", "s3_multipart_chunksize"}) LOGGER = logging.getLogger(__name__) @@ -48,35 +58,72 @@ def move_it(pathname, destination, attrs=None, hook=None, rel_path=None, backup_ new_dest = dest_url._replace(path=new_path) fake_dest = clean_url(new_dest) - LOGGER.debug("new_dest = %s", new_dest) 
LOGGER.debug("Copying to: %s", fake_dest) try: LOGGER.debug("Scheme = %s", str(dest_url.scheme)) - mover = MOVERS[dest_url.scheme] + mover_cls = MOVERS[dest_url.scheme] except KeyError: LOGGER.error("Unsupported protocol '" + str(dest_url.scheme) + "'. Could not copy " + pathname + " to " + str(destination)) raise try: - m = mover(pathname, new_dest, attrs=attrs, backup_targets=backup_targets) - m.copy() - last_dest = m.destination - if last_dest != new_dest: - new_dest = last_dest - fake_dest = clean_url(new_dest) + tmp_dest = _get_tmp_destination(mover_cls, new_dest, attrs) + mover = _create_mover(mover_cls, pathname, new_dest, attrs, backup_targets, tmp_dest) + _copy(mover, new_dest, tmp_dest) if hook: hook(pathname, new_dest) except Exception as err: + # Intentionally broad: logs and re-raises any failure from copy/finalize across all protocols. exc_type, exc_value, exc_traceback = sys.exc_info() LOGGER.error("Something went wrong during copy of %s to %s: %s", pathname, str(fake_dest), str(err)) LOGGER.debug("".join(traceback.format_tb(exc_traceback))) raise err else: - LOGGER.info("Successfully copied %s to %s", - pathname, str(fake_dest)) - return m.destination + LOGGER.info("Successfully copied %s to %s", pathname, str(fake_dest)) + return mover.destination + + +def _create_mover(mover_cls, pathname, new_dest, attrs, backup_targets=None, tmp_dest=None): + if tmp_dest: + return mover_cls(pathname, tmp_dest, attrs=attrs, backup_targets=backup_targets) + return mover_cls(pathname, new_dest, attrs=attrs, backup_targets=backup_targets) + + +def _get_tmp_destination(mover_cls, new_dest, attrs): + use_tmp = bool(attrs and attrs.get("use_tmp_on_transfer")) + if not use_tmp: + return None + + if not mover_cls.supports_atomic(attrs): + LOGGER.error( + "Mover '%s' does not support atomic transfers. " + "Falling back to transfer without temporary files.", + mover_cls.__name__, + ) + return None + + tmp_prefix = attrs.get("tmp_prefix", ".") + tmp_dest = mover_cls.tmp_destination_for(new_dest, tmp_prefix) + + return tmp_dest + + +def _copy(mover, new_dest, tmp_dest=None): + mover.copy() + if tmp_dest: + # finalize: default finalizer works for local schemes; subclasses should override + try: + mover.finalize_atomic_transfer(tmp_dest, new_dest) + except Exception: + # Intentionally broad: must clean up local tmp regardless of protocol error. + # Re-raises so the caller sees the original failure. + try: + if hasattr(tmp_dest, "path") and os.path.exists(tmp_dest.path): + os.remove(tmp_dest.path) + finally: + raise class Mover: @@ -108,30 +155,69 @@ def move(self): raise NotImplementedError("Move for scheme " + self.destination.scheme + " not implemented (yet).") + @classmethod + def supports_atomic(cls, attrs=None): + """Return True if this mover class supports atomic tmp→final transfers. + + The default is False (conservative). Subclasses that implement + finalize_atomic_transfer should override and return True (or, in + the case of S3Mover, inspect *attrs* to decide). + """ + return False + + @staticmethod + def tmp_destination_for(dest, tmp_prefix="."): + """Return a copy of dest with the basename prefixed by tmp_prefix.""" + try: + path = dest.path + except AttributeError: + return dest + dirname = os.path.dirname(path) + basename = os.path.basename(path) + tmp_name = tmp_prefix + basename + return dest._replace(path=os.path.join(dirname, tmp_name)) + + def finalize_atomic_transfer(self, tmp_destination, final_destination): + """Finalize atomic transfer by renaming tmp to final. 
+ + Default implementation works for local filesystems (empty or 'file' scheme). + Subclasses handling remote schemes must override this method. + """ + try: + tmp_path = tmp_destination.path + final_path = final_destination.path + except AttributeError: + raise NotImplementedError("Finalize atomic transfer not implemented for remote schemes") + + final_dir = os.path.dirname(final_path) + if final_dir: + os.makedirs(final_dir, exist_ok=True) + # Use os.replace for atomic rename where possible + os.replace(tmp_path, final_path) + # Update mover's destination to final + self.destination = final_destination + def get_connection(self, hostname, port, username=None): """Get the connection.""" with self.active_connection_lock: - LOGGER.debug("Destination username and passwd: %s %s", - self._dest_username, self._dest_password) - LOGGER.debug("Getting connection to %s@%s:%s", - username, hostname, port) - try: - connection, timer = self.active_connections[(hostname, port, username)] - if not self.is_connected(connection): - del self.active_connections[(hostname, port, username)] - LOGGER.debug("Resetting connection") - connection = self.open_connection() - timer.cancel() - except KeyError: - connection = self.open_connection() - - timer = CTimer(int(self.attrs.get("connection_uptime", 30)), - self.delete_connection, (connection,)) + connection = self._get_connection(hostname, port, username) + timer = CTimer(int(self.attrs.get("connection_uptime", 30)), self.delete_connection, (connection,)) timer.start() self.active_connections[(self.destination.hostname, port, username)] = connection, timer return connection + def _get_connection(self, hostname, port, username=None): + LOGGER.debug("Getting connection to %s@%s:%s", username, hostname, port) + if (hostname, port, username) in self.active_connections: + connection, timer = self.active_connections[(hostname, port, username)] + timer.cancel() + if self.is_connected(connection): + return connection + del self.active_connections[(hostname, port, username)] + LOGGER.debug("Resetting connection") + return self.open_connection() + def delete_connection(self, connection): """Delete active connection *connection*.""" with self.active_connection_lock: @@ -145,16 +231,24 @@ def delete_connection(self, connection): try: self.close_connection(connection) finally: - for key, (current_connection, current_timer) in self.active_connections.items(): - if current_connection == connection: - del self.active_connections[key] - current_timer.cancel() - break + self._remove_connection_from_active_connections(connection) + + def _remove_connection_from_active_connections(self, connection): + for key, (current_connection, current_timer) in self.active_connections.items(): + if current_connection == connection: + del self.active_connections[key] + current_timer.cancel() + break class FileMover(Mover): """Move files in the filesystem.""" + @classmethod + def supports_atomic(cls, attrs=None): + """Local filesystem always supports atomic rename via os.replace.""" + return True + def copy(self): """Copy the file.""" dirname = os.path.dirname(self.destination.path) @@ -208,6 +302,11 @@ class FtpMover(Mover): active_connections = dict() active_connection_lock = Lock() + @classmethod + def supports_atomic(cls, attrs=None): + """FTP supports atomic rename via RNFR/RNTO.""" + return True + def _get_netrc_authentication(self): """Get login authentications from netrc file if available.""" try: @@ -270,24 +369,30 @@ def copy(self): """Upload the file.""" connection = 
self.get_connection(self.destination.hostname, self.destination.port, self._dest_username) - def cd_tree(current_dir): - if current_dir != "": - try: - connection.cwd(current_dir) - except (IOError, error_perm): - cd_tree("/".join(current_dir.split("/")[:-1])) - connection.mkd(current_dir) - connection.cwd(current_dir) - LOGGER.debug("cd to %s", os.path.dirname(self.destination.path)) destination_dirname, destination_filename = os.path.split(self.destination.path) - cd_tree(destination_dirname) + ensure_remote_dirs(connection, destination_dirname) if not destination_filename: destination_filename = os.path.basename(self.origin) with open(self.origin, "rb") as file_obj: connection.storbinary("STOR " + destination_filename, file_obj) + def finalize_atomic_transfer(self, tmp_destination, final_destination): + """Finalize atomic transfer by renaming tmp -> final on FTP server.""" + connection = self.get_connection(self.destination.hostname, self.destination.port, self._dest_username) + + dest_dirname = os.path.dirname(tmp_destination.path) + tmp_basename = os.path.basename(tmp_destination.path) + final_basename = os.path.basename(final_destination.path) + ensure_remote_dirs(connection, dest_dirname) + try: + connection.rename(tmp_basename, final_basename) + except all_errors as err: + LOGGER.exception("Failed to finalize FTP atomic transfer: %s", str(err)) + raise + self.destination = final_destination + class ScpMover(Mover): """Move files over ssh with scp.""" @@ -295,6 +400,11 @@ class ScpMover(Mover): active_connections = dict() active_connection_lock = Lock() + @classmethod + def supports_atomic(cls, attrs=None): + """SCP supports atomic rename via SFTP rename over the same SSH connection.""" + return True + def open_connection(self): """Open a connection.""" import copy @@ -331,6 +441,8 @@ def open_connection(self): except socket.timeout as sto: LOGGER.exception("SSH connection timed out: %s", str(sto)) except Exception as err: + # Intentionally broad: SSHClient.connect() may raise unexpected exceptions + # (e.g. from underlying transport or third-party SSH agents). LOGGER.exception("Unknown exception at init SSHClient: %s", str(err)) else: return ssh_connection @@ -373,14 +485,15 @@ def move(self): def copy(self): """Upload the file.""" - from scp import SCPClient + from paramiko import SSHException + from scp import SCPClient, SCPException ssh_connection = self.get_connection(self.destination.hostname, self.destination.port or 22, self._dest_username) try: scp = SCPClient(ssh_connection.get_transport()) - except Exception as err: + except (TypeError, SSHException, OSError) as err: LOGGER.error("Failed to initiate SCPClient: %s", str(err)) ssh_connection.close() raise @@ -392,10 +505,11 @@ def copy(self): LOGGER.error("No such file or directory. File not transfered: " "%s. 
Original error message: %s", self.origin, str(osex)) + return else: LOGGER.error("OSError in scp.put: %s", str(osex)) raise - except Exception as err: + except (SCPException, SSHException) as err: LOGGER.error("Something went wrong with scp: %s", str(err)) LOGGER.error("Exception name %s", type(err).__name__) LOGGER.error("Exception args %s", str(err.args)) @@ -403,10 +517,30 @@ def copy(self): finally: scp.close() + def finalize_atomic_transfer(self, tmp_destination, final_destination): + """Finalize atomic transfer for SCP by performing remote rename via SFTP.""" + ssh_connection = self.get_connection(self.destination.hostname, + self.destination.port or 22, + self._dest_username) + _rename_over_sftp(ssh_connection, tmp_destination, final_destination) + self.destination = final_destination + + +def _rename_over_sftp(ssh_connection, tmp_destination, final_destination): + """Rename tmp_destination to final_destination on remote host via SFTP.""" + with ssh_connection.open_sftp() as sftp: + ensure_final_directory_for_rename(sftp, final_destination.path) + sftp.rename(tmp_destination.path, final_destination.path) + class SftpMover(Mover): """Move files over sftp.""" + @classmethod + def supports_atomic(cls, attrs=None): + """SFTP supports atomic rename.""" + return True + def move(self): """Push the file.""" self.copy() @@ -428,6 +562,20 @@ def copy(self): with ssh.open_sftp() as sftp: sftp.put(self.origin, self.destination.path) + def finalize_atomic_transfer(self, tmp_destination, final_destination): + """Finalize atomic transfer for SFTP by renaming tmp -> final on remote host.""" + import paramiko + with paramiko.SSHClient() as ssh: + ssh.load_system_host_keys() + ssh.connect(self.destination.hostname, + port=self.destination.port or 22, + username=self._dest_username, + allow_agent=True, + key_filename=self.attrs.get("ssh_private_key_file")) + + _rename_over_sftp(ssh, tmp_destination, final_destination) + self.destination = final_destination + class S3Mover(Mover): """Move files to S3 cloud storage. 
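Before the S3-specific hunks below, a quick illustration of how the temporary name is derived. This is a usage sketch of the `Mover.tmp_destination_for()` static method added in this diff; the host and paths are placeholders:

```python
from urllib.parse import urlparse

from trollmoves.movers import Mover

# tmp_destination_for() keeps the directory and prefixes the basename,
# e.g. /incoming/data.nc -> /incoming/.data.nc with the default prefix.
dest = urlparse("sftp://host.example.com/incoming/data.nc")
tmp = Mover.tmp_destination_for(dest, tmp_prefix=".")
assert tmp.path == "/incoming/.data.nc"
assert tmp.hostname == dest.hostname  # only the path is changed
```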
@@ -497,23 +645,120 @@ def __init__(self, origin, destination, attrs=None, backup_targets=None): super().__init__(origin, destination, attrs, backup_targets) self._sanitize_attrs() + @classmethod + def supports_atomic(cls, attrs=None): + """S3 supports atomic transfers only when multipart upload or copy+delete is configured.""" + if not attrs: + return False + return bool(attrs.get("s3_use_multipart")) or bool(attrs.get("s3_use_copy")) + def copy(self): """Copy the file to a bucket.""" - if S3FileSystem is None: - raise ImportError("S3Mover requires 's3fs' to be installed.") - s3 = S3FileSystem(**self.attrs) + if S3FileSystem is None and boto3 is None: + raise ImportError("S3Mover requires 's3fs' or 'boto3' to be installed.") + destination_file_path = self._get_destination() LOGGER.debug("destination_file_path = %s", destination_file_path) - _create_s3_destination_path(s3, destination_file_path) + + if bool(self.attrs.get("s3_use_multipart", False)) and boto3 is not None: + self._multipart_upload(destination_file_path) + return + + # Fallback: use s3fs put to destination_file_path (tmp or final) + if S3FileSystem is None: + raise ImportError("S3Mover requires 's3fs' to be installed for non-multipart operations.") + s3_attrs = {k: v for k, v in self.attrs.items() if k not in _S3_MOVER_INTERNAL_KEYS} + s3 = S3FileSystem(**s3_attrs) LOGGER.debug("Before call to put: destination_file_path = %s", destination_file_path) LOGGER.debug("self.origin = %s", self.origin) + _create_s3_destination_path(s3, destination_file_path) s3.put(self.origin, destination_file_path) + def _build_boto3_client(self): + """Build and return a boto3 S3 client from attrs. + + Reads client_kwargs, key, secret, and token from self.attrs. + Falls back to boto3 default credential chain when key/secret are absent. + """ + client_kwargs = self.attrs.get("client_kwargs", {}) + boto_kwargs = dict(client_kwargs) if isinstance(client_kwargs, dict) else {} + if self.attrs.get("key") and self.attrs.get("secret"): + return boto3.client( + "s3", + aws_access_key_id=self.attrs["key"], + aws_secret_access_key=self.attrs["secret"], + aws_session_token=self.attrs.get("token"), + **boto_kwargs, + ) + return boto3.client("s3", **boto_kwargs) + + def _do_multipart_upload(self, client, bucket, final_key): + """Perform a multipart upload of self.origin to bucket/final_key. + + Uploads in chunks of s3_multipart_chunksize bytes (default 8 MB). + On failure, aborts the multipart upload (best-effort) and re-raises. 
+ """ + from botocore.exceptions import BotoCoreError + from botocore.exceptions import ClientError as BotoCoreClientError + + chunk_size = int(self.attrs.get("s3_multipart_chunksize", 8 * 1024 * 1024)) + upload_id = None + try: + mp = client.create_multipart_upload(Bucket=bucket, Key=final_key) + upload_id = mp["UploadId"] + upload_parts = [] + part_number = 1 + with open(self.origin, "rb") as f: + while True: + data = f.read(chunk_size) + if not data: + break + resp = client.upload_part( + Bucket=bucket, Key=final_key, PartNumber=part_number, + UploadId=upload_id, Body=data, + ) + upload_parts.append({"ETag": resp["ETag"], "PartNumber": part_number}) + part_number += 1 + client.complete_multipart_upload( + Bucket=bucket, Key=final_key, UploadId=upload_id, + MultipartUpload={"Parts": upload_parts}, + ) + except (BotoCoreClientError, BotoCoreError, OSError) as e: + LOGGER.exception("Multipart upload failed: %s", str(e)) + if upload_id is not None: + try: + client.abort_multipart_upload(Bucket=bucket, Key=final_key, UploadId=upload_id) + except (BotoCoreClientError, BotoCoreError): + pass + raise + + def _multipart_upload(self, destination_file_path): + """Orchestrate a boto3 multipart upload. + + Parses destination_file_path into bucket and key, strips the tmp_prefix from the + basename if present to derive the final key, then uploads and updates self.destination. + """ + tmp_prefix = self.attrs.get("tmp_prefix", ".") + path_parts = destination_file_path.split("/") + bucket = path_parts[0] + key = "/".join(path_parts[1:]) if len(path_parts) > 1 else "" + + basename = key.split("/")[-1] if key else "" + if basename.startswith(tmp_prefix): + final_basename = basename[len(tmp_prefix):] + final_key = key.rsplit("/", 1)[0] + "/" + final_basename if "/" in key else final_basename + else: + final_key = key + + client = self._build_boto3_client() + self._do_multipart_upload(client, bucket, final_key) + self.destination = urlparse("s3://" + bucket + "/" + final_key) + + def _sanitize_attrs(self): keys = list(self.attrs.keys()) for key in keys: if key not in S3_ALLOWED_SETTINGS: - LOGGER.debug("S3 Keyword {str(key)} not allowed - remove from attributes.") del self.attrs[key] def _get_destination(self): @@ -532,6 +777,60 @@ def move(self): self.copy() os.remove(self.origin) + def finalize_atomic_transfer(self, tmp_destination, final_destination): + """Finalize atomic transfer for S3. + + If multipart upload was used, copy() already wrote to the final key — just update + self.destination. Otherwise perform a server-side copy+delete to move the tmp key + to the final key (requires s3_use_copy=True). 
+ """ + use_multipart = bool(self.attrs.get("s3_use_multipart", False)) + use_copy = bool(self.attrs.get("s3_use_copy", False)) + + # Derive source (tmp) S3 path from tmp_destination + if tmp_destination: + tmp_bucket = tmp_destination.netloc + tmp_key = tmp_destination.path.lstrip("/") + tmp_path = (tmp_bucket + "/" + tmp_key) if tmp_key else tmp_bucket + else: + tmp_path = self._get_destination() + tmp_parts = tmp_path.split("/") + tmp_bucket = tmp_parts[0] + tmp_key = "/".join(tmp_parts[1:]) if len(tmp_parts) > 1 else "" + + # Derive destination (final) S3 path from final_destination + final_bucket = final_destination.netloc + final_key = final_destination.path.lstrip("/") + final_path = (final_bucket + "/" + final_key) if final_key else final_bucket + + # If multipart upload was used, copy() already wrote to the final key + if use_multipart and boto3 is not None: + self.destination = final_destination + return + + if not use_copy: + raise NotImplementedError("S3 atomic finalize requires either multipart uploads or copy+delete fallback") + + s3_attrs = {k: v for k, v in self.attrs.items() if k not in _S3_MOVER_INTERNAL_KEYS} + + # use s3fs or boto3 to copy and delete tmp key + if S3FileSystem is not None: + s3 = S3FileSystem(**s3_attrs) + s3.copy(tmp_path, final_path) + s3.rm(tmp_path) + self.destination = final_destination + return + + if boto3 is None: + raise ImportError("No S3 backend available for copy+delete finalize") + # boto3 copy_object and delete_object + client = self._build_boto3_client() + copy_source = {"Bucket": tmp_bucket, "Key": tmp_key} + client.copy_object(CopySource=copy_source, Bucket=final_bucket, Key=final_key) + client.delete_object(Bucket=tmp_bucket, Key=tmp_key) + self.destination = final_destination + + def _create_s3_destination_path(s3, destination_file_path): destination_path = os.path.dirname(destination_file_path) diff --git a/trollmoves/tests/test_movers_s3.py b/trollmoves/tests/test_movers_s3.py new file mode 100644 index 00000000..52e02bba --- /dev/null +++ b/trollmoves/tests/test_movers_s3.py @@ -0,0 +1,204 @@ +import os +import tempfile +from unittest.mock import MagicMock, patch +from urllib.parse import urlparse + +import pytest + +from trollmoves.movers import S3Mover + + +def test_s3_multipart_upload(): + # Create a temporary file with some content + with tempfile.NamedTemporaryFile("wb", delete=False) as f: + f.write(b"hello world") + tmpname = f.name + + # Setup mock boto3 client + mock_client = MagicMock() + mock_client.create_multipart_upload.return_value = {"UploadId": "upload123"} + mock_client.upload_part.return_value = {"ETag": "etag-1"} + mock_client.complete_multipart_upload.return_value = {"ResponseMetadata": {"HTTPStatusCode": 200}} + + mock_boto_mod = MagicMock() + mock_boto_mod.client = MagicMock(return_value=mock_client) + + with patch("trollmoves.movers.boto3", new=mock_boto_mod): + # Destination: s3://mybucket/some/path/file + dest = "s3://mybucket/some/path/file" + attrs = {"s3_use_multipart": True, "client_kwargs": {}} + + mover = S3Mover(tmpname, dest, attrs=attrs) + # Should use boto3 multipart upload and complete it + mover.copy() + + # Assertions: multipart calls were made + mock_client.create_multipart_upload.assert_called() + assert mock_client.upload_part.called + mock_client.complete_multipart_upload.assert_called() + + # Destination should be updated to final key + assert mover.destination.scheme == "s3" + assert "mybucket" in mover.destination.netloc or "mybucket" in mover.destination.path + + # Cleanup + 
os.remove(tmpname) + + +@patch("trollmoves.movers.S3FileSystem") +def test_s3_copy_delete_fallback(mock_s3fs): + # Create a temporary file with some content + with tempfile.NamedTemporaryFile("wb", delete=False) as f: + f.write(b"fallback test") + tmpname = f.name + + # Setup mock s3fs instance + mock_s3 = MagicMock() + mock_s3.exists.return_value = True + mock_s3.copy.return_value = None + mock_s3.rm.return_value = None + mock_s3.put.return_value = None + mock_s3fs.return_value = mock_s3 + + # Destination path that will be used for tmp key + dest = "s3://bucketname/dir/.tmpfile" + attrs = {"s3_use_multipart": False, "s3_use_copy": True, "tmp_prefix": "."} + + mover = S3Mover(tmpname, dest, attrs=attrs) + # copy should call s3.put + mover.copy() + mock_s3.put.assert_called_once() + + # Now finalize: simulate moving tmp key to final + tmp_dest = urlparse("s3://bucketname/dir/.tmpfile") + final_dest = urlparse("s3://bucketname/dir/file") + mover.attrs = attrs + mover.finalize_atomic_transfer(tmp_dest, final_dest) + + # copy+delete should be invoked + mock_s3.copy.assert_called_once() + mock_s3.rm.assert_called_once() + + # Cleanup + os.remove(tmpname) + + +def test_build_boto3_client_default_credentials(): + """_build_boto3_client uses boto3 default chain when no key/secret present.""" + with tempfile.NamedTemporaryFile("wb", delete=False) as f: + tmpname = f.name + + mock_boto_mod = MagicMock() + mock_boto_mod.client = MagicMock(return_value=MagicMock()) + + with patch("trollmoves.movers.boto3", new=mock_boto_mod): + mover = S3Mover(tmpname, "s3://bucket/key", attrs={"client_kwargs": {"endpoint_url": "http://minio"}}) + mover._build_boto3_client() + mock_boto_mod.client.assert_called_once_with("s3", endpoint_url="http://minio") + + os.remove(tmpname) + + +def test_build_boto3_client_explicit_credentials(): + """_build_boto3_client passes key, secret, and token when present.""" + with tempfile.NamedTemporaryFile("wb", delete=False) as f: + tmpname = f.name + + mock_boto_mod = MagicMock() + mock_boto_mod.client = MagicMock(return_value=MagicMock()) + + with patch("trollmoves.movers.boto3", new=mock_boto_mod): + attrs = {"key": "AKID", "secret": "SECRET", "token": "MYTOKEN"} + mover = S3Mover(tmpname, "s3://bucket/key", attrs=attrs) + mover._build_boto3_client() + mock_boto_mod.client.assert_called_once_with( + "s3", + aws_access_key_id="AKID", + aws_secret_access_key="SECRET", + aws_session_token="MYTOKEN", + ) + + os.remove(tmpname) + + +def test_do_multipart_upload_aborts_on_error(): + """_do_multipart_upload aborts the upload when an error occurs mid-upload.""" + with tempfile.NamedTemporaryFile("wb", delete=False) as f: + f.write(b"data") + tmpname = f.name + + from botocore.exceptions import ClientError + + mock_client = MagicMock() + mock_client.create_multipart_upload.return_value = {"UploadId": "uid-abort"} + mock_client.upload_part.side_effect = ClientError( + {"Error": {"Code": "InternalError", "Message": "fail"}}, "UploadPart" + ) + + mock_boto_mod = MagicMock() + with patch("trollmoves.movers.boto3", new=mock_boto_mod): + mover = S3Mover(tmpname, "s3://bucket/key", attrs={}) + with pytest.raises(ClientError): + mover._do_multipart_upload(mock_client, "bucket", "key") + + mock_client.abort_multipart_upload.assert_called_once_with( + Bucket="bucket", Key="key", UploadId="uid-abort" + ) + os.remove(tmpname) + + +def test_do_multipart_upload_no_abort_when_create_fails(): + """_do_multipart_upload does not call abort if create_multipart_upload itself fails.""" + with 
tempfile.NamedTemporaryFile("wb", delete=False) as f: + tmpname = f.name + + from botocore.exceptions import ClientError + + mock_client = MagicMock() + mock_client.create_multipart_upload.side_effect = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "denied"}}, "CreateMultipartUpload" + ) + + mock_boto_mod = MagicMock() + with patch("trollmoves.movers.boto3", new=mock_boto_mod): + mover = S3Mover(tmpname, "s3://bucket/key", attrs={}) + with pytest.raises(ClientError): + mover._do_multipart_upload(mock_client, "bucket", "key") + + mock_client.abort_multipart_upload.assert_not_called() + os.remove(tmpname) + + +# =========================================================================== +# supports_atomic classmethod for S3Mover +# =========================================================================== + +def test_s3_mover_supports_atomic_no_attrs(): + """S3Mover.supports_atomic returns False when no attrs are provided.""" + assert S3Mover.supports_atomic() is False + assert S3Mover.supports_atomic(attrs=None) is False + + +def test_s3_mover_supports_atomic_empty_attrs(): + """S3Mover.supports_atomic returns False when attrs have no relevant keys.""" + assert S3Mover.supports_atomic(attrs={}) is False + + +def test_s3_mover_supports_atomic_with_multipart(): + """S3Mover.supports_atomic returns True when s3_use_multipart is set.""" + assert S3Mover.supports_atomic(attrs={"s3_use_multipart": True}) is True + + +def test_s3_mover_supports_atomic_with_copy(): + """S3Mover.supports_atomic returns True when s3_use_copy is set.""" + assert S3Mover.supports_atomic(attrs={"s3_use_copy": True}) is True + + +def test_s3_mover_supports_atomic_with_both(): + """S3Mover.supports_atomic returns True when both flags are set.""" + assert S3Mover.supports_atomic(attrs={"s3_use_multipart": True, "s3_use_copy": True}) is True + + +def test_s3_mover_supports_atomic_false_values(): + """S3Mover.supports_atomic returns False when flags are explicitly False.""" + assert S3Mover.supports_atomic(attrs={"s3_use_multipart": False, "s3_use_copy": False}) is False diff --git a/trollmoves/tests/test_movers_use_tmp.py b/trollmoves/tests/test_movers_use_tmp.py new file mode 100644 index 00000000..4b7cb1bf --- /dev/null +++ b/trollmoves/tests/test_movers_use_tmp.py @@ -0,0 +1,498 @@ +"""Tests for the use_tmp workflow and finalize_atomic_transfer methods in movers.""" + +from unittest.mock import MagicMock, patch +from urllib.parse import urlparse + +import pytest + +# --------------------------------------------------------------------------- +# Helpers shared across SSH-based tests +# --------------------------------------------------------------------------- + +def _patch_ssh_for_auto_add_policy(monkeypatch): + """Patch paramiko.SSHClient to accept any host key (AutoAddPolicy).""" + import paramiko + OrigSSHClient = paramiko.SSHClient + + def _new_ssh_client(*args, **kwargs): + client = OrigSSHClient(*args, **kwargs) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + return client + + monkeypatch.setattr(paramiko, "SSHClient", _new_ssh_client) + + +@pytest.fixture +def source_file(tmp_path): + """A small source file used as the origin for mover tests.""" + path = tmp_path / "source" / "data.txt" + path.parent.mkdir(parents=True) + path.write_text("hello atomic transfer") + return path + + +# =========================================================================== +# Group A – Mover.tmp_destination_for (pure static method, no I/O) +# 
=========================================================================== + +def test_tmp_destination_for_default_prefix(): + """Default prefix '.' is prepended to the basename.""" + from trollmoves.movers import Mover + dest = urlparse("/some/dir/file.txt") + tmp = Mover.tmp_destination_for(dest, ".") + assert tmp.path == "/some/dir/.file.txt" + assert tmp.scheme == dest.scheme + + +def test_tmp_destination_for_custom_prefix(): + """Custom prefix is prepended to the basename.""" + from trollmoves.movers import Mover + dest = urlparse("file:///some/dir/data.nc") + tmp = Mover.tmp_destination_for(dest, "_tmp_") + assert tmp.path == "/some/dir/_tmp_data.nc" + + +def test_tmp_destination_for_non_path_dest(): + """Fallback: if dest has no .path attribute, dest is returned unchanged.""" + from trollmoves.movers import Mover + + class _NoDotPath: + pass + + obj = _NoDotPath() + result = Mover.tmp_destination_for(obj, ".") + assert result is obj + + +# =========================================================================== +# Group B – Mover base class finalize_atomic_transfer +# =========================================================================== + +def test_base_mover_finalize_raises_for_pathless_dest(): + """Base Mover.finalize_atomic_transfer raises NotImplementedError when + tmp_destination has no .path attribute.""" + from trollmoves.movers import Mover + + class _NoPath: + pass + + mover = object.__new__(Mover) + mover.destination = urlparse("/some/path") + with pytest.raises(NotImplementedError): + mover.finalize_atomic_transfer(_NoPath(), urlparse("/other/path")) + + +# =========================================================================== +# Group C – FileMover (real files on local filesystem) +# =========================================================================== + +def test_file_mover_finalize_atomic_transfer_renames(tmp_path): + """finalize_atomic_transfer renames tmp file to final, preserving content.""" + from trollmoves.movers import FileMover + + tmp_file = tmp_path / ".data.txt" + tmp_file.write_text("content") + final_file = tmp_path / "data.txt" + + mover = FileMover(str(tmp_file), str(tmp_file)) + tmp_dest = urlparse(str(tmp_file)) + final_dest = urlparse(str(final_file)) + + mover.finalize_atomic_transfer(tmp_dest, final_dest) + + assert final_file.read_text() == "content" + assert not tmp_file.exists() + + +def test_file_mover_finalize_creates_missing_final_dir(tmp_path): + """finalize_atomic_transfer creates the final directory if it is absent.""" + from trollmoves.movers import FileMover + + tmp_file = tmp_path / ".data.txt" + tmp_file.write_text("data") + final_dir = tmp_path / "subdir" / "deep" + final_file = final_dir / "data.txt" + + mover = FileMover(str(tmp_file), str(tmp_file)) + mover.finalize_atomic_transfer(urlparse(str(tmp_file)), urlparse(str(final_file))) + + assert final_file.exists() + assert not tmp_file.exists() + + +def test_file_mover_finalize_updates_destination(tmp_path): + """finalize_atomic_transfer updates mover.destination to the final dest.""" + from trollmoves.movers import FileMover + + tmp_file = tmp_path / ".data.txt" + tmp_file.write_text("x") + final_file = tmp_path / "data.txt" + + mover = FileMover(str(tmp_file), str(tmp_file)) + final_dest = urlparse(str(final_file)) + mover.finalize_atomic_transfer(urlparse(str(tmp_file)), final_dest) + + assert mover.destination == final_dest + + +def test_move_it_use_tmp_file_scheme(tmp_path, source_file): + """Full use_tmp round-trip via move_it() with empty (local) 
scheme.""" + from trollmoves.movers import move_it + + dest_dir = tmp_path / "dest" + dest_dir.mkdir() + destination = str(dest_dir / "data.txt") + attrs = {"use_tmp_on_transfer": True} + + returned = move_it(str(source_file), destination, attrs=attrs) + + assert (dest_dir / "data.txt").exists() + assert not (dest_dir / ".data.txt").exists(), "tmp file should be cleaned up" + assert returned == urlparse(destination) + + +def test_move_it_use_tmp_custom_prefix(tmp_path, source_file): + """Full use_tmp round-trip via move_it() with a custom tmp_prefix.""" + from trollmoves.movers import move_it + + dest_dir = tmp_path / "dest" + dest_dir.mkdir() + destination = str(dest_dir / "data.txt") + attrs = {"use_tmp_on_transfer": True, "tmp_prefix": "incoming_"} + + move_it(str(source_file), destination, attrs=attrs) + + assert (dest_dir / "data.txt").exists() + assert not (dest_dir / "incoming_data.txt").exists() + + +def test_move_it_use_tmp_cleanup_on_finalize_error(tmp_path, source_file, monkeypatch): + """When finalize_atomic_transfer raises, move_it removes the tmp file.""" + from trollmoves.movers import FileMover, move_it + + dest_dir = tmp_path / "dest" + dest_dir.mkdir() + destination = str(dest_dir / "data.txt") + attrs = {"use_tmp_on_transfer": True} + + def _raise(*_): + raise NotImplementedError("simulated") + + monkeypatch.setattr(FileMover, "finalize_atomic_transfer", _raise) + + with pytest.raises(NotImplementedError): + move_it(str(source_file), destination, attrs=attrs) + + # The tmp file must have been cleaned up + assert not (dest_dir / ".data.txt").exists() + # The final destination should not exist either + assert not (dest_dir / "data.txt").exists() + + +def test_move_it_use_tmp_cleanup_on_oserror(tmp_path, source_file, monkeypatch): + """Non-NotImplementedError exceptions from finalize also trigger cleanup.""" + from trollmoves.movers import FileMover, move_it + + dest_dir = tmp_path / "dest" + dest_dir.mkdir() + destination = str(dest_dir / "data.txt") + attrs = {"use_tmp_on_transfer": True} + + def _raise_os(*_): + raise OSError("disk full") + + monkeypatch.setattr(FileMover, "finalize_atomic_transfer", _raise_os) + + with pytest.raises(OSError, match="disk full"): + move_it(str(source_file), destination, attrs=attrs) + + assert not (dest_dir / ".data.txt").exists() + assert not (dest_dir / "data.txt").exists() + + +# =========================================================================== +# Group D – ScpMover (real localhost SSH) +# =========================================================================== + +@pytest.mark.slow +def test_scp_mover_finalize_atomic_transfer(tmp_path, monkeypatch): + """ScpMover.finalize_atomic_transfer performs a remote SFTP rename on localhost.""" + from trollmoves.movers import ScpMover + + _patch_ssh_for_auto_add_policy(monkeypatch) + + tmp_file = tmp_path / ".data.txt" + tmp_file.write_text("scp finalize") + final_file = tmp_path / "data.txt" + + # Origin and initial destination path don't matter for finalize; only hostname does + mover = ScpMover(str(tmp_file), f"scp://localhost{tmp_path}/data.txt") + + tmp_dest = urlparse(f"scp://localhost{tmp_path}/.data.txt") + final_dest = urlparse(f"scp://localhost{tmp_path}/data.txt") + mover.finalize_atomic_transfer(tmp_dest, final_dest) + + assert final_file.read_text() == "scp finalize" + assert not tmp_file.exists() + assert mover.destination == final_dest + + +@pytest.mark.slow +def test_move_it_use_tmp_scp_scheme(tmp_path, source_file, monkeypatch): + """Full use_tmp round-trip via 
move_it() with scp://localhost destination.""" + from trollmoves.movers import move_it + + _patch_ssh_for_auto_add_policy(monkeypatch) + + dest_dir = tmp_path / "dest_scp" + dest_dir.mkdir() + destination = f"scp://localhost{dest_dir}/data.txt" + attrs = {"use_tmp_on_transfer": True} + + returned = move_it(str(source_file), destination, attrs=attrs) + + assert (dest_dir / "data.txt").exists() + assert not (dest_dir / ".data.txt").exists() + assert returned == urlparse(destination) + + +# =========================================================================== +# Group E – SftpMover (real localhost SSH) +# =========================================================================== + +@pytest.mark.slow +def test_sftp_mover_finalize_atomic_transfer(tmp_path, monkeypatch): + """SftpMover.finalize_atomic_transfer performs a remote SFTP rename on localhost.""" + from trollmoves.movers import SftpMover + + _patch_ssh_for_auto_add_policy(monkeypatch) + + tmp_file = tmp_path / ".data.txt" + tmp_file.write_text("sftp finalize") + final_file = tmp_path / "data.txt" + + mover = SftpMover(str(tmp_file), f"sftp://localhost{tmp_path}/data.txt") + tmp_dest = urlparse(f"sftp://localhost{tmp_path}/.data.txt") + final_dest = urlparse(f"sftp://localhost{tmp_path}/data.txt") + mover.finalize_atomic_transfer(tmp_dest, final_dest) + + assert final_file.read_text() == "sftp finalize" + assert not tmp_file.exists() + assert mover.destination == final_dest + + +@pytest.mark.slow +def test_move_it_use_tmp_sftp_scheme(tmp_path, source_file, monkeypatch): + """Full use_tmp round-trip via move_it() with sftp://localhost destination.""" + from trollmoves.movers import move_it + + _patch_ssh_for_auto_add_policy(monkeypatch) + + dest_dir = tmp_path / "dest_sftp" + dest_dir.mkdir() + destination = f"sftp://localhost{dest_dir}/data.txt" + attrs = {"use_tmp_on_transfer": True} + + returned = move_it(str(source_file), destination, attrs=attrs) + + assert (dest_dir / "data.txt").exists() + assert not (dest_dir / ".data.txt").exists() + assert returned == urlparse(destination) + + +# =========================================================================== +# Group F – FtpMover (mocked FTP connection) +# =========================================================================== + +def test_ftp_mover_finalize_atomic_transfer_rename_args(): + """FtpMover.finalize_atomic_transfer: cwd to dest dir then rename with basenames.""" + from trollmoves.movers import FtpMover + + origin = "/path/to/source.txt" + destination = "ftp://ftphost/remote/dir/source.txt" + mover = FtpMover(origin, destination) + + mock_connection = MagicMock() + tmp_dest = urlparse("ftp://ftphost/remote/dir/.source.txt") + final_dest = urlparse("ftp://ftphost/remote/dir/source.txt") + + with patch.object(mover, "get_connection", return_value=mock_connection): + mover.finalize_atomic_transfer(tmp_dest, final_dest) + + # ensure_remote_dirs cds to the directory; rename uses basenames relative to that cwd + mock_connection.cwd.assert_called_with("/remote/dir") + mock_connection.rename.assert_called_once_with(".source.txt", "source.txt") + + +def test_ftp_mover_finalize_updates_destination(): + """FtpMover.finalize_atomic_transfer updates mover.destination to final.""" + from trollmoves.movers import FtpMover + + mover = FtpMover("/origin.txt", "ftp://ftphost/dir/file.txt") + mock_connection = MagicMock() + tmp_dest = urlparse("ftp://ftphost/dir/.file.txt") + final_dest = urlparse("ftp://ftphost/dir/file.txt") + + with patch.object(mover, "get_connection", 
return_value=mock_connection): + mover.finalize_atomic_transfer(tmp_dest, final_dest) + + assert mover.destination == final_dest + + +# =========================================================================== +# Group G – S3Mover (mocked S3 backend) +# =========================================================================== + +@patch("trollmoves.movers.S3FileSystem") +def test_s3_mover_finalize_copy_mode(mock_s3fs): + """s3_use_copy=True: finalize uses final_destination directly (not tmp-stripping). + + tmp is in a staging prefix; final is in a completely different path to prove + the implementation reads final_destination rather than reverse-engineering it + from the tmp path. + """ + from trollmoves.movers import S3Mover + + mock_s3 = MagicMock() + mock_s3fs.return_value = mock_s3 + + mover = S3Mover("/local/file.txt", "s3://mybucket/staging/.file.txt", + attrs={"s3_use_copy": True, "tmp_prefix": "."}) + + tmp_dest = urlparse("s3://mybucket/staging/.file.txt") + final_dest = urlparse("s3://mybucket/archive/2024/file.txt") # different dir + mover.finalize_atomic_transfer(tmp_dest, final_dest) + + mock_s3.copy.assert_called_once_with("mybucket/staging/.file.txt", "mybucket/archive/2024/file.txt") + mock_s3.rm.assert_called_once_with("mybucket/staging/.file.txt") + assert mover.destination == urlparse("s3://mybucket/archive/2024/file.txt") + + +@patch("trollmoves.movers.boto3") +def test_s3_mover_finalize_multipart_mode(mock_boto3): + """s3_use_multipart=True: finalize only updates destination, no S3 I/O.""" + from trollmoves.movers import S3Mover + + mover = S3Mover("/local/file.txt", "s3://mybucket/dir/.file.txt", + attrs={"s3_use_multipart": True, "tmp_prefix": "."}) + + tmp_dest = urlparse("s3://mybucket/dir/.file.txt") + final_dest = urlparse("s3://mybucket/dir/file.txt") + mover.finalize_atomic_transfer(tmp_dest, final_dest) + + # No boto3 client calls – the multipart copy() already wrote the final key + mock_boto3.client.assert_not_called() + assert mover.destination == urlparse("s3://mybucket/dir/file.txt") + + +@patch("trollmoves.movers.S3FileSystem", None) +@patch("trollmoves.movers.boto3", None) +def test_s3_mover_finalize_raises_if_unconfigured(): + """finalize raises NotImplementedError when neither multipart nor copy is set.""" + from trollmoves.movers import S3Mover + + mover = S3Mover("/local/file.txt", "s3://mybucket/dir/.file.txt", attrs={}) + tmp_dest = urlparse("s3://mybucket/dir/.file.txt") + final_dest = urlparse("s3://mybucket/dir/file.txt") + + with pytest.raises(NotImplementedError): + mover.finalize_atomic_transfer(tmp_dest, final_dest) + + +# =========================================================================== +# Group H – move_it() FTP end-to-end with use_tmp (mocked) +# =========================================================================== + +@patch("trollmoves.movers.FTP") +def test_move_it_use_tmp_ftp_scheme(mock_ftp_class, tmp_path): + """Full use_tmp round-trip via move_it() with ftp:// destination (mocked FTP).""" + from trollmoves.movers import move_it + + source = tmp_path / "upload.txt" + source.write_text("ftp content") + + mock_ftp = MagicMock() + mock_ftp_class.return_value.__enter__ = lambda s: mock_ftp + mock_ftp_class.return_value.__exit__ = MagicMock(return_value=False) + mock_ftp_class.return_value = mock_ftp + + destination = "ftp://ftphost/remote/dir/upload.txt" + attrs = {"use_tmp_on_transfer": True} + + move_it(str(source), destination, attrs=attrs) + + # copy step: storbinary was called once (for the tmp file) + assert 
mock_ftp.storbinary.call_count == 1 + store_call_args = mock_ftp.storbinary.call_args[0] + assert store_call_args[0] == "STOR .upload.txt" + + # finalize step: rename was called once from tmp to final + mock_ftp.rename.assert_called_once_with(".upload.txt", "upload.txt") + + +# =========================================================================== +# Group I – supports_atomic classmethod on each mover class +# =========================================================================== + +def test_supports_atomic_file_mover(): + """FileMover always supports atomic transfers.""" + from trollmoves.movers import FileMover + assert FileMover.supports_atomic() is True + assert FileMover.supports_atomic(attrs={}) is True + + +def test_supports_atomic_ftp_mover(): + """FtpMover always supports atomic transfers.""" + from trollmoves.movers import FtpMover + assert FtpMover.supports_atomic() is True + + +def test_supports_atomic_scp_mover(): + """ScpMover always supports atomic transfers.""" + from trollmoves.movers import ScpMover + assert ScpMover.supports_atomic() is True + + +def test_supports_atomic_sftp_mover(): + """SftpMover always supports atomic transfers.""" + from trollmoves.movers import SftpMover + assert SftpMover.supports_atomic() is True + + +def test_supports_atomic_base_mover_returns_false(): + """The base Mover class returns False as a safe default.""" + from trollmoves.movers import Mover + assert Mover.supports_atomic() is False + assert Mover.supports_atomic(attrs={"use_tmp_on_transfer": True}) is False + + +# =========================================================================== +# Group J – move_it() falls back when supports_atomic returns False +# =========================================================================== + +def test_move_it_falls_back_when_atomic_not_supported(tmp_path, monkeypatch, caplog): + """When use_tmp_on_transfer=True but supports_atomic returns False, move_it + falls back to a direct transfer and logs an error.""" + import logging + + from trollmoves.movers import FileMover, move_it + + # Make FileMover report it does not support atomic so we can test the fallback + # without needing a custom protocol. 
+ monkeypatch.setattr(FileMover, "supports_atomic", classmethod(lambda cls, attrs=None: False)) + + source = tmp_path / "source" / "data.txt" + source.parent.mkdir() + source.write_text("fallback content") + dest = tmp_path / "dest" / "data.txt" + + with caplog.at_level(logging.ERROR, logger="trollmoves.movers"): + move_it(str(source), str(dest), attrs={"use_tmp_on_transfer": True}) + + # Final file must exist (transfer succeeded via direct path) + assert dest.exists() + assert dest.read_text() == "fallback content" + # Tmp file must NOT exist + assert not (tmp_path / "dest" / ".data.txt").exists() + # An error must have been logged about the fallback + assert any("does not support atomic" in record.message for record in caplog.records) diff --git a/trollmoves/tests/test_ssh_server.py b/trollmoves/tests/test_ssh_server.py index 1e8d75e5..01727dea 100644 --- a/trollmoves/tests/test_ssh_server.py +++ b/trollmoves/tests/test_ssh_server.py @@ -237,8 +237,9 @@ def test_scp_copy_generic_exception(self, mock_scp_client, mock_sshclient): with pytest.raises(Exception): scp_mover.copy() + @patch("paramiko.SSHClient", autospec=True) @patch("scp.SCPClient", autospec=True) - def test_scp_copy_oserror_exception(self, mock_scp_client): + def test_scp_copy_oserror_exception(self, mock_scp_client, mock_sshclient): """Check scp copy for OSError.""" from trollmoves.movers import ScpMover @@ -248,8 +249,9 @@ def test_scp_copy_oserror_exception(self, mock_scp_client): with pytest.raises(OSError): scp_mover.copy() + @patch("paramiko.SSHClient", autospec=True) @patch("scp.SCPClient", autospec=True) - def test_scp_copy_oserror_exception_errno_2(self, mock_scp_client): + def test_scp_copy_oserror_exception_errno_2(self, mock_scp_client, mock_sshclient): """Check scp copy OSError errno 2.""" from trollmoves.movers import ScpMover
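Taken together, the changes above enable the following call-site usage. A minimal sketch, assuming a reachable SFTP destination; the host and paths are placeholders:

```python
from trollmoves.movers import move_it

# Opt in to atomic transfers: the file is first uploaded as
# /incoming/.data.nc, then renamed to /incoming/data.nc by
# finalize_atomic_transfer() once the upload succeeds.
attrs = {
    "use_tmp_on_transfer": True,
    "tmp_prefix": ".",  # the default, shown for clarity
}

final_destination = move_it("/local/data.nc",
                            "sftp://receiver.example.com/incoming/data.nc",
                            attrs=attrs)
print(final_destination.geturl())  # sftp://receiver.example.com/incoming/data.nc
```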