diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 6475d45..6227b6d 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -14,11 +14,11 @@ For large pre-existing STAC archives stored as hive-partitioned parquet director Work is split sharply into two phases. -**Phase 0 — open time** runs at `open()` call time. It does the minimum needed to build a fully-described lazy DataArray: one DuckDB query to discover bands, one DuckDB query to build the time axis, and a grid computation. No pixel I/O happens. No dask task graph is built. A single `MultiBandStacBackendArray` is wrapped in an xarray `LazilyIndexedArray`. +**Phase 0 — open time** runs at `open()` call time. It does the minimum needed to build a fully-described lazy DataArray: one DuckDB query to discover bands, one DuckDB query to build the time axis, a small object-store access smoketest, and a grid computation. No pixel I/O happens. No dask task graph is built. A single `MultiBandStacBackendArray` is wrapped in an xarray `LazilyIndexedArray`. **Phase 1 — compute time** runs inside a dask worker when a chunk is actually computed. `MultiBandStacBackendArray.__getitem__` receives the exact `(band, time, y, x)` index for the chunk, derives the chunk's spatial footprint, queries DuckDB for only the COGs that overlap that footprint and time step, reads and reprojects all selected bands concurrently with `asyncio`, and mosaics the results. -This split means that `open()` is nearly instant even for large queries, and the DuckDB spatial filter runs once per chunk rather than over the entire bbox at open time. +This split means that `open()` is nearly instant even for large queries, and the DuckDB spatial filter runs once per chunk rather than over the entire bbox at open time. The one exception to the "metadata-only" story is the storage smoketest: `open()` issues a single `head()` against one representative asset so authentication and object-store wiring fail early with a clear error. ## Module overview @@ -27,12 +27,12 @@ src/lazycogs/ _core.py Entry point. open(), band discovery, time-step building. _backend.py MultiBandStacBackendArray — xarray BackendArray implementation that bridges xarray indexing to chunk reads. _chunk_reader.py Async mosaic logic: open COGs, select overviews, read windows, reproject, mosaic. - _executor.py Per-chunk reprojection thread pool configuration. Exposes set_reproject_workers() and get_max_workers(); the actual pool is created per event loop in _backend.py. + _executor.py Shared background loop and bounded executors. Exposes run_on_loop(). _explain.py Dry-run read estimator. Registers the da.lazycogs.explain() xarray accessor. _grid.py Compute output affine transform and dimensions from bbox + resolution. _reproject.py Nearest-neighbor reprojection using pyproj Transformer + numpy fancy indexing. _storage_ext.py STAC Storage Extension metadata parsing (version detection, kwargs extraction for v1 and v2). - _store.py Resolve cloud HREFs into obstore Store instances (or route through a user-supplied store) with a thread-local cache; store_for() factory for constructing stores from parquet STAC files. + _store.py Resolve cloud HREFs into obstore Store instances (or route through a user-supplied store) with a shared cache; store_for() factory for constructing stores from parquet STAC files. _temporal.py Temporal grouping strategies (day, week, month, year, fixed-day-count). _mosaic_methods.py Pixel-selection strategies (First, Highest, Lowest, Mean, Median, Stdev, Count). ``` @@ -55,7 +55,7 @@ src/lazycogs/ ## Explain: dry-run read estimator -`da.lazycogs.explain()` (registered in `_explain.py` as an xarray accessor) provides an `EXPLAIN ANALYZE`-style dry run. It issues one DuckDB spatial query per `(time step, spatial chunk)` combination — not per band, because the query result is band-independent. Query results are fanned across all active bands in Python, and execution stops before any pixel I/O. All `(time × spatial tile)` queries are dispatched concurrently via `asyncio.gather`; DuckDB itself serialises access on a single connection. +`da.lazycogs.explain()` (registered in `_explain.py` as an xarray accessor) provides an `EXPLAIN ANALYZE`-style dry run. It issues one DuckDB spatial query per `(time step, spatial chunk)` combination — not per band, because the query result is band-independent. Query results are fanned across all active bands in Python, and execution stops before any pixel I/O. All `(time × spatial tile)` queries are dispatched concurrently via `asyncio.gather`, while DuckDB submission is routed through the same small bounded helper used by backend reads. ```python plan = da.lazycogs.explain() # DuckDB queries only, no pixel reads @@ -79,8 +79,8 @@ With `fetch_headers=True`, each matched COG header is fetched (a small HTTP rang `MultiBandStacBackendArray.__getitem__` in `_backend.py`: -1. xarray calls `__getitem__` with an `ExplicitIndexer`. The call is forwarded through `indexing.explicit_indexing_adapter` to `_sync_getitem`, which calls `_run_coroutine(self._async_getitem(key))` with a basic `(int | slice, int | slice, int | slice, int | slice)` key for `(band, time, y, x)`. - When the caller is already inside an async event loop (for example, an interactive map running in Jupyter), xarray can dispatch reads via `async_getitem` instead. `async_getitem` calls `indexing.async_explicit_indexing_adapter` with the same `_async_getitem` method, so the sync and async paths share a single source of truth and produce identical results. The only difference is that `async_getitem` stays on the caller's loop and avoids the background-thread overhead of `_run_coroutine`. +1. xarray calls `__getitem__` with an `ExplicitIndexer`. The call is forwarded through `indexing.explicit_indexing_adapter` to `_sync_getitem`, which calls `run_on_loop(self._async_getitem(key))` with a basic `(int | slice, int | slice, int | slice, int | slice)` key for `(band, time, y, x)`. + When the caller is already inside an async event loop (for example, an interactive map running in Jupyter), xarray can dispatch reads via `async_getitem` instead. `async_getitem` calls `indexing.async_explicit_indexing_adapter` with the same `_async_getitem` method, so the sync and async paths share a single source of truth and produce identical results. The only difference is that `async_getitem` stays on the caller's loop and avoids the background-thread overhead of `run_on_loop`. 2. The band key is resolved to a list of integer band indices. If it was an integer the band dimension is squeezed in the output. 3. The time key is resolved to a list of integer positions. Integer keys squeeze the time dimension. 4. Integer y or x keys are normalised to size-1 slices; the dimension is squeezed before returning. @@ -88,7 +88,7 @@ With `fetch_headers=True`, each matched COG header is fetched (a small HTTP rang 6. The chunk's affine transform is derived: `chunk_affine = dst_affine * Affine.translation(x_start, y_start_physical)`. 7. The chunk's EPSG:4326 bounding box is computed from the four corners of the chunk using `_dst_to_4326`, a `pyproj.Transformer` cached on the `MultiBandStacBackendArray` instance at construction time (or `None` when `dst_crs` is already EPSG:4326). 8. `_async_getitem` drives all time steps via `await _read_chunk_all_dates(...)`. Inside `_read_chunk_all_dates`, an `asyncio.gather` fans out one `_run_one_date` coroutine per time step, so COG reads overlap across dates: - a. Each `_run_one_date` calls `await _search_items_async(plan, date)`, which dispatches the DuckDB query to `_DUCKDB_EXECUTOR` via `run_in_executor`. DuckDB serialises access internally, so concurrent queries on the same client are safe but not parallel. Empty results short-circuit to nodata immediately. + a. Each `_run_one_date` calls `await _search_items_async(plan, date)`, which dispatches the DuckDB query through a shared bounded `run_duckdb(...)` helper onto the DuckDB executor. DuckDB serialises access internally, so concurrent queries on the same client are safe but not parallel. Empty results short-circuit to nodata immediately. b. `read_chunk_async(...)` materialises all selected bands for the time step concurrently. 9. The result is returned in the same top-down orientation as the COG data. @@ -139,23 +139,19 @@ Nearest-neighbor is the only supported resampling method. ## Concurrency model -There are four nested layers of concurrency in a chunk read. +There are two load-bearing concurrency layers in a chunk read, plus dask as an orthogonal scheduler. -**Dask (chunk level).** When a dask-backed DataArray is computed, dask dispatches each chunk task to a worker thread. Each worker thread calls `_sync_getitem()` in `_backend.py`, which calls `_run_coroutine(_async_getitem(...))` to drive all time steps from the persistent per-thread background loop. Worker threads are independent — they share no state except the thread-local store cache in `_store.py` and the thread-local DuckDB clients in `_backend.py`. +**Dask (optional, chunk level).** When a dask-backed DataArray is computed, dask dispatches chunk tasks to worker threads. Each worker thread calls `_sync_getitem()` in `_backend.py`, which uses `run_on_loop(_async_getitem(...))` to submit the chunk read to lazycogs' shared persistent background loop. Dask controls how many chunk tasks run at once; it does not create extra lazycogs event loops. -**asyncio (time + item level).** A single event loop call (via `_run_coroutine`) handles the entire chunk. Inside `_async_getitem`, `asyncio.gather` fans out one `_run_one_date` coroutine per time step, so all time steps are in flight concurrently within the same event loop. DuckDB queries run on `_DUCKDB_EXECUTOR` (a dedicated two-thread pool) via `run_in_executor`, yielding the event loop during each query. DuckDB's internal mutex serialises actual DB access, so queries are safe but not parallel on a single `DuckdbClient`. Once a query returns, its mosaic coroutine proceeds immediately and COG reads for all time steps overlap in the event loop's I/O layer. Because all time steps share a single event loop and therefore a single bounded reprojection executor, the reprojection thread count stays at `get_max_workers()` regardless of how many time steps are in the chunk (no thread explosion). The `warp_cache` is shared across coroutines: `compute_warp_map` is deterministic, so concurrent writes are safe. +**One shared asyncio loop (I/O fan-out).** A single background event loop is created lazily and reused for every sync caller. Startup is only considered ready once that loop is actually running, so first-use sync bridge calls do not depend on thread scheduling luck. Inside `_async_getitem`, `asyncio.gather` fans out one `_run_one_date` coroutine per requested time step, and each time step uses `read_chunk_async` to fan out item reads under `asyncio.Semaphore(max_concurrent_reads)`. COG header reads and tile fetches from `async-geotiff` are all awaitable, so the loop multiplexes network I/O without blocking. DuckDB work never runs on the loop thread: `_search_items_async` and `_explain_async` both route queries through `run_duckdb(...)`, a small internal helper that applies a private submission bound before dispatching onto the DuckDB executor. The executor is still bounded (`max_workers=1` today), so DuckDB yields the event loop but still matches the effective single-connection serialization of the current client model. The current local benchmark fixture keeps DuckDB well below the U4 threshold for a separate client pool (1.9% of per-date chunk wall time on a small-bbox / many-time-step workload, 0.2% on a large-bbox / few-time-step workload), so the single-worker executor remains the intended design for now. -**asyncio (item level).** Inside a time step's event loop, `read_chunk_async` launches one `_read_item_band()` task per overlapping item up front, with an `asyncio.Semaphore(max_concurrent_reads)` (configurable via `open()`, default 32) capping how many reads run concurrently. Tasks complete in I/O arrival order, but results are buffered by their original list index and drained into the mosaic in source-list order. This preserves the sort contract for `FirstMethod` — items are fed strictly in the order returned by DuckDB (i.e. the caller's `sortby` order) regardless of which COGs arrive first over the network, while all concurrent I/O remains in flight. COG header reads and tile fetches from `async-geotiff` are all awaitable, so the event loop multiplexes them without blocking. Early exit is preserved: once the mosaic method signals completion, remaining tasks are cancelled in a `finally` block, and items still waiting on the semaphore never start. +**One bounded reprojection thread pool (CPU work).** `_apply_bands_with_warp_cache` is synchronous CPU-bound work that processes all bands for one item together. `_read_item_band` dispatches it via `loop.run_in_executor(get_reproject_pool(), ...)`, so the event loop stays free while reprojections run on a shared bounded thread pool. The default bound is `min(os.cpu_count() or 1, 4)`, configurable before first use via `LAZYCOGS_REPROJECT_WORKERS`. That environment variable is read once when the pool is first created; later changes are ignored. The `warp_cache` is shared across coroutines; `compute_warp_map` is deterministic, so duplicate concurrent writes are safe. -**Thread pool (CPU work per item).** `_apply_bands_with_warp_cache` is synchronous CPU-bound work that processes all bands for one item together. `_read_item_band` dispatches it via `loop.run_in_executor(None, ...)` — one executor call per item — so the event loop stays free to process other items' tile reads while reprojections run on threads. Because the call is coarse-grained (all bands per item) and GIL-releasing (`pyproj` and numpy both release during heavy inner loops), offloading to the thread pool gives real CPU parallelism without excessive submission overhead. +**Why threads, not a process pool.** `pyproj.Transformer.transform()` and numpy's fancy-indexing both release the GIL during their heavy inner loops. Threads therefore give real CPU parallelism here without the process-spawn and array-pickling overhead of a `ProcessPoolExecutor`. -**Why threads, not a process pool.** `pyproj.Transformer.transform()` and numpy's fancy-indexing both release the GIL during their heavy inner loops. Threads therefore give real CPU parallelism here — not just interleaving — without the overhead of process spawning and array pickling that a `ProcessPoolExecutor` would require. +**Why reprojection is memory-bandwidth-bound, not compute-bound.** `compute_warp_map` builds two meshgrids the size of the output chunk, transforms all coordinates in one vectorised call, and produces large index arrays. `apply_warp_map` samples the source array with random-access fancy indexing (`out[:, valid] = data[:, row_idx[valid], col_idx[valid]]`), which produces near-constant cache misses. Both phases are dominated by memory latency and bandwidth rather than arithmetic, so pushing far past 4 threads usually hurts more than it helps. -**Why reprojection is memory-bandwidth-bound, not compute-bound.** `compute_warp_map` builds two meshgrids the size of the output chunk, transforms all coordinates in one vectorised call, and produces large index arrays. `apply_warp_map` samples the source array with random-access fancy indexing (`out[:, valid] = data[:, row_idx[valid], col_idx[valid]]`), which produces near-constant cache misses. Both phases are dominated by memory latency and bandwidth rather than arithmetic. In practice this means CPU utilisation is low (threads stall waiting for memory), and adding more than 4 concurrent reprojection threads provides no throughput benefit — they saturate the memory bus instead. - -**Bounded per-loop executor.** Rather than using Python's default `min(32, cpu_count + 4)` thread count, `_get_or_create_background_loop()` installs a bounded `ThreadPoolExecutor` (default `min(os.cpu_count(), 4)`) as the default executor on each background loop it creates, before any coroutines run. This caps thread count per loop while preserving per-loop isolation: each dask task has its own independent pool and does not queue behind other tasks. The executor is shut down when the background loop thread exits. Call `lazycogs.set_reproject_workers(n)` to change the per-loop bound (see `_executor.py`). - -**Persistent background loop.** `_run_coroutine()` submits the coroutine to a persistent per-thread background loop via `asyncio.run_coroutine_threadsafe`. The background loop is created by `_get_or_create_background_loop` on the first call from a given thread and reused for all subsequent calls on that same thread. This is the same path whether the caller is a regular Python script, a Jupyter kernel, or a dask worker. The persistent loop is required because `async-geotiff` and `obstore` may fire callbacks after the awaited coroutine returns; a fresh per-call loop would tear down before those callbacks land. Per-thread isolation is preserved: each dask worker thread gets its own independent background loop and executor. One consequence: credential providers that hold event-loop-bound resources (such as `NasaEarthdataAsyncCredentialProvider`, which creates an `aiohttp` session at construction time) fail in this path because the session is bound to a different loop. Use the synchronous credential provider equivalents (e.g. `NasaEarthdataCredentialProvider`) instead. +**Persistent background loop.** The loop stays alive for the life of the process because `async-geotiff` and `obstore` may fire callbacks after the awaited coroutine returns; a fresh per-call loop would tear down before those callbacks land. Because there is now one shared lazycogs loop, loop-bound resources can be constructed on the right loop with `lazycogs.run_on_loop(...)`. This replaces the old per-thread-loop credential footgun: async credential providers that need an `aiohttp` session should build that session on the lazycogs loop rather than on an arbitrary caller loop. ## Chunking strategy and throughput tradeoffs @@ -167,7 +163,7 @@ Without spatial chunks, xarray calls `MultiBandStacBackendArray.__getitem__` onc With spatial chunks (e.g. `chunks={"x": 512, "y": 512}`), dask splits the extent into N tasks. Each task: -- Runs a separate DuckDB query (via `_search_items_async` on `_DUCKDB_EXECUTOR`) to find overlapping items. +- Runs a separate DuckDB query (via `_search_items_async` on the bounded DuckDB executor) to find overlapping items. - Fires a smaller `asyncio.gather` over only the COGs that overlap its sub-region. The total number of COG reads is the same, but they are spread across N smaller gathers rather than one large one. Dask workers do provide some task-level parallelism, but the overhead of N DuckDB queries and N smaller event loop gathers typically outweighs the benefit, especially for small chunk sizes. A COG that spans multiple spatial chunks is also opened once per overlapping chunk rather than once per time step. @@ -178,7 +174,7 @@ The async layer already handles spatial I/O concurrency. Dask spatial chunks add **The time dimension.** Without dask, `_async_getitem()` already parallelises all time steps concurrently within a single chunk read via `asyncio.gather`. Dask adds a second level of time parallelism when the array has more time steps than fit in one chunk: `chunks={"time": N}` lets dask run multiple chunks in parallel across worker threads, each chunk running its own internal gather. For most use cases without dask, the built-in time-step parallelism is sufficient and avoids dask scheduling overhead entirely. -**Band dimension chunking does not help.** Within a single time step, all bands are read together by `_read_item_bands`. Splitting bands into separate dask tasks (`chunks={"band": 1}`) creates the same per-task overhead as spatial chunks (separate DuckDB queries, event loop creation, executor instantiation) without a meaningful parallelism benefit. Keep all bands in a single chunk. +**Band dimension chunking does not help.** Within a single time step, all bands are read together by `_read_item_bands`. Splitting bands into separate dask tasks (`chunks={"band": 1}`) creates the same per-task overhead as spatial chunks (separate DuckDB queries and gather orchestration) without a meaningful parallelism benefit. Keep all bands in a single chunk. **Memory pressure** is the other legitimate reason to add spatial chunks. If the full array does not fit in memory, spatial chunking limits how much is materialised at once even if it costs throughput. @@ -250,7 +246,7 @@ mean that requires all weeks to be present before reducing). `lazycogs.open(..., store=...)` accepts the same store contract that `async-geotiff` consumes. In practice, any object that satisfies `async_geotiff.Store` can be passed through and will be forwarded unchanged to `GeoTIFF.open(...)`. -`resolve()` in `_store.py` remains the default convenience layer. When `store=None`, it defers to `obstore.store.from_url` for scheme detection — including the special-case HTTPS routing for `amazonaws.com`, `r2.cloudflarestorage.com`, and Azure hosts — rather than maintaining its own list of known object-store domains. The constructed obstore-backed store is cached per thread in a `dict[str, ObjectStore]` keyed by root URL (`scheme://netloc`). Because dask tasks run in threads, this avoids repeated connection setup within a single task while remaining safe across concurrent tasks. +`resolve()` in `_store.py` remains the default convenience layer. When `store=None`, it defers to `obstore.store.from_url` for scheme detection — including the special-case HTTPS routing for `amazonaws.com`, `r2.cloudflarestorage.com`, and Azure hosts — rather than maintaining its own list of known object-store domains. The constructed obstore-backed store is cached in a shared process-local `dict[str, ObjectStore]` keyed by root URL (`scheme://netloc`) behind a small lock. This avoids repeated connection setup while keeping direct threaded callers safe. No credential defaults are applied; auto-resolved stores are constructed with obstore's own environment-based credential discovery (environment variables, instance metadata, config files, etc.). For public buckets that do not require signed requests, callers pass `skip_signature=True` explicitly. For authenticated access or any non-default configuration, callers may still construct an obstore `ObjectStore` and pass it via `store=`; `resolve()` then returns it unchanged and only extracts the object path from each HREF. No introspection is done on a user-supplied store — the caller is responsible for ensuring it satisfies the `GeoTIFF.open` read contract and is rooted at the same `scheme://netloc` the HREFs point to. diff --git a/README.md b/README.md index ad1e939..fdd69e9 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,9 @@ await rustac.search_to( bbox=bbox_4326, ) -# Open a fully lazy (band, time, y, x) DataArray. No COGs are read yet. +# Open a fully lazy (band, time, y, x) DataArray. No pixel data is read yet. +# lazycogs does perform a small storage-access smoketest here so auth or +# object-store misconfiguration fails early instead of on the first chunk read. da = lazycogs.open( "items.parquet", bbox=dst_bbox, @@ -102,6 +104,22 @@ reading many chunks inside an already-running loop. `lazycogs.open(..., store=...)` accepts any store object that satisfies `async_geotiff.Store`. For most users, the recommended path is still obstore: leave `store=None` to auto-resolve per-asset stores, or call `lazycogs.store_for()` to build one explicitly. +### Concurrency notes + +- Sync callers submit work to one shared persistent lazycogs event loop. +- CPU-bound reprojection runs on one bounded shared thread pool. Set + `LAZYCOGS_REPROJECT_WORKERS` before first use to change the default + `min(os.cpu_count() or 1, 4)` limit. The value is read when the pool is + first created; changes after that are ignored for the life of the process. +- DuckDB queries yield the event loop by running through one small bounded + internal submission path and explicit executor instead of on the loop + thread. On the local benchmark fixture, DuckDB stayed under 2% of + per-date chunk wall time, so there is no separate per-thread DuckDB + client pool today. +- If you need to construct a loop-bound resource for lazycogs internals, + use `lazycogs.run_on_loop(...)`. +- Low-level callers should use `await lazycogs.read_chunk_async(...)`. + ## Documentation - [Home](https://developmentseed.github.io/lazycogs/) — quickstart and full usage guide diff --git a/docs/api/utils.md b/docs/api/utils.md index 16f0859..6f80502 100644 --- a/docs/api/utils.md +++ b/docs/api/utils.md @@ -4,7 +4,7 @@ ::: lazycogs.store_for -::: lazycogs.set_reproject_workers +::: lazycogs.run_on_loop ::: lazycogs.ExplainPlan diff --git a/docs/guides/chunking.md b/docs/guides/chunking.md index 7525079..2a900b8 100644 --- a/docs/guides/chunking.md +++ b/docs/guides/chunking.md @@ -19,7 +19,7 @@ vals = da.sel(x=299965, y=2653947, method="nearest").sel(time=slice("2025-06", " ## When to add chunks -Add `chunks={"time": 1}` when you want dask to distribute work across multiple workers. Without chunks, all time steps share one event loop and one reprojection thread pool. I/O is concurrent but CPU-bound reprojection is serialized within that pool. With temporal chunks, each time step becomes an independent dask task with its own event loop and thread pool, so reprojection work can run in true parallel across workers: +Add `chunks={"time": 1}` when you want dask to distribute work across multiple workers. Without chunks, all time steps share one lazycogs event loop and one bounded reprojection pool. I/O is concurrent but CPU-bound reprojection is bounded by that shared pool. With temporal chunks, dask can run multiple chunk tasks in parallel across worker threads, all submitting to the same lazycogs loop while still sharing the same bounded reprojection pool: ```python da = lazycogs.open( @@ -34,7 +34,7 @@ da.max(dim="time").compute() # each time step runs in its own dask task ## Spatial chunks -Avoid spatial chunks unless you are under memory pressure. lazycogs handles spatial I/O concurrency internally through its async event loop — adding spatial dask tasks layers DuckDB query overhead on top of I/O that was already happening concurrently. +Avoid spatial chunks unless you are under memory pressure. lazycogs handles spatial I/O concurrency internally through its async event loop — adding spatial dask tasks layers extra DuckDB query overhead on top of I/O that was already happening concurrently. The one case where spatial chunks are useful is when a single time step is too large to fit in memory even at `max_concurrent_reads=1`. In that case, small spatial chunks limit how many pixels are in flight at once. @@ -57,17 +57,16 @@ da = lazycogs.open( ) ``` -## `set_reproject_workers` +## `LAZYCOGS_REPROJECT_WORKERS` -Controls how many threads each chunk's event loop uses for CPU-bound reprojection (pyproj + numpy). The default is `min(os.cpu_count(), 4)`. +Controls how many threads the shared reprojection pool uses for CPU-bound reprojection (pyproj + numpy). The default is `min(os.cpu_count(), 4)`. Reprojection is memory-bandwidth-bound rather than compute-bound. Benchmarks show diminishing returns above 4 threads because concurrent large-array operations saturate the memory bus rather than adding throughput. Raising this beyond 4 is rarely useful. -Each chunk gets its own independent thread pool, so dask tasks do not queue behind each other for reprojection. +Set the environment variable before the first lazycogs chunk read: -```python -import lazycogs -lazycogs.set_reproject_workers(2) # reduce from default if memory-constrained +```bash +export LAZYCOGS_REPROJECT_WORKERS=2 ``` See also: [API reference for open()](../api/open.md), [API reference for utilities](../api/utils.md), [Architecture](../architecture.md) diff --git a/src/lazycogs/__init__.py b/src/lazycogs/__init__.py index 828a60c..c72b298 100644 --- a/src/lazycogs/__init__.py +++ b/src/lazycogs/__init__.py @@ -1,8 +1,8 @@ """lazycogs: lazy xarray DataArrays from STAC COG collections.""" -from lazycogs._chunk_reader import read_chunk, read_chunk_async +from lazycogs._chunk_reader import read_chunk_async from lazycogs._core import open # noqa: A004 -from lazycogs._executor import set_reproject_workers +from lazycogs._executor import run_on_loop from lazycogs._explain import ( # noqa: F401 — registers da.lazycogs accessor ChunkRead, CogRead, @@ -36,8 +36,7 @@ "StdevMethod", "align_bbox", "open", - "read_chunk", "read_chunk_async", - "set_reproject_workers", + "run_on_loop", "store_for", ] diff --git a/src/lazycogs/_backend.py b/src/lazycogs/_backend.py index 11ea410..6790aec 100644 --- a/src/lazycogs/_backend.py +++ b/src/lazycogs/_backend.py @@ -16,10 +16,7 @@ from lazycogs._chunk_reader import read_chunk_async from lazycogs._cql2 import _extract_filter_fields, _sortby_fields -from lazycogs._executor import ( - _DUCKDB_EXECUTOR, - _run_coroutine, -) +from lazycogs._executor import run_duckdb, run_on_loop logger = logging.getLogger(__name__) @@ -224,8 +221,7 @@ async def _search_items_async( List of STAC items returned by DuckDB. """ - loop = asyncio.get_running_loop() - return await loop.run_in_executor(_DUCKDB_EXECUTOR, _search_items_sync, plan, date) + return await run_duckdb(_search_items_sync, plan, date) async def _run_one_date( @@ -340,7 +336,7 @@ class MultiBandStacBackendArray(BackendArray): store: Pre-configured :class:`async_geotiff.Store` accepted by ``GeoTIFF.open`` and shared across all chunk reads. When ``None``, each asset HREF is resolved to an obstore-backed store via the - thread-local cache in :func:`~lazycogs._store.resolve`. + shared process-local cache in :func:`~lazycogs._store.resolve`. max_concurrent_reads: Maximum number of COG reads to run concurrently per chunk. Limits peak in-flight memory when a chunk overlaps many items. Defaults to 32. @@ -515,7 +511,7 @@ def _sync_getitem(self, key: tuple[Any, ...]) -> np.ndarray: Numpy array with shape determined by the indexing key. """ - return _run_coroutine(self._async_getitem(key)) + return run_on_loop(self._async_getitem(key)) async def _async_getitem(self, key: tuple[Any, ...]) -> np.ndarray: """Materialise the chunk identified by ``key``. diff --git a/src/lazycogs/_chunk_reader.py b/src/lazycogs/_chunk_reader.py index b70993b..490521f 100644 --- a/src/lazycogs/_chunk_reader.py +++ b/src/lazycogs/_chunk_reader.py @@ -12,7 +12,7 @@ from async_geotiff import GeoTIFF, Window from numpy import ma -from lazycogs._executor import _run_coroutine +from lazycogs._executor import get_reproject_pool from lazycogs._mosaic_methods import FirstMethod, MosaicMethodBase from lazycogs._reproject import ( WarpMap, @@ -51,13 +51,13 @@ class _ChunkContext: warp_cache: dict[tuple[tuple[float, ...], CRS], WarpMap] | None -def _log_batch_failure( +def _log_read_failure( label: str, key: object, item_id: str, err: BaseException, ) -> None: - """Log a warning for an item that failed inside an asyncio.gather batch.""" + """Log a warning for an item that failed inside bounded concurrent reads.""" logger.warning( "Failed to read %s %r from item %s: %s", label, @@ -445,7 +445,7 @@ async def _read_band( # Compute warp maps and apply, sharing maps across bands with identical geometry. loop = asyncio.get_running_loop() return await loop.run_in_executor( - None, + get_reproject_pool(), lambda: _apply_bands_with_warp_cache( band_rasters, ctx.chunk_affine, @@ -534,8 +534,9 @@ async def read_chunk_async( same source geometry compute the reprojection warp map only once (via :func:`_apply_bands_with_warp_cache`). - Items are processed in batches of ``max_concurrent_reads``. When all - per-band mosaic methods signal completion, remaining batches are skipped. + All item reads are scheduled up front, but execution is bounded by + ``max_concurrent_reads`` via an ``asyncio.Semaphore``. When all per-band + mosaic methods signal completion, remaining pending reads are skipped. Args: items: List of STAC item dicts to mosaic. Processed in order. @@ -603,7 +604,7 @@ def _done() -> bool: return all(m.is_done for m in mosaic_methods.values()) def _error(idx: int, exc: BaseException) -> None: - _log_batch_failure("bands", bands, items[idx].get("id", ""), exc) + _log_read_failure("bands", bands, items[idx].get("id", ""), exc) await _drain_in_order(task_list, _feed, _done, _error) @@ -619,63 +620,3 @@ def _error(idx: int, exc: BaseException) -> None: dtype=np.float32, ) return output - - -def read_chunk( - items: list[dict], - bands: list[str], - chunk_affine: Affine, - dst_crs: CRS, - chunk_width: int, - chunk_height: int, - nodata: float | None = None, - mosaic_method_cls: type[MosaicMethodBase] | None = None, - store: Store | None = None, - max_concurrent_reads: int = 32, - warp_cache: dict | None = None, - path_fn: Callable[[str], str] | None = None, -) -> dict[str, np.ndarray]: - """Run :func:`read_chunk_async` on the persistent per-thread background loop. - - All arguments are identical to :func:`read_chunk_async`. - - Args: - items: List of STAC item dicts to mosaic. Processed in order. - bands: Asset keys identifying the bands to read from each item. - chunk_affine: Affine transform of the destination chunk. - dst_crs: CRS of the destination chunk. - chunk_width: Width of the destination chunk in pixels. - chunk_height: Height of the destination chunk in pixels. - nodata: No-data fill value. - mosaic_method_cls: Mosaic method class instantiated once per band. - Defaults to :class:`~lazycogs._mosaic_methods.FirstMethod`. - store: Optional pre-configured :class:`async_geotiff.Store` - accepted by ``GeoTIFF.open``. - max_concurrent_reads: Maximum number of COG reads to run concurrently. - warp_cache: Optional cache shared across calls for reusing warp maps - from earlier time steps. - path_fn: Optional callable that takes an asset HREF and returns the - object path to use with *store*. - - Returns: - ``dict`` mapping each band name to an array of shape - ``(cog_bands, chunk_height, chunk_width)`` with dtype matching the - source COGs. - - """ - return _run_coroutine( - read_chunk_async( - items=items, - bands=bands, - chunk_affine=chunk_affine, - dst_crs=dst_crs, - chunk_width=chunk_width, - chunk_height=chunk_height, - nodata=nodata, - mosaic_method_cls=mosaic_method_cls, - store=store, - max_concurrent_reads=max_concurrent_reads, - warp_cache=warp_cache, - path_fn=path_fn, - ), - ) diff --git a/src/lazycogs/_core.py b/src/lazycogs/_core.py index cba8b29..e65b224 100644 --- a/src/lazycogs/_core.py +++ b/src/lazycogs/_core.py @@ -16,7 +16,7 @@ from lazycogs._backend import MultiBandStacBackendArray from lazycogs._cql2 import _extract_filter_fields, _sortby_fields -from lazycogs._executor import _run_coroutine +from lazycogs._executor import run_on_loop from lazycogs._grid import compute_output_grid from lazycogs._mosaic_methods import FirstMethod, MosaicMethodBase from lazycogs._store import resolve @@ -168,7 +168,7 @@ def _smoketest_store( resolved_store, path = resolve(href, store=store, path_fn=path_from_href) try: - _run_coroutine(_open_store_sample(path, store=resolved_store)) + run_on_loop(_open_store_sample(path, store=resolved_store)) except Exception as e: raise RuntimeError( f"Store cannot open {href!r} through GeoTIFF.open: {e}. " @@ -524,11 +524,11 @@ def open( # noqa: A001 credentials, custom endpoints, or non-default options are needed without relying on automatic store resolution from each HREF. When ``None`` (default), each asset URL is parsed to create or reuse a - per-thread cached obstore-backed store. + shared cached obstore-backed store behind a small lock. max_concurrent_reads: Maximum number of COG reads to run concurrently - per chunk. Items are processed in batches of this size, which - bounds peak in-flight memory when a chunk overlaps many files. - Methods that support early exit (e.g. the default + per chunk. Concurrency is bounded to this size with an + ``asyncio.Semaphore``, which bounds peak in-flight memory when a + chunk overlaps many files. Methods that support early exit (e.g. the default :class:`~lazycogs._mosaic_methods.FirstMethod`) will stop reading once every output pixel is filled, so lower values also reduce unnecessary I/O on dense datasets. Defaults to 32. diff --git a/src/lazycogs/_executor.py b/src/lazycogs/_executor.py index 07e3c4b..ae4ce0f 100644 --- a/src/lazycogs/_executor.py +++ b/src/lazycogs/_executor.py @@ -1,4 +1,4 @@ -"""Thread pool and event loop configuration for reprojection and DuckDB work.""" +"""Event loop and executor ownership for lazycogs background work.""" from __future__ import annotations @@ -6,23 +6,26 @@ import concurrent.futures import os import threading +import weakref +from functools import partial from typing import TYPE_CHECKING if TYPE_CHECKING: - from asyncio.futures import Future - from types import CoroutineType + from collections.abc import Callable, Coroutine -config: dict[str, int | None] = {"max_workers": None} +_REPROJECT_WORKERS_ENV = "LAZYCOGS_REPROJECT_WORKERS" +_DUCKDB_MAX_WORKERS = 1 +_DUCKDB_MAX_SUBMISSIONS = 2 -_tls = threading.local() - -# DuckDB queries serialise on a single connection, so a small pool is enough. -# Kept separate from the reprojection executor so a long reprojection cannot -# starve a queued query within the same chunk read. -_DUCKDB_EXECUTOR = concurrent.futures.ThreadPoolExecutor( - max_workers=2, - thread_name_prefix="lazycogs-duckdb", -) +_LOOP: asyncio.AbstractEventLoop | None = None +_LOOP_THREAD: threading.Thread | None = None +_REPROJECT_POOL: concurrent.futures.ThreadPoolExecutor | None = None +_DUCKDB_POOL: concurrent.futures.ThreadPoolExecutor | None = None +_DUCKDB_SUBMISSION_GATES: weakref.WeakKeyDictionary[ + asyncio.AbstractEventLoop, + asyncio.Semaphore, +] = weakref.WeakKeyDictionary() +_LOCK = threading.Lock() def _default_workers() -> int: @@ -33,96 +36,139 @@ def _default_workers() -> int: saturate the memory bus rather than adding CPU throughput. Keep the default conservative. """ - return min(os.cpu_count() or 4, 4) - - -def get_max_workers() -> int: - """Return the configured worker count, or the default if not set. - - Returns: - Number of reprojection threads each event loop will use. - - """ - val = config["max_workers"] - return val if val is not None else _default_workers() + return min(os.cpu_count() or 1, 4) -def set_reproject_workers(n: int) -> None: - """Set the number of threads each thread's event loop uses for reprojection. +def _reproject_worker_count() -> int: + """Return the configured reprojection worker count.""" + value = os.getenv(_REPROJECT_WORKERS_ENV) + if value is None: + return _default_workers() - Each thread (dask worker, Jupyter kernel callback thread, etc.) gets one - persistent background event loop with one bounded reprojection - ``ThreadPoolExecutor``. All chunk reads on that thread share the same loop - and executor. Dask tasks on different threads do not compete for a shared - pool. Total reprojection threads at any moment is at most - ``n x active_thread_count``. - - Reprojection is memory-bandwidth-bound rather than compute-bound, so values - above 4 typically offer no benefit and can hurt throughput due to memory - contention. The default is ``min(os.cpu_count(), 4)``. - - To improve overall throughput, prefer adding time or band parallelism via - dask (``chunks={"time": 1}``) over raising this value. - - Args: - n: Number of worker threads per event loop. Must be >= 1. - - Raises: - ValueError: If ``n`` is less than 1. - - """ - if n < 1: - raise ValueError(f"n must be >= 1, got {n!r}") - config["max_workers"] = n + try: + parsed = int(value) + except ValueError as exc: + raise ValueError( + f"{_REPROJECT_WORKERS_ENV} must be an integer, got {value!r}", + ) from exc + if parsed < 1: + raise ValueError(f"worker count must be >= 1, got {parsed!r}") + return parsed -def _get_or_create_background_loop() -> asyncio.AbstractEventLoop: - """Return the persistent background event loop for the current thread. - - Creates the loop, its bounded reprojection executor, and its daemon runner - thread on first call from a given thread. Subsequent calls on the same - thread return the cached loop immediately. - - The loop is stored on ``threading.local`` so each thread (each dask worker, - each Jupyter kernel callback thread) has its own independent loop and - executor — tasks on different threads do not share a pool. - - Using a persistent loop (rather than a fresh one per call) ensures that - any in-flight callbacks from ``async_geotiff``/``obstore`` background - threads can always be delivered, avoiding ``RuntimeError: Event loop is - closed`` errors from callbacks that fire after a fresh loop is torn down. - """ - loop: asyncio.AbstractEventLoop | None = getattr(_tls, "loop", None) - if loop is not None and loop.is_running(): - return loop +def _start_background_loop() -> tuple[asyncio.AbstractEventLoop, threading.Thread]: + """Create and start the shared background event loop.""" loop = asyncio.new_event_loop() - loop.set_default_executor( - concurrent.futures.ThreadPoolExecutor( - max_workers=get_max_workers(), - thread_name_prefix="lazycogs-reproject", - ), - ) - t = threading.Thread(target=loop.run_forever, daemon=True, name="lazycogs-loop") - t.start() - _tls.loop = loop - return loop - - -def _run_coroutine(coro: CoroutineType) -> Future: - """Run an async coroutine from sync code. - - Submits the coroutine to a persistent per-thread background event loop, - blocking until it completes. The background loop is created on the first - call from a given thread (dask worker, Jupyter kernel thread, etc.) and - reused for all subsequent calls on that same thread. - - Args: - coro: The coroutine to execute. - - Returns: - The return value of the coroutine. - + ready = threading.Event() + + def _run() -> None: + asyncio.set_event_loop(loop) + loop.call_soon(ready.set) + loop.run_forever() + + thread = threading.Thread(target=_run, daemon=True, name="lazycogs-loop") + thread.start() + ready.wait() + return loop, thread + + +def _ensure_loop() -> asyncio.AbstractEventLoop: + """Return the shared background event loop, starting it lazily.""" + global _LOOP, _LOOP_THREAD + + with _LOCK: + if ( + _LOOP is not None + and _LOOP_THREAD is not None + and _LOOP_THREAD.is_alive() + and _LOOP.is_running() + and not _LOOP.is_closed() + ): + return _LOOP + + _LOOP, _LOOP_THREAD = _start_background_loop() + return _LOOP + + +def get_reproject_pool() -> concurrent.futures.ThreadPoolExecutor: + """Return the shared bounded reprojection executor.""" + global _REPROJECT_POOL # noqa: PLW0603 + + with _LOCK: + if _REPROJECT_POOL is None: + _REPROJECT_POOL = concurrent.futures.ThreadPoolExecutor( + max_workers=_reproject_worker_count(), + thread_name_prefix="lazycogs-reproject", + ) + return _REPROJECT_POOL + + +def get_duckdb_pool() -> concurrent.futures.ThreadPoolExecutor: + """Return the shared bounded DuckDB executor.""" + global _DUCKDB_POOL # noqa: PLW0603 + + with _LOCK: + if _DUCKDB_POOL is None: + _DUCKDB_POOL = concurrent.futures.ThreadPoolExecutor( + max_workers=_DUCKDB_MAX_WORKERS, + thread_name_prefix="lazycogs-duckdb", + ) + return _DUCKDB_POOL + + +def _duckdb_submission_gate() -> asyncio.Semaphore: + """Return the private per-loop gate for DuckDB submissions.""" + loop = asyncio.get_running_loop() + with _LOCK: + gate = _DUCKDB_SUBMISSION_GATES.get(loop) + if gate is None: + gate = asyncio.Semaphore(_DUCKDB_MAX_SUBMISSIONS) + _DUCKDB_SUBMISSION_GATES[loop] = gate + return gate + + +async def run_duckdb[**P, T]( + func: Callable[P, T], + *args: P.args, + **kwargs: P.kwargs, +) -> T: + """Run blocking DuckDB work on the shared bounded executor. + + Submission is gated per event loop to keep the executor queue small while + still yielding the caller's loop during the blocking query. + """ + loop = asyncio.get_running_loop() + async with _duckdb_submission_gate(): + return await loop.run_in_executor( + get_duckdb_pool(), + partial(func, *args, **kwargs), + ) + + +def _submit_to_loop[T]( + coro: Coroutine[object, object, T], +) -> concurrent.futures.Future[T]: + """Submit a coroutine to the shared background loop.""" + loop = _ensure_loop() + thread = _LOOP_THREAD + if thread is None or not thread.is_alive() or not loop.is_running(): + coro.close() + raise RuntimeError("lazycogs background event loop is not running") + if thread.ident == threading.get_ident(): + coro.close() + raise RuntimeError( + "Cannot call sync lazycogs bridge from the lazycogs event loop thread. " + "Await the async API directly instead.", + ) + return asyncio.run_coroutine_threadsafe(coro, loop) + + +def run_on_loop[T](coro: Coroutine[object, object, T]) -> T: + """Run ``coro`` on the shared lazycogs event loop and return its result. + + This is the supported helper for sync code that must execute a coroutine on + the lazycogs background loop, including callers that need to construct + loop-bound resources on that loop. """ - loop = _get_or_create_background_loop() - return asyncio.run_coroutine_threadsafe(coro, loop).result() + return _submit_to_loop(coro).result() diff --git a/src/lazycogs/_explain.py b/src/lazycogs/_explain.py index 4c4bf46..97f318b 100644 --- a/src/lazycogs/_explain.py +++ b/src/lazycogs/_explain.py @@ -15,7 +15,7 @@ from pyproj import CRS, Transformer from lazycogs._chunk_reader import _ChunkContext, _open_and_window -from lazycogs._executor import _DUCKDB_EXECUTOR, _run_coroutine +from lazycogs._executor import run_duckdb, run_on_loop if TYPE_CHECKING: from collections.abc import Iterator @@ -601,17 +601,14 @@ async def _explain_one_tile( actual_h, dst_crs, ) - loop = asyncio.get_running_loop() - items = await loop.run_in_executor( - _DUCKDB_EXECUTOR, - lambda: backend.duckdb_client.search( - backend.parquet_path, - bbox=chunk_bbox_4326, - datetime=date_filter, - sortby=backend.sortby, - filter=backend.filter, - ids=backend.ids, - ), + items = await run_duckdb( + backend.duckdb_client.search, + backend.parquet_path, + bbox=chunk_bbox_4326, + datetime=date_filter, + sortby=backend.sortby, + filter=backend.filter, + ids=backend.ids, ) logger.debug( "explain date=%s chunk=(%d,%d) -> %d items (for %d band(s))", @@ -707,7 +704,7 @@ async def _explain_one_tile( class StacCogAccessor: """xarray accessor adding explain functionality to lazycogs DataArrays. - Registered as the ``stac_cog`` namespace on all ``xr.DataArray`` objects. + Registered as the ``lazycogs`` namespace on all ``xr.DataArray`` objects. The :meth:`explain` method is only useful on DataArrays produced by :func:`lazycogs.open`. @@ -749,9 +746,9 @@ def explain(self, *, fetch_headers: bool = False) -> ExplainPlan: backend: MultiBandStacBackendArray | None = self._da.attrs.get("_stac_backend") if backend is None: raise ValueError( - "This DataArray does not have stac_cog explain metadata. " + "This DataArray does not have lazycogs explain metadata. " "Ensure it was created by lazycogs.open().", ) - return _run_coroutine( + return run_on_loop( _explain_async(self._da, backend, fetch_headers=fetch_headers), ) diff --git a/src/lazycogs/_store.py b/src/lazycogs/_store.py index 26087ba..879bdcc 100644 --- a/src/lazycogs/_store.py +++ b/src/lazycogs/_store.py @@ -20,14 +20,18 @@ logger = logging.getLogger(__name__) -_local = threading.local() +_STORE_CACHE: dict[str, ObjectStore] = {} +_STORE_CACHE_LOCK = threading.Lock() -def _cache() -> dict[str, ObjectStore]: - """Return the thread-local store cache, creating it on first access.""" - if not hasattr(_local, "stores"): - _local.stores = {} - return _local.stores +def _get_cached_store(root_url: str) -> ObjectStore: + """Return the cached store for ``root_url``, creating it once.""" + with _STORE_CACHE_LOCK: + store = _STORE_CACHE.get(root_url) + if store is None: + store = from_url(root_url) + _STORE_CACHE[root_url] = store + return store def resolve( @@ -45,8 +49,8 @@ def resolve( When ``store`` is ``None``, a store is auto-constructed via :func:`obstore.store.from_url` using only the ``scheme://netloc`` portion - of the HREF and cached per thread. No credential defaults are applied; the - store is constructed with obstore's own environment-based credential + of the HREF and cached per root URL. No credential defaults are applied; + the store is constructed with obstore's own environment-based credential discovery. For public buckets, signed URLs, custom endpoints, or request-payer buckets, construct the store yourself and pass it via ``store`` — see the cloud storage guide for examples. @@ -82,10 +86,7 @@ def resolve( return store, path root_url = f"{scheme}://{parsed.netloc}" if scheme != "file" else "file:///" - cache = _cache() - if root_url not in cache: - cache[root_url] = from_url(root_url) - return cache[root_url], path + return _get_cached_store(root_url), path def store_for( diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/_executor_test_utils.py b/tests/_executor_test_utils.py new file mode 100644 index 0000000..dc3b3e0 --- /dev/null +++ b/tests/_executor_test_utils.py @@ -0,0 +1,47 @@ +"""Test-only helpers for resetting lazycogs executor state.""" + +from __future__ import annotations + +import asyncio + +from lazycogs import _executor + + +async def _cancel_loop_tasks() -> None: + """Cancel outstanding tasks on the current event loop.""" + current = asyncio.current_task() + tasks = [task for task in asyncio.all_tasks() if task is not current] + for task in tasks: + task.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + +def reset_executor_state_for_tests() -> None: + """Reset lazycogs executor singletons between tests.""" + with _executor._LOCK: + loop = _executor._LOOP + thread = _executor._LOOP_THREAD + reproject_pool = _executor._REPROJECT_POOL + duckdb_pool = _executor._DUCKDB_POOL + _executor._LOOP = None + _executor._LOOP_THREAD = None + _executor._REPROJECT_POOL = None + _executor._DUCKDB_POOL = None + _executor._DUCKDB_SUBMISSION_GATES.clear() + + if loop is not None and loop.is_running(): + future = asyncio.run_coroutine_threadsafe(_cancel_loop_tasks(), loop) + try: + future.result(timeout=5) + except Exception: + future.cancel() + loop.call_soon_threadsafe(loop.stop) + if thread is not None and thread.is_alive(): + thread.join(timeout=5) + if loop is not None and not loop.is_closed(): + loop.close() + if reproject_pool is not None: + reproject_pool.shutdown(wait=True, cancel_futures=True) + if duckdb_pool is not None: + duckdb_pool.shutdown(wait=True, cancel_futures=True) diff --git a/tests/benchmarks/bench_duckdb_share.py b/tests/benchmarks/bench_duckdb_share.py new file mode 100644 index 0000000..1a0dd68 --- /dev/null +++ b/tests/benchmarks/bench_duckdb_share.py @@ -0,0 +1,175 @@ +"""Benchmark DuckDB's share of per-date chunk wall time. + +These benchmarks reuse the local fixtures from ``tests/benchmarks/conftest.py``. +They answer the U4 follow-up question from the concurrency refactor plan: +should lazycogs add a per-thread DuckDB client pool for true parallel query +execution, or is the current single-worker bounded executor already good enough? + +Run with: + uv run pytest tests/benchmarks/bench_duckdb_share.py -s +""" + +from __future__ import annotations + +import time +from contextlib import contextmanager +from dataclasses import dataclass + +import numpy as np +import pytest +from pyproj import Transformer + +import lazycogs +import lazycogs._backend as backend + +from .conftest import BENCHMARK_BBOX, BENCHMARK_CRS + +_FULL_QUERY_BBOX_4326 = (-108.5, 37.5, -107.5, 38.5) +_WARMUP_RUNS = 1 +_MEASURED_RUNS = 5 + + +@dataclass +class _WorkloadSummary: + name: str + wall_mean_s: float + wall_p95_s: float + wall_p99_s: float + duck_share_mean_pct: float + duck_query_mean_ms: float + duck_query_p95_ms: float + duck_query_p99_ms: float + date_total_mean_ms: float + date_total_p95_ms: float + date_total_p99_ms: float + + def format(self) -> str: + return ( + f"{self.name}: wall mean={self.wall_mean_s:.3f}s " + f"p95={self.wall_p95_s:.3f}s p99={self.wall_p99_s:.3f}s | " + f"DuckDB share mean={self.duck_share_mean_pct:.1f}% | " + f"duck query mean={self.duck_query_mean_ms:.1f}ms " + f"p95={self.duck_query_p95_ms:.1f}ms p99={self.duck_query_p99_ms:.1f}ms | " + f"per-date chunk mean={self.date_total_mean_ms:.1f}ms " + f"p95={self.date_total_p95_ms:.1f}ms p99={self.date_total_p99_ms:.1f}ms" + ) + + +@contextmanager +def _collect_duckdb_share(): + duck_times: list[float] = [] + date_totals: list[float] = [] + orig_search = backend._search_items_sync + orig_run_one_date = backend._run_one_date + + def wrapped_search(plan, date): + t0 = time.perf_counter() + try: + return orig_search(plan, date) + finally: + duck_times.append(time.perf_counter() - t0) + + async def wrapped_run_one_date(t_idx, plan): + t0 = time.perf_counter() + try: + return await orig_run_one_date(t_idx, plan) + finally: + date_totals.append(time.perf_counter() - t0) + + backend._search_items_sync = wrapped_search + backend._run_one_date = wrapped_run_one_date + try: + yield duck_times, date_totals + finally: + backend._search_items_sync = orig_search + backend._run_one_date = orig_run_one_date + + +def _percentile(values: list[float], pct: int) -> float: + if not values: + return 0.0 + return float(np.percentile(np.asarray(values, dtype=float), pct)) + + +def _large_bbox_5070() -> tuple[float, float, float, float]: + transformer = Transformer.from_crs("EPSG:4326", BENCHMARK_CRS, always_xy=True) + return transformer.transform_bounds(*_FULL_QUERY_BBOX_4326) + + +def _measure_workload( + name: str, + parquet_path: str, + *, + bbox: tuple[float, float, float, float], + crs: str, + resolution: float, + time_period: str, +) -> _WorkloadSummary: + runs: list[tuple[float, float, list[float], list[float]]] = [] + + for _ in range(_WARMUP_RUNS + _MEASURED_RUNS): + with _collect_duckdb_share() as (duck_times, date_totals): + da = lazycogs.open( + parquet_path, + bbox=bbox, + crs=crs, + resolution=resolution, + time_period=time_period, + chunks={"time": 1}, + ) + t0 = time.perf_counter() + da.compute() + wall_s = time.perf_counter() - t0 + + duck_share = sum(duck_times) / sum(date_totals) + runs.append((wall_s, duck_share, list(duck_times), list(date_totals))) + + measured = runs[_WARMUP_RUNS:] + wall_s = [run[0] for run in measured] + duck_share_pct = [run[1] * 100 for run in measured] + duck_queries_ms = [value * 1000 for run in measured for value in run[2]] + date_totals_ms = [value * 1000 for run in measured for value in run[3]] + + return _WorkloadSummary( + name=name, + wall_mean_s=float(np.mean(wall_s)), + wall_p95_s=_percentile(wall_s, 95), + wall_p99_s=_percentile(wall_s, 99), + duck_share_mean_pct=float(np.mean(duck_share_pct)), + duck_query_mean_ms=float(np.mean(duck_queries_ms)), + duck_query_p95_ms=_percentile(duck_queries_ms, 95), + duck_query_p99_ms=_percentile(duck_queries_ms, 99), + date_total_mean_ms=float(np.mean(date_totals_ms)), + date_total_p95_ms=_percentile(date_totals_ms, 95), + date_total_p99_ms=_percentile(date_totals_ms, 99), + ) + + +@pytest.mark.benchmark +def test_duckdb_share_small_bbox_many_time_steps( + expanded_benchmark_parquet: str, +) -> None: + """Small bbox, many monthly time steps from the expanded benchmark fixture.""" + summary = _measure_workload( + "small bbox / many time steps", + expanded_benchmark_parquet, + bbox=BENCHMARK_BBOX, + crs=BENCHMARK_CRS, + resolution=60.0, + time_period="P1M", + ) + print(summary.format()) + + +@pytest.mark.benchmark +def test_duckdb_share_large_bbox_few_time_steps(benchmark_parquet: str) -> None: + """Large bbox, few daily time steps from the original benchmark fixture.""" + summary = _measure_workload( + "large bbox / few time steps", + benchmark_parquet, + bbox=_large_bbox_5070(), + crs=BENCHMARK_CRS, + resolution=60.0, + time_period="P1D", + ) + print(summary.format()) diff --git a/tests/benchmarks/bench_pipeline.py b/tests/benchmarks/bench_pipeline.py index 83259f3..529d47f 100644 --- a/tests/benchmarks/bench_pipeline.py +++ b/tests/benchmarks/bench_pipeline.py @@ -7,10 +7,13 @@ uv run pytest tests/benchmarks/ --benchmark-enable --benchmark-save= """ +import os + import pytest import lazycogs -from lazycogs import FirstMethod, MedianMethod, MosaicMethodBase, set_reproject_workers +from lazycogs import FirstMethod, MedianMethod, MosaicMethodBase +from tests._executor_test_utils import reset_executor_state_for_tests from .conftest import ( BENCHMARK_BBOX, @@ -88,11 +91,12 @@ def test_reproject_workers( """Measure throughput as reprojection thread count varies. Uses the expanded 12-time-step dataset with ``chunks={"time": 1}`` so dask - dispatches many concurrent tasks, putting real pressure on the per-chunk - thread pool. Validates the claim that memory-bandwidth saturation causes - diminishing returns above 4 threads. + dispatches many concurrent tasks, putting real pressure on the shared + reprojection pool. Validates the claim that memory-bandwidth saturation + causes diminishing returns above 4 threads. """ - set_reproject_workers(n_workers) + os.environ["LAZYCOGS_REPROJECT_WORKERS"] = str(n_workers) + reset_executor_state_for_tests() def run() -> object: da = lazycogs.open( @@ -109,7 +113,8 @@ def run() -> object: benchmark(run) finally: # Reset to default so other benchmarks are not affected. - set_reproject_workers(min(__import__("os").cpu_count() or 4, 4)) + os.environ.pop("LAZYCOGS_REPROJECT_WORKERS", None) + reset_executor_state_for_tests() @pytest.mark.benchmark diff --git a/tests/conftest.py b/tests/conftest.py index 0b1061a..c78a4a9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,7 @@ from __future__ import annotations import tempfile +from collections.abc import Iterator from pathlib import Path import numpy as np @@ -13,6 +14,22 @@ from affine import Affine from pyproj import CRS +from lazycogs import _store + + +def clear_store_cache_for_tests() -> None: + """Clear the shared store cache between tests that exercise resolve().""" + with _store._STORE_CACHE_LOCK: + _store._STORE_CACHE.clear() + + +@pytest.fixture +def clear_store_cache() -> Iterator[None]: + """Reset the shared store cache around a test.""" + clear_store_cache_for_tests() + yield + clear_store_cache_for_tests() + @pytest.fixture(scope="session") def synthetic_cog(tmp_path_factory) -> Path: diff --git a/tests/test_backend.py b/tests/test_backend.py index 0e1ce66..c8e006f 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,6 +1,8 @@ """Tests for _backend helpers and _async_getitem / _sync_getitem.""" import asyncio +import concurrent.futures +import threading from unittest.mock import AsyncMock, patch import numpy as np @@ -10,8 +12,19 @@ from rustac import DuckdbClient from xarray.core import indexing +from lazycogs import _executor from lazycogs._backend import MultiBandStacBackendArray +from lazycogs._executor import run_duckdb, run_on_loop from lazycogs._mosaic_methods import FirstMethod +from tests._executor_test_utils import reset_executor_state_for_tests + + +@pytest.fixture(autouse=True) +def reset_executor_state() -> None: + """Reset shared executor state between tests.""" + reset_executor_state_for_tests() + yield + reset_executor_state_for_tests() @pytest.fixture @@ -307,3 +320,124 @@ async def _inner(): assert isinstance(out2, np.ndarray) assert out1.shape == (1, 1, 1, 2) assert out2.shape == (1, 1, 1, 2) + + +# --------------------------------------------------------------------------- +# lazycogs._executor bridge behavior +# --------------------------------------------------------------------------- + + +def test_sync_getitem_routes_duckdb_queries_through_helper(wgs84): + """Chunk reads route DuckDB work through the shared helper.""" + arr = _make_array(wgs84) + + with patch( + "lazycogs._backend.run_duckdb", + new_callable=AsyncMock, + return_value=[], + ) as mock_run_duckdb: + result = arr._sync_getitem( + (slice(0, 1), slice(0, 1), slice(0, 1), slice(0, 4)), + ) + + assert result.shape == (1, 1, 1, 4) + np.testing.assert_array_equal(result, -9999.0) + mock_run_duckdb.assert_awaited_once() + + +def test_run_on_loop_first_use_is_deterministic(): + """Reset plus immediate first use consistently succeeds.""" + for _ in range(12): + reset_executor_state_for_tests() + assert run_on_loop(asyncio.sleep(0, result="ok")) == "ok" + + +def test_run_duckdb_bounds_concurrent_submissions(): + """DuckDB helper applies private submission backpressure per event loop.""" + release = threading.Event() + reached_limit = threading.Event() + overflow = threading.Event() + state = {"running": 0, "max": 0} + state_lock = threading.Lock() + + def blocking_call() -> str: + with state_lock: + state["running"] += 1 + state["max"] = max(state["max"], state["running"]) + if state["running"] == _executor._DUCKDB_MAX_SUBMISSIONS: + reached_limit.set() + if state["running"] > _executor._DUCKDB_MAX_SUBMISSIONS: + overflow.set() + release.wait(timeout=5) + with state_lock: + state["running"] -= 1 + return "ok" + + async def _exercise() -> list[str]: + with ( + concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool, + patch("lazycogs._executor.get_duckdb_pool", return_value=pool), + ): + tasks = [asyncio.create_task(run_duckdb(blocking_call)) for _ in range(5)] + try: + assert await asyncio.to_thread(reached_limit.wait, 5) + assert not overflow.is_set() + finally: + release.set() + return await asyncio.gather(*tasks) + + assert asyncio.run(_exercise()) == ["ok"] * 5 + assert state["max"] == _executor._DUCKDB_MAX_SUBMISSIONS + + +def test_run_duckdb_releases_submission_slot_after_error(): + """DuckDB helper releases its slot even when the callable raises.""" + + def fail() -> None: + raise RuntimeError("boom") + + async def _exercise() -> str: + with ( + concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool, + patch("lazycogs._executor.get_duckdb_pool", return_value=pool), + patch.object(_executor, "_DUCKDB_MAX_SUBMISSIONS", 1), + ): + with pytest.raises(RuntimeError, match="boom"): + await run_duckdb(fail) + return await run_duckdb(lambda: "ok") + + assert asyncio.run(_exercise()) == "ok" + + +def test_run_on_loop_uses_one_shared_loop_thread(): + """Concurrent sync callers all submit to the same lazycogs loop thread.""" + thread_ids: list[int] = [] + + def worker() -> None: + async def capture_thread_id() -> int: + return threading.get_ident() + + thread_ids.append(run_on_loop(capture_thread_id())) + + threads = [threading.Thread(target=worker) for _ in range(6)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert len(thread_ids) == 6 + assert len(set(thread_ids)) == 1 + + +def test_run_on_loop_raises_on_lazycogs_loop_thread(): + """The sync bridge raises instead of deadlocking on the lazycogs loop.""" + + async def call_sync_bridge_from_loop() -> str: + try: + run_on_loop(asyncio.sleep(0)) + except RuntimeError as exc: + return str(exc) + return "did not raise" + + message = run_on_loop(call_sync_bridge_from_loop()) + assert "Cannot call sync lazycogs bridge" in message diff --git a/tests/test_chunk_reader.py b/tests/test_chunk_reader.py index a8634b0..19b4e41 100644 --- a/tests/test_chunk_reader.py +++ b/tests/test_chunk_reader.py @@ -6,6 +6,7 @@ from unittest.mock import MagicMock, patch import numpy as np +import pytest from affine import Affine from pyproj import CRS @@ -16,7 +17,19 @@ _select_overview, read_chunk_async, ) +from lazycogs._executor import get_reproject_pool from lazycogs._mosaic_methods import FirstMethod +from tests._executor_test_utils import reset_executor_state_for_tests + + +@pytest.fixture(autouse=True) +def reset_executor_state(monkeypatch): + """Reset shared executor state between tests.""" + monkeypatch.delenv("LAZYCOGS_REPROJECT_WORKERS", raising=False) + reset_executor_state_for_tests() + yield + reset_executor_state_for_tests() + # --------------------------------------------------------------------------- # Helpers @@ -478,3 +491,21 @@ async def _fake_read_item_band(*args, **kwargs): # at max_concurrent_reads. Once every band's FirstMethod is satisfied, # pending tasks are cancelled, so strictly fewer than n_items reads execute. assert reads_executed[0] < n_items + + +# --------------------------------------------------------------------------- +# Executor configuration +# --------------------------------------------------------------------------- + + +def test_reproject_pool_uses_env_var(monkeypatch): + """The reprojection pool reads LAZYCOGS_REPROJECT_WORKERS lazily.""" + monkeypatch.setenv("LAZYCOGS_REPROJECT_WORKERS", "2") + assert get_reproject_pool()._max_workers == 2 + + +def test_reproject_pool_rejects_invalid_env_var(monkeypatch): + """Invalid LAZYCOGS_REPROJECT_WORKERS values raise clearly.""" + monkeypatch.setenv("LAZYCOGS_REPROJECT_WORKERS", "not-an-int") + with pytest.raises(ValueError, match="must be an integer"): + get_reproject_pool() diff --git a/tests/test_explain.py b/tests/test_explain.py index 5dd61f7..6e8afd3 100644 --- a/tests/test_explain.py +++ b/tests/test_explain.py @@ -2,7 +2,7 @@ from __future__ import annotations -from unittest.mock import patch +from unittest.mock import AsyncMock, patch import numpy as np import pytest @@ -82,7 +82,7 @@ def _make_da_with_backends( height: int = 10, affine: Affine | None = None, ) -> xr.DataArray: - """Return a minimal DataArray with stac_cog explain attrs attached.""" + """Return a minimal DataArray with lazycogs explain attrs attached.""" if affine is None: resolution = 1.0 affine = Affine(resolution, 0.0, 0.0, 0.0, -resolution, float(height)) @@ -461,6 +461,31 @@ def test_accessor_raises_on_non_stac_da(): da.lazycogs.explain() +def test_accessor_explain_routes_duckdb_queries_through_helper(wgs84): + """explain() routes DuckDB work through the shared helper.""" + dates = ["2023-01-01/2023-01-01", "2023-01-02/2023-01-02"] + time_coords = [np.datetime64("2023-01-01", "D"), np.datetime64("2023-01-02", "D")] + da = _make_da_with_backends( + wgs84, + dates=dates, + time_coords=time_coords, + bands=["red"], + width=4, + height=4, + ) + + with patch( + "lazycogs._explain.run_duckdb", + new_callable=AsyncMock, + return_value=_fake_items("red", 2), + ) as mock_run_duckdb: + plan = da.lazycogs.explain() + + assert plan.total_chunk_reads == 2 + assert plan.total_cog_reads == 4 + assert mock_run_duckdb.await_count == 2 + + def test_accessor_explain_returns_plan(wgs84): """explain() returns an ExplainPlan with correct counts.""" dates = ["2023-01-01/2023-01-01", "2023-01-02/2023-01-02"] diff --git a/tests/integration_test.py b/tests/test_integration.py similarity index 100% rename from tests/integration_test.py rename to tests/test_integration.py diff --git a/tests/test_store.py b/tests/test_store.py index 26c69f6..0cf0c20 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -2,6 +2,7 @@ from __future__ import annotations +from concurrent.futures import ThreadPoolExecutor from unittest.mock import patch import pytest @@ -15,6 +16,8 @@ ) from lazycogs._store import resolve, store_for +pytestmark = pytest.mark.usefixtures("clear_store_cache") + class _ProtocolStore: async def get_range_async(self, _start: int, _end: int): @@ -67,27 +70,40 @@ def test_unsupported_scheme_raises(): resolve("ftp://server/file.tif") -def test_thread_local_cache_same_bucket(): +def test_shared_cache_same_bucket(): """Two HREFs in the same bucket return the same store object.""" store_a, _ = resolve("s3://shared-bucket/file1.tif") store_b, _ = resolve("s3://shared-bucket/file2.tif") assert store_a is store_b -def test_thread_local_cache_different_buckets(): +def test_shared_cache_different_buckets(): """HREFs in different buckets return distinct store objects.""" store_a, _ = resolve("s3://bucket-one/file.tif") store_b, _ = resolve("s3://bucket-two/file.tif") assert store_a is not store_b -def test_thread_local_cache_same_https_host(): +def test_shared_cache_same_https_host(): """Two HTTPS HREFs on the same host share a store.""" store_a, _ = resolve("https://cdn.example.com/img/a.tif") store_b, _ = resolve("https://cdn.example.com/img/b.tif") assert store_a is store_b +def test_shared_cache_is_thread_safe(): + """Concurrent callers reuse the same cached store instance.""" + + def resolve_store(): + store, _ = resolve("s3://shared-bucket/thread-safe.tif") + return store + + with ThreadPoolExecutor(max_workers=8) as executor: + stores = list(executor.map(lambda _: resolve_store(), range(16))) + + assert len({id(store) for store in stores}) == 1 + + def test_user_supplied_store_is_returned_unchanged(): """When a store is passed, it is returned as-is with just the path extracted.""" user_store = MemoryStore()