Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8754433
Add opt-in atomic temporary transfer base support; Mover helper metho…
pnuu Apr 25, 2026
7f74333
Implement finalize_atomic_transfer for FTP, SCP, and SFTP movers; ena…
pnuu Apr 25, 2026
fe58287
S3Mover: add multipart-upload finalization and copy+delete fallback f…
pnuu Apr 25, 2026
ab319a5
Add S3Mover unit tests: multipart upload and copy+delete fallback\n\n…
pnuu Apr 25, 2026
a7882de
Fix S3 multipart test: patch boto3 module object when boto3 may be un…
pnuu Apr 25, 2026
8636d41
Docs: document atomic temporary-transfer options; add commented examp…
pnuu Apr 25, 2026
43366b2
Docs: default s3_use_multipart is False (s3fs-first, opt-in multipart)
pnuu Apr 25, 2026
5c57602
Remove misplaced S3 multipart options from S3 stalker example config
pnuu Apr 25, 2026
bc4cfd7
Use double quotes for string literals on branch-changed lines (safe, …
pnuu Apr 25, 2026
9bbe8e0
refactor(movers): replace recursive directory helpers with iterative …
pnuu Apr 25, 2026
e63b759
refactor(movers): extract final directory creation to common helper
pnuu Apr 25, 2026
21638d1
Remove files/dirs accidentally added and commited
pnuu Apr 25, 2026
21ad7e7
Remove header
pnuu Apr 25, 2026
a9668cb
refactor(mover_utils): extract FTP/SFTP handlers to private methods
pnuu Apr 25, 2026
218a89d
Remove connection parameters from client config example, they are not…
pnuu Apr 25, 2026
3e2f2cb
Clarify readme on temporary filenames
pnuu Apr 25, 2026
90e000a
Re-add client config option example
pnuu Apr 25, 2026
89616c3
Do not include AI reviews directory in the repo
pnuu Apr 26, 2026
273c1bd
Merge branch 'main' into new-atomic-temp-transfer
pnuu Apr 26, 2026
b30c33c
Add AGENTS.md
pnuu Apr 26, 2026
abaf4f3
Add tests for use_tmp workflow and finalize_atomic_transfer methods
lahtinep Apr 27, 2026
87c570d
Fix bugs in atomic transfer methods and improve tests
lahtinep Apr 27, 2026
9879c95
Fix ScpMover.copy() to return None on ENOENT; improve test isolation
lahtinep Apr 27, 2026
73e82bf
Replace catch-all except Exception with specific exception types
lahtinep Apr 27, 2026
e64e9d0
refactor(S3Mover): extract multipart upload to private methods
lahtinep Apr 27, 2026
5616898
Refactor movers.move_it()
lahtinep Apr 27, 2026
1274f45
Use supports_atomic to guard tmp-file transfers
lahtinep Apr 27, 2026
464b22b
Flatten _get_tmp_destination() structure
lahtinep Apr 28, 2026
2642483
Remove unnecessary alias imports
lahtinep Apr 28, 2026
a1fe077
Refactor renaming in SCP/SFTP movers
lahtinep Apr 28, 2026
4952af1
Refactor opening connections
lahtinep Apr 28, 2026
b8f2439
Refactor connection deletion
lahtinep Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ CHANGELOG.temp

# vi / vim swp files
*.swp

# AI reviews
reviews/
46 changes: 46 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Instructions for AI tools on pytroll/trollmoves

Purpose
- Short guide for AI sessions to help with builds, tests, architecture, and repository-specific conventions.

Build, test, and lint commands
- Ask the user which Python to use
- Install (editable): pip install -e .
- Install all extras: pip install -e .[all]
- Build sdist/wheel: python setup.py sdist bdist_wheel
- Run full test suite: pytest
- Run a single test: pytest path/to/test_file.py::test_function (e.g. pytest trollmoves/tests/test_example.py::test_xyz)
- Run tests matching name: pytest -k <expr>
- Lint: flake8 . (configured via setup.cfg; max-line-length=120)

High-level architecture
- Core package: `trollmoves/` contains modules for Server, Client, Mirror, Dispatcher, Fetcher and Movers:
- server.py: watches directories and publishes announcements (Posttroll).
- client.py: subscribes and requests transfers from Server.
- mirror.py: bridge between internal/external networks.
- dispatcher.py: pushes local files to configured destinations.
- fetcher.py / s3downloader.py: fetch files from sources (S3, etc.).
- movers.py: implementations for transfer protocols (FileMover, ScpMover, SftpMover, S3Mover, FtpMover).
- Messaging: Posttroll is used to announce and request transfers; systems integrate by publishing/subscribing to Posttroll topics.
- Entry points & scripts: console script `pytroll-fetcher` and bin scripts (move_it_*.py, dispatcher.py, s3downloader.py) provide CLI access.
- Versioning: versioneer is used; version file is `trollmoves/version.py`, tag prefix `v`.

Key conventions and repo specifics
- Extras groups: defined in setup.py under extras_require (e.g., 's3', 'server', 'remote_fs', 'fetcher', 'all'). Use these to install optional movers/protocol dependencies.
- S3Mover path semantics: trailing slash on destination means “keep original filename”; no trailing slash means the destination's last path segment is the new filename.
- Linting: setup.cfg contains flake8 rules (max-line-length 120) and ignores (RST303, W504).
- Tests: pytest is required; tests live under `trollmoves/tests` (or inside package as a tests package). Tests_require lists pytest-reraise and pytest-bdd when running behavior tests.
- Coverage/packaging: versioneer and version.py are excluded from coverage; packaging scripts and console entry points are defined in setup.py.

Files of interest for AI tools
- README.md: project overview and mover details (used for architecture cues).
- setup.py / setup.cfg: install, extras, flake8, versioneer config.
- trollmoves/movers.py: look here for protocol-specific logic and S3 behavior.
- bin/* and entry_points: show CLI surface and expected runtime scripts.

When using AI tools in this repo
- Prefer locating behavior across multiple files: transfers are composed from Posttroll messages (server/client) + mover implementations.
- If changing protocol behavior, update movers.py and add integration tests exercising the server/client flow.
- Respect extras in setup.py when suggesting dependency additions: put optional deps in the correct extras group.


43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ the messages it publishes, along with a json representation of a `fsspec`
filesystem. From there, processes accepting these (eg `trollflow2`) will be able
to use `fsspec` to read and process the remote files.

#### Using initial temporary filenames

To avoid exposing partially-uploaded files, movers can be configured to upload
first to a temporary name and be renamed/activated only after the transfer
completes. See [the mover section](#Using temporary-initial-filenames-in-transfers)
and `examples/move_it_server.ini` for details and examples.


### Trollmoves Client

Trollmoves Client is configured to subscribe to a specific topic, and to make requests
Expand Down Expand Up @@ -95,6 +103,7 @@ Required libraries:
In addition, the required packages for the transfer protocol(s) to be used. See the
mover documentation below for more details.


## Individual movers

The individual movers can be used via the above listed processes, or used directly
Expand Down Expand Up @@ -146,6 +155,40 @@ analogous to moving a file from one directory to a new destination changing the
filename. The new destination filename will be the last part of the provided
destination following the last slash ('/').

### Using temporary initial filenames in transfers

It is possible to first transfer the files to temporary filenames and
renamed after the transfer. This can be helpful if the consumer does
not use Posttroll messaging to avoid premature reads. These options
are passed via the mover's connection_parameters or attrs dictionary.

- use_tmp_on_transfer: boolean (default: False)
If true, movers will upload to a temporary destination (see tmp_prefix)
and finalize the transfer by renaming/moving the tmp object to the final
name after successful transfer.
- tmp_prefix: string (default: ".")
Prefix to use for temporary filenames (e.g. ".filename").

S3-specific options
- s3_use_multipart: boolean (default: False)
When True and boto3 is available, S3Mover will perform a multipart upload
directly to the final key and CompleteMultipartUpload to make the object
visible atomically.
- s3_use_copy: boolean (default: False)
If multipart uploads are not used, enabling this will finalize an upload
performed to a tmp key by performing a server-side copy (CopyObject) to
the final key and deleting the temporary key. This is compatible with
s3fs or boto3 backends but requires additional permissions.
- s3_multipart_chunksize: integer (default: 8388608)
Chunk size (bytes) used for multipart uploads when boto3 multipart is used.

Behavior notes
- Multipart uploads (preferred) avoid the extra server-side copy step but
require boto3 and appropriate permissions. Copy+delete is provided as a
fallback for S3-compatible endpoints that do not support multipart.
- The tmp_prefix and use_tmp_on_transfer options are intentionally opt-in to
preserve existing behavior by default.


## s3downloader

Expand Down
6 changes: 6 additions & 0 deletions examples/dispatch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ target-s3-example1:
secret: "my-super-secret-key"
key: "my-access-key"
use_ssl: true

# Optional atomic transfer options for S3 mover:
# s3_use_multipart: True # Prefer multipart upload (boto3 required)
# s3_use_copy: False # If multipart not used, perform copy+delete to finalize tmp key
# tmp_prefix: "." # Prefix used for temporary keys (e.g. ".")
# s3_multipart_chunksize: 8388608 # Chunk size in bytes for multipart uploads
aliases:
platform_name:
Suomi-NPP: npp
Expand Down
7 changes: 7 additions & 0 deletions examples/move_it_server.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
# Put this in connection_parameters dictionary by adding the prefix
connection_parameters__ssh_key_filename = /home/username/.ssh/id_rsa

# Optional global defaults for atomic transfers (passed into connection_parameters)
# connection_parameters__use_tmp_on_transfer = False
# connection_parameters__tmp_prefix = .
# connection_parameters__s3_use_multipart = True
# connection_parameters__s3_use_copy = False
# connection_parameters__s3_multipart_chunksize = 8388608

# Set watchdog polling timeout (interval) in seconds.
# Only effective if "-w" commandline argument is given
# watchdog_timeout = 2.0
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,8 @@ convention = "google"

[tool.ruff.lint.mccabe]
max-complexity = 10

[tool.pytest.ini_options]
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
]
141 changes: 141 additions & 0 deletions trollmoves/_mover_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Helper utilities for movers to reduce duplication.

Contains functions to ensure local and remote directories exist. Designed to be small
and dependency-free; supports FTP-like objects (ftplib.FTP) and SFTP-like
(paramiko SFTPClient) objects by duck-typing.
"""

import ftplib
import os


def ensure_local_dir(path):
"""Ensure local directory exists for given path.

If path is a file path, its directory is created. If path is a directory,
it is created. No exception is raised if it already exists.
"""
if not path:
return
dirname = path
if os.path.isfile(path) or os.path.splitext(path)[1]:
dirname = os.path.dirname(path) or "."
os.makedirs(dirname, exist_ok=True)


def _ensure_remote_dirs_ftp(connection, parts):
"""Handle FTP-like directory creation (internal helper).

Implements fast path optimization followed by fallback loop.
For FTP connections: try single cwd(full_path) first; if fails,
iterate through parts creating missing directories as needed.
"""
path = "/" + "/".join(parts)

# Fast path: if the full path already exists, a single cwd is sufficient
try:
connection.cwd(path)
return
except (ftplib.Error, OSError):
pass

# Build path from root, creating missing segments and only cd'ing when needed
current = ""
for part in parts:
current = current + "/" + part
try:
connection.cwd(current)
except (ftplib.Error, OSError):
# try to create the directory; accept failures silently and proceed
try:
connection.mkd(current)
except (ftplib.Error, OSError):
try:
connection.mkd(part)
except (ftplib.Error, OSError):
pass
# after creating, change into it
try:
connection.cwd(current)
except (ftplib.Error, OSError):
try:
connection.cwd(part)
except (ftplib.Error, OSError):
pass


def _ensure_remote_dirs_sftp(connection, parts):
"""Handle SFTP-like directory creation (internal helper).

For SFTP connections: iterate through path parts, stat each path segment
and create with mkdir if it doesn't exist. Silently ignore all errors.
"""
current = ""
for part in parts:
current = current + "/" + part
try:
connection.stat(current)
except OSError:
try:
connection.mkdir(current)
except OSError:
try:
connection.mkdir(part)
except OSError:
pass


def ensure_remote_dirs(connection, path):
"""Ensure directories exist on a remote connection.

Supports FTP-like objects (with cwd() and mkd()) and SFTP-like objects
(with stat() and mkdir()). The function is iterative (no recursion).

Behavior mirrors previous recursive helper: try a single cwd(path) first; if
that succeeds, return with only one cwd call. If it fails, create missing
directories and cd into the final path as needed.
"""
if not path or path == "/":
return
parts = [p for p in path.split("/") if p]
if not parts:
return

# FTP-like API
if hasattr(connection, "cwd") and hasattr(connection, "mkd"):
_ensure_remote_dirs_ftp(connection, parts)
return

# SFTP-like API (paramiko.SFTPClient)
if hasattr(connection, "stat") and hasattr(connection, "mkdir"):
_ensure_remote_dirs_sftp(connection, parts)
return

raise TypeError("Unsupported connection type for ensure_remote_dirs")


def ensure_final_directory_for_rename(sftp_connection, final_destination_path):
"""Ensure final directory exists for a rename operation on SFTP.

Used in finalize_atomic_transfer operations to ensure the target directory
exists before renaming a temporary file to its final location. Attempts to
stat each path segment; creates with mkdir if it doesn't exist. Silently
ignores all errors to match existing SFTP behavior in movers.
"""
final_dir = os.path.dirname(final_destination_path)
if not final_dir:
return

parts = final_dir.split("/")
path = ""
for p in parts:
if not p:
continue
path = path + "/" + p
try:
sftp_connection.stat(path)
except OSError:
try:
sftp_connection.mkdir(path)
except OSError:
pass
Loading
Loading