Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ For large pre-existing STAC archives stored as hive-partitioned parquet director

Work is split sharply into two phases.

**Phase 0 — open time** runs at `open()` call time. It does the minimum needed to build a fully-described lazy DataArray: one DuckDB query to discover bands, one DuckDB query to build the time axis, a small object-store access smoketest, and a grid computation. No pixel I/O happens. No dask task graph is built. A single `MultiBandStacBackendArray` is wrapped in an xarray `LazilyIndexedArray`.
**Phase 0 — open time** runs at `open()` call time. It does the minimum needed to build a fully-described lazy DataArray: one DuckDB query to inspect the first matching item, one DuckDB query to build the time axis, concurrent COG metadata opens for the requested bands, and a grid computation. No pixel windows are read. No dask task graph is built. A single `MultiBandStacBackendArray` is wrapped in an xarray `LazilyIndexedArray`.

**Phase 1 — compute time** runs inside a dask worker when a chunk is actually computed. `MultiBandStacBackendArray.__getitem__` receives the exact `(band, time, y, x)` index for the chunk, derives the chunk's spatial footprint, queries DuckDB for only the COGs that overlap that footprint and time step, reads and reprojects all selected bands concurrently with `asyncio`, and mosaics the results.

This split means that `open()` is nearly instant even for large queries, and the DuckDB spatial filter runs once per chunk rather than over the entire bbox at open time. The one exception to the "metadata-only" story is the storage smoketest: `open()` issues a single `head()` against one representative asset so authentication and object-store wiring fail early with a clear error.
This split means that `open()` is nearly instant even for large queries, and the DuckDB spatial filter runs once per chunk rather than over the entire bbox at open time. The startup inspection is still deliberately small: one representative item and one `GeoTIFF.open(...)` per requested band, just enough to validate store access and infer the output dtype/nodata contract. That contract is enforced later during chunk reads unless the caller passed `dtype=` or `nodata=` explicitly.

## Module overview

```
src/lazycogs/
_core.py Entry point. open(), band discovery, time-step building.
_core.py Entry point. open(), representative-item inspection, dtype/nodata resolution, time-step building.
_backend.py MultiBandStacBackendArray — xarray BackendArray implementation that bridges xarray indexing to chunk reads.
_chunk_reader.py Async mosaic logic: open COGs, select overviews, read windows, reproject, mosaic.
_executor.py Shared background loop and bounded executors. Exposes run_on_loop().
Expand All @@ -44,13 +44,18 @@ src/lazycogs/
1. Resolves `duckdb_client`: if not provided, creates a plain `DuckdbClient()`. Validates that `href` ends in `.parquet`/`.geoparquet` when no client is supplied (a directory path is accepted when a custom client is passed).
2. Parses `time_period` into a `_TemporalGrouper` (see `_temporal.py`).
3. Converts `bbox` from the target CRS to EPSG:4326 using `pyproj.Transformer`.
4. Calls `_discover_bands()`: queries the parquet source via `duckdb_client.search(..., max_items=1)` to find asset keys. Assets with role `"data"` or media type `"image/tiff"` are returned first.
5. Calls `_smoketest_store()`: fetches one sample item from the parquet, resolves the store for a representative data asset HREF, and calls `GeoTIFF.open(path, store=...)` to confirm that the configured store satisfies the same read contract the real chunk reader uses. Raises `RuntimeError` immediately with a clear message if the store cannot reach the asset, so misconfiguration is surfaced at `open()` time rather than deferred to the first chunk read.
4. Calls `_inspect_first_item()`: queries the parquet source via `duckdb_client.search(..., max_items=1)` to fetch one representative item, resolves preferred data assets (or caller-requested `bands=`), and opens one COG per requested band concurrently through `run_on_loop(...)`.
5. Resolves the output contract from that inspection result:
- `bands`: explicit `bands=` wins, otherwise the inspected preferred-band order
- `dtype`: explicit `dtype=` wins unless it is an integer paired with a float-only mosaic method; otherwise lazycogs infers from `_promote_dtypes(...)` and auto-promotes integer inference to `float32` for float-only methods
- `nodata`: explicit `nodata=` wins, otherwise `_resolve_output_nodata(...)`
- float-only mosaic methods (`MeanMethod`, `MedianMethod`, `StdevMethod`) fail early only when the caller explicitly forces an integer dtype
- inferred `dtype` and `nodata` are runtime-validated during chunk reads so later heterogeneous assets fail loudly instead of truncating or silently remapping nodata
6. Calls `_build_time_steps()`: queries the parquet source via `duckdb_client.search_to_arrow(...)` to obtain an Arrow table containing only the `datetime` and `start_datetime` columns (plus any filter/sort fields). Extracts timestamps from the Arrow columns without Python-level dict walking, buckets them with the `_TemporalGrouper`, deduplicates, and returns sorted `(filter_strings, time_coords)` pairs. Only groups with at least one item produce a time step.
7. Calls `compute_output_grid()` to get the output affine transform and dimensions (width, height). No eager coordinate arrays are produced.
8. Creates a single `MultiBandStacBackendArray` (a dataclass) with shape `(band, time, y, x)` holding all the parameters needed to materialise any chunk later, then wraps it in one `xarray.core.indexing.LazilyIndexedArray`. This avoids `xr.concat` (used internally by `ds.to_array()`), which would eagerly load `LazilyIndexedArray`-backed objects.
9. Uses `rasterix.RasterIndex` for spatial indexing, but materialises the x/y coordinate variables eagerly as numpy arrays so chunked scalar spatial selections compute reliably.
10. Constructs the `xr.DataArray` directly from the 4-D variable. If `chunks` is provided, calls `.chunk(chunks)` to convert to a dask-backed array; otherwise the `LazilyIndexedArray` remains in play so narrow slices (e.g. a single pixel) translate to minimal I/O.
10. Constructs the `xr.DataArray` directly from the 4-D variable. If `chunks` is provided, calls `.chunk(chunks)` to convert to a dask-backed array; otherwise the `LazilyIndexedArray` remains in play so narrow slices (e.g. a single pixel) translate to minimal I/O. When output nodata is known, the returned array sets `da.attrs["_FillValue"]` and `da.encoding["_FillValue"]` for downstream serialization. When unknown, no `_FillValue` metadata is attached.
11. Stores `_stac_backend` (the `MultiBandStacBackendArray` instance) and `_stac_time_coords` (the full time coordinate array) in `da.attrs` so that `da.lazycogs.explain()` can reconstruct the explain plan without re-specifying `open()` parameters.

## Explain: dry-run read estimator
Expand Down Expand Up @@ -105,7 +110,7 @@ A `rasterix.RasterIndex` is attached to every DataArray returned by `open()`. Th

## Per-chunk read and resample pipeline

Each call to `_read_item_band()` in `_chunk_reader.py` follows a four-step pipeline to turn a remote COG into a correctly-sized, correctly-projected numpy array for one destination chunk.
Each call to `_read_item_band()` in `_chunk_reader.py` first validates the source asset against the inferred output contract, then follows a four-step pipeline to turn a remote COG into a correctly-sized, correctly-projected numpy array for one destination chunk.

### 1. Overview selection

Expand Down Expand Up @@ -143,7 +148,7 @@ There are two load-bearing concurrency layers in a chunk read, plus dask as an o

**Dask (optional, chunk level).** When a dask-backed DataArray is computed, dask dispatches chunk tasks to worker threads. Each worker thread calls `_sync_getitem()` in `_backend.py`, which uses `run_on_loop(_async_getitem(...))` to submit the chunk read to lazycogs' shared persistent background loop. Dask controls how many chunk tasks run at once; it does not create extra lazycogs event loops.

**One shared asyncio loop (I/O fan-out).** A single background event loop is created lazily and reused for every sync caller. Startup is only considered ready once that loop is actually running, so first-use sync bridge calls do not depend on thread scheduling luck. Inside `_async_getitem`, `asyncio.gather` fans out one `_run_one_date` coroutine per requested time step, and each time step uses `read_chunk_async` to fan out item reads under `asyncio.Semaphore(max_concurrent_reads)`. COG header reads and tile fetches from `async-geotiff` are all awaitable, so the loop multiplexes network I/O without blocking. DuckDB work never runs on the loop thread: `_search_items_async` and `_explain_async` both route queries through `run_duckdb(...)`, a small internal helper that applies a private submission bound before dispatching onto the DuckDB executor. The executor is still bounded (`max_workers=1` today), so DuckDB yields the event loop but still matches the effective single-connection serialization of the current client model. The current local benchmark fixture keeps DuckDB well below the U4 threshold for a separate client pool (1.9% of per-date chunk wall time on a small-bbox / many-time-step workload, 0.2% on a large-bbox / few-time-step workload), so the single-worker executor remains the intended design for now.
**One shared asyncio loop (I/O fan-out).** A single background event loop is created lazily and reused for every sync caller. Startup is only considered ready once that loop is actually running, so first-use sync bridge calls do not depend on thread scheduling luck. Inside `_async_getitem`, `asyncio.gather` fans out one `_run_one_date` coroutine per requested time step, and each time step uses `read_chunk_async` to fan out item reads under `asyncio.Semaphore(max_concurrent_reads)`. COG header reads and tile fetches from `async-geotiff` are all awaitable, so the loop multiplexes network I/O without blocking. DuckDB work never runs on the loop thread: `_search_items_async` and `_explain_async` both route queries through `run_duckdb(...)`, a small internal helper that applies a private submission bound before dispatching onto the DuckDB executor. The executor is still bounded (`max_workers=1` today), so DuckDB yields the event loop but still matches the effective single-connection serialization of the current client model. The current local benchmark fixture keeps DuckDB well below the U4 threshold for a separate client pool (1.9% of per-date chunk wall time on a small-bbox / many-time-step workload, 0.2% on a large-bbox / few-time-step workload), so the single-worker executor remains the intended design for now. That same prepared benchmark dataset also backs offline regression coverage in `tests/benchmarks/`, which keeps dtype/nodata contract tests off the network while exercising real local COGs.

**One bounded reprojection thread pool (CPU work).** `_apply_bands_with_warp_cache` is synchronous CPU-bound work that processes all bands for one item together. `_read_item_band` dispatches it via `loop.run_in_executor(get_reproject_pool(), ...)`, so the event loop stays free while reprojections run on a shared bounded thread pool. The default bound is `min(os.cpu_count() or 1, 4)`, configurable before first use via `LAZYCOGS_REPROJECT_WORKERS`. That environment variable is read once when the pool is first created; later changes are ignored. The `warp_cache` is shared across coroutines; `compute_warp_map` is deterministic, so duplicate concurrent writes are safe.

Expand Down Expand Up @@ -207,9 +212,11 @@ Each grouper provides three methods: `group_key()` maps a datetime string to a s

- `FirstMethod`: Returns the first valid pixel seen. Short-circuits when all pixels are filled.
- `HighestMethod` / `LowestMethod`: Tracks the running max / min across items.
- `MeanMethod` / `MedianMethod` / `StdevMethod`: Accumulates all arrays and reduces at the end.
- `MeanMethod` / `MedianMethod` / `StdevMethod`: Accumulates all arrays and reduces at the end. These methods advertise `requires_float = True`, so `open()` auto-promotes inferred integer outputs to `float32` and rejects explicit integer `dtype=` overrides.
- `CountMethod`: Counts valid (non-masked) observations per pixel.

For nodata-aware methods, any pixels that remain masked after mosaicking are materialized with the resolved output nodata sentinel when one is known, or with `0` only when output nodata is unknown.

These are copied from `rio-tiler` (MIT licence, zero GDAL imports) to avoid pulling in rasterio as a transitive dependency.

## Temporal compositing
Expand Down Expand Up @@ -277,5 +284,6 @@ The documentation site is built with [mkdocs-material](https://squidfunk.github.
- **Introduction**: rendered from `README.md` (symlinked as `docs/index.md`)
- **Examples**: two Jupyter notebooks rendered by `mkdocs-jupyter` — `docs/demo_midwest_daily.ipynb` (daily Sentinel-2 array over the Midwest US) and `docs/demo_southwest_monthly.ipynb` (monthly low-cloud composite over the US Southwest)
- **API Reference**: auto-generated from docstrings by `mkdocstrings[python]`
- **Guides**: focused docs for topics like STAC queries, chunking, cloud storage, and dtype/nodata handling

To preview locally: `uv run mkdocs serve`
40 changes: 38 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ await rustac.search_to(
)

# Open a fully lazy (band, time, y, x) DataArray. No pixel data is read yet.
# lazycogs does perform a small storage-access smoketest here so auth or
# object-store misconfiguration fails early instead of on the first chunk read.
# lazycogs does inspect one representative item here: it picks preferred data
# assets, opens one COG per requested band concurrently, and infers a default
# dtype/nodata contract from that representative sample.
da = lazycogs.open(
"items.parquet",
bbox=dst_bbox,
Expand Down Expand Up @@ -104,6 +105,37 @@ reading many chunks inside an already-running loop.
`lazycogs.open(..., store=...)` accepts any store object that satisfies `async_geotiff.Store`.
For most users, the recommended path is still obstore: leave `store=None` to auto-resolve per-asset stores, or call `lazycogs.store_for()` to build one explicitly.

## Dtype and nodata semantics

See also: [docs guide on dtype and nodata handling](docs/guides/dtype-nodata.md).

When you omit `dtype=`, `lazycogs.open()` samples one representative COG per
requested band and infers one safe output dtype instead of defaulting to
`float32`. That inferred dtype is then enforced at chunk-read time: if a later
asset has a source dtype that cannot be safely represented, compute raises and
asks you to pass `dtype=` explicitly.

When you omit `nodata=`:

- if sampled bands all agree on one scalar nodata sentinel, the returned
`DataArray` sets `attrs["_FillValue"]` and `encoding["_FillValue"]`, and
masked mosaic output materializes with that same sentinel instead of zero
- if sampled bands disagree, `open()` raises `ValueError` and asks you to pass
`nodata=` explicitly
- if sampled bands have no nodata sentinel, no `_FillValue` encoding is
attached and `0` remains only an implementation fill value for uncovered
regions
- if later chunk reads encounter a conflicting source nodata value, compute
raises and asks you to pass `nodata=` explicitly

Explicit `dtype=` and `nodata=` stay authoritative even when source assets are
heterogeneous.

Float-only mosaic methods such as `MeanMethod`, `MedianMethod`, and
`StdevMethod` auto-promote inferred integer outputs to `float32`. If you pass
an explicit integer `dtype=` with one of those methods, `open()` raises and
asks for `dtype="float32"` or `dtype="float64"` instead.

### Concurrency notes

- Sync callers submit work to one shared persistent lazycogs event loop.
Expand All @@ -116,6 +148,9 @@ For most users, the recommended path is still obstore: leave `store=None` to aut
thread. On the local benchmark fixture, DuckDB stayed under 2% of
per-date chunk wall time, so there is no separate per-thread DuckDB
client pool today.
- `scripts/prepare_benchmark_data.py` also powers offline regression coverage
under `tests/benchmarks/`, so the contract tests run on local cached files
instead of live network reads.
- If you need to construct a loop-bound resource for lazycogs internals,
use `lazycogs.run_on_loop(...)`.
- Low-level callers should use `await lazycogs.read_chunk_async(...)`.
Expand All @@ -125,5 +160,6 @@ For most users, the recommended path is still obstore: leave `store=None` to aut
- [Home](https://developmentseed.github.io/lazycogs/) — quickstart and full usage guide
- [Example: Midwest US daily Sentinel-2 array](https://developmentseed.org/lazycogs/notebooks/demo_midwest_daily/)
- [Example: Southwest US monthly low-cloud Sentinel-2 array](https://developmentseed.org/lazycogs/notebooks/demo_southwest_monthly/)
- [Guide: dtype and nodata handling](https://developmentseed.org/lazycogs/guides/dtype-nodata/)
- [Architecture](https://developmentseed.org/lazycogs/architecture/)
- [Contributing](CONTRIBUTING.md)
Loading
Loading