Add S3Executor strategy pattern for async S3 operations#684
Merged
laughingman7743 merged 3 commits intomasterfrom Feb 22, 2026
Merged
Add S3Executor strategy pattern for async S3 operations#684laughingman7743 merged 3 commits intomasterfrom
laughingman7743 merged 3 commits intomasterfrom
Conversation
3 tasks
Introduce S3Executor ABC with two implementations: - S3ThreadPoolExecutor: wraps ThreadPoolExecutor (default for sync) - S3AioExecutor: dispatches work via asyncio event loop S3Executor supports the context manager protocol for safe resource cleanup. S3FileSystem methods now use `with` statements instead of manual try/finally for short-lived executors. Replace hardcoded ThreadPoolExecutor usage in S3File and S3FileSystem with the S3Executor interface. This enables async-aware parallel operations without thread-in-thread nesting. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implement AioS3FileSystem as an fsspec AsyncFileSystem that composes a sync S3FileSystem internally and dispatches operations via asyncio.to_thread and asyncio.gather for parallelism. AioS3File is a minimal S3File subclass — all async behavior is provided by the injected S3AioExecutor, eliminating the need for method overrides. Wire AioS3FSCursor to use AioS3FileSystem and add pluggable filesystem_class parameter to AthenaS3FSResultSet. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comprehensive async test suite mirroring the sync S3FileSystem tests. Covers filesystem operations (ls, info, find, du, glob, exists, rm, touch, pipe/cat, put/get, cp, move), file-level read/write/append, pandas CSV I/O, sync wrapper validation, and cache invalidation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
ee63541 to
c23d060
Compare
3 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
S3ExecutorABC with pluggable implementations (S3ThreadPoolExecutorfor sync,S3AioExecutorfor async), replacing hardcodedThreadPoolExecutorinS3FileandS3FileSystemAioS3FileSystem/AioS3Fileusing composition over inheritance — wraps syncS3FileSystemand dispatches viaasyncio.to_thread+asyncio.gather, eliminating thread-in-thread nestingAioS3FSCursorto useAioS3FileSystemwith pluggablefilesystem_classinAthenaS3FSResultSetMotivation
When aio cursors use
S3FileSystem, operations get double-wrapped: the cursor wraps inasyncio.to_thread(), inside whichS3FileSystemspawns more threads viaThreadPoolExecutor. The Strategy pattern abstracts the executor concern so that async paths use the event loop directly.Changes
pyathena/filesystem/s3_executor.pyS3ExecutorABC,S3ThreadPoolExecutor,S3AioExecutorpyathena/filesystem/s3.pyThreadPoolExecutorwithS3Executorinterface, add_create_executor()factorypyathena/filesystem/s3_async.pyAioS3FileSystem(composition-based) andAioS3File(minimal subclass)pyathena/aio/s3fs/cursor.pyAioS3FSCursorto useAioS3FileSystempyathena/s3fs/result_set.pyfilesystem_classparametertests/pyathena/filesystem/test_s3_async.pytest_s3.pyTest plan
make chkpasses (ruff lint, ruff format, mypy)uv run pytest tests/pyathena/filesystem/ -v— 123 tests passed🤖 Generated with Claude Code