Merged
40 changes: 18 additions & 22 deletions ARCHITECTURE.md

Large diffs are not rendered by default.

20 changes: 19 additions & 1 deletion README.md
@@ -71,7 +71,9 @@ await rustac.search_to(
bbox=bbox_4326,
)

# Open a fully lazy (band, time, y, x) DataArray. No COGs are read yet.
# Open a fully lazy (band, time, y, x) DataArray. No pixel data is read yet.
# lazycogs does perform a small storage-access smoketest here so auth or
# object-store misconfiguration fails early instead of on the first chunk read.
da = lazycogs.open(
"items.parquet",
bbox=dst_bbox,
@@ -102,6 +104,22 @@ reading many chunks inside an already-running loop.
`lazycogs.open(..., store=...)` accepts any store object that satisfies `async_geotiff.Store`.
For most users, the recommended path is still obstore: leave `store=None` to auto-resolve per-asset stores, or call `lazycogs.store_for()` to build one explicitly.

### Concurrency notes

- Sync callers submit work to one shared persistent lazycogs event loop.
- CPU-bound reprojection runs on one bounded shared thread pool. Set
`LAZYCOGS_REPROJECT_WORKERS` before first use to change the default
`min(os.cpu_count() or 1, 4)` limit. The value is read when the pool is
first created; changes after that are ignored for the life of the process.
- DuckDB queries do not block the event loop: they go through one small
bounded internal submission path and run on an explicit executor instead
of the loop thread. On the local benchmark fixture, DuckDB stayed under 2% of
per-date chunk wall time, so there is no separate per-thread DuckDB
client pool today.
- If you need to construct a loop-bound resource for lazycogs internals,
use `lazycogs.run_on_loop(...)`.
- Low-level callers should use `await lazycogs.read_chunk_async(...)`.
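
The notes above can be sketched roughly as follows. This is a minimal illustration of the pattern, not lazycogs' actual implementation: one background event loop shared by sync callers, plus one reprojection pool whose size is read from `LAZYCOGS_REPROJECT_WORKERS` exactly once. The names `_get_loop`, `submit_sync`, and `get_pool` are hypothetical.

```python
import asyncio
import os
import threading
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()
_loop = None  # shared background event loop (hypothetical name)
_pool = None  # shared reprojection pool (hypothetical name)


def _get_loop():
    """Start the shared loop in a daemon thread on first use."""
    global _loop
    with _lock:
        if _loop is None:
            loop = asyncio.new_event_loop()
            threading.Thread(target=loop.run_forever, daemon=True).start()
            _loop = loop
        return _loop


def submit_sync(coro):
    """Run a coroutine on the shared loop and block until it finishes."""
    return asyncio.run_coroutine_threadsafe(coro, _get_loop()).result()


def get_pool():
    """Create the bounded pool once; later env changes are ignored."""
    global _pool
    with _lock:
        if _pool is None:
            default = min(os.cpu_count() or 1, 4)
            workers = int(os.environ.get("LAZYCOGS_REPROJECT_WORKERS", default))
            _pool = ThreadPoolExecutor(max_workers=workers)
        return _pool


async def _double(x):
    # Offload CPU-bound work to the shared pool instead of the loop thread.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(get_pool(), lambda: x * 2)


os.environ["LAZYCOGS_REPROJECT_WORKERS"] = "2"
result = submit_sync(_double(21))
```

Blocking DuckDB queries follow the same `run_in_executor` pattern, just with their own small executor rather than the reprojection pool.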

## Documentation

- [Home](https://developmentseed.github.io/lazycogs/) — quickstart and full usage guide
2 changes: 1 addition & 1 deletion docs/api/utils.md
@@ -4,7 +4,7 @@

::: lazycogs.store_for

::: lazycogs.set_reproject_workers
::: lazycogs.run_on_loop

::: lazycogs.ExplainPlan

15 changes: 7 additions & 8 deletions docs/guides/chunking.md
@@ -19,7 +19,7 @@ vals = da.sel(x=299965, y=2653947, method="nearest").sel(time=slice("2025-06", "

## When to add chunks

Add `chunks={"time": 1}` when you want dask to distribute work across multiple workers. Without chunks, all time steps share one event loop and one reprojection thread pool. I/O is concurrent but CPU-bound reprojection is serialized within that pool. With temporal chunks, each time step becomes an independent dask task with its own event loop and thread pool, so reprojection work can run in true parallel across workers:
Add `chunks={"time": 1}` when you want dask to distribute work across multiple workers. Without chunks, all time steps share one lazycogs event loop and one bounded reprojection pool. I/O is concurrent but CPU-bound reprojection is bounded by that shared pool. With temporal chunks, dask can run multiple chunk tasks in parallel across worker threads, all submitting to the same lazycogs loop while still sharing the same bounded reprojection pool:

```python
da = lazycogs.open(
@@ -34,7 +34,7 @@ da.max(dim="time").compute() # each time step runs in its own dask task

## Spatial chunks

Avoid spatial chunks unless you are under memory pressure. lazycogs handles spatial I/O concurrency internally through its async event loop — adding spatial dask tasks layers DuckDB query overhead on top of I/O that was already happening concurrently.
Avoid spatial chunks unless you are under memory pressure. lazycogs handles spatial I/O concurrency internally through its async event loop — adding spatial dask tasks layers extra DuckDB query overhead on top of I/O that was already happening concurrently.

The one case where spatial chunks are useful is when a single time step is too large to fit in memory even at `max_concurrent_reads=1`. In that case, small spatial chunks limit how many pixels are in flight at once.
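
As a rough worked example of why smaller spatial chunks reduce the number of pixels in flight, the sketch below estimates the size of one output chunk. It assumes float32 output (4 bytes per pixel) and counts only the destination array, not the source tiles being decoded. The helper name `chunk_bytes` and the array sizes are hypothetical.

```python
def chunk_bytes(n_bands: int, height: int, width: int, itemsize: int = 4) -> int:
    """Bytes for one (band, y, x) output chunk; itemsize=4 assumes float32."""
    return n_bands * height * width * itemsize


# One whole time step versus one spatial chunk of the same step.
full = chunk_bytes(n_bands=4, height=10_000, width=10_000)
tile = chunk_bytes(n_bands=4, height=1_024, width=1_024)

print(f"full step: {full / 1e9:.1f} GB, 1024x1024 chunk: {tile / 1e6:.1f} MB")
```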

@@ -57,17 +57,16 @@ da = lazycogs.open(
)
```

## `set_reproject_workers`
## `LAZYCOGS_REPROJECT_WORKERS`

Controls how many threads each chunk's event loop uses for CPU-bound reprojection (pyproj + numpy). The default is `min(os.cpu_count(), 4)`.
Controls how many threads the shared reprojection pool uses for CPU-bound reprojection (pyproj + numpy). The default is `min(os.cpu_count() or 1, 4)`.

Reprojection is memory-bandwidth-bound rather than compute-bound. Benchmarks show diminishing returns above 4 threads because concurrent large-array operations saturate the memory bus rather than adding throughput. Raising this beyond 4 is rarely useful.

Each chunk gets its own independent thread pool, so dask tasks do not queue behind each other for reprojection.
Set the environment variable before the first lazycogs chunk read:

```python
import lazycogs
lazycogs.set_reproject_workers(2) # reduce from default if memory-constrained
```bash
export LAZYCOGS_REPROJECT_WORKERS=2
```
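
If exporting the variable in the shell is not convenient, a Python process can set it itself, provided this happens before lazycogs first creates the pool. A minimal sketch using only the standard library (nothing here calls lazycogs):

```python
import os

# Must run before the first lazycogs chunk read: the value is read once,
# when the shared reprojection pool is created.
os.environ.setdefault("LAZYCOGS_REPROJECT_WORKERS", "2")

workers = int(os.environ["LAZYCOGS_REPROJECT_WORKERS"])
```

`setdefault` keeps any value already exported by the caller's environment, so a shell-level setting still wins.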

See also: [API reference for open()](../api/open.md), [API reference for utilities](../api/utils.md), [Architecture](../architecture.md)
7 changes: 3 additions & 4 deletions src/lazycogs/__init__.py
@@ -1,8 +1,8 @@
"""lazycogs: lazy xarray DataArrays from STAC COG collections."""

from lazycogs._chunk_reader import read_chunk, read_chunk_async
from lazycogs._chunk_reader import read_chunk_async
from lazycogs._core import open # noqa: A004
from lazycogs._executor import set_reproject_workers
from lazycogs._executor import run_on_loop
from lazycogs._explain import ( # noqa: F401 — registers da.lazycogs accessor
ChunkRead,
CogRead,
@@ -36,8 +36,7 @@
"StdevMethod",
"align_bbox",
"open",
"read_chunk",
"read_chunk_async",
"set_reproject_workers",
"run_on_loop",
"store_for",
]
12 changes: 4 additions & 8 deletions src/lazycogs/_backend.py
@@ -16,10 +16,7 @@

from lazycogs._chunk_reader import read_chunk_async
from lazycogs._cql2 import _extract_filter_fields, _sortby_fields
from lazycogs._executor import (
_DUCKDB_EXECUTOR,
_run_coroutine,
)
from lazycogs._executor import run_duckdb, run_on_loop

logger = logging.getLogger(__name__)

@@ -224,8 +221,7 @@ async def _search_items_async(
List of STAC items returned by DuckDB.

"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(_DUCKDB_EXECUTOR, _search_items_sync, plan, date)
return await run_duckdb(_search_items_sync, plan, date)


async def _run_one_date(
@@ -340,7 +336,7 @@ class MultiBandStacBackendArray(BackendArray):
store: Pre-configured :class:`async_geotiff.Store` accepted by
``GeoTIFF.open`` and shared across all chunk reads. When ``None``,
each asset HREF is resolved to an obstore-backed store via the
thread-local cache in :func:`~lazycogs._store.resolve`.
shared process-local cache in :func:`~lazycogs._store.resolve`.
max_concurrent_reads: Maximum number of COG reads to run concurrently
per chunk. Limits peak in-flight memory when a chunk overlaps
many items. Defaults to 32.
@@ -515,7 +511,7 @@ def _sync_getitem(self, key: tuple[Any, ...]) -> np.ndarray:
Numpy array with shape determined by the indexing key.

"""
return _run_coroutine(self._async_getitem(key))
return run_on_loop(self._async_getitem(key))

async def _async_getitem(self, key: tuple[Any, ...]) -> np.ndarray:
"""Materialise the chunk identified by ``key``.
75 changes: 8 additions & 67 deletions src/lazycogs/_chunk_reader.py
@@ -12,7 +12,7 @@
from async_geotiff import GeoTIFF, Window
from numpy import ma

from lazycogs._executor import _run_coroutine
from lazycogs._executor import get_reproject_pool
from lazycogs._mosaic_methods import FirstMethod, MosaicMethodBase
from lazycogs._reproject import (
WarpMap,
@@ -51,13 +51,13 @@ class _ChunkContext:
warp_cache: dict[tuple[tuple[float, ...], CRS], WarpMap] | None


def _log_batch_failure(
def _log_read_failure(
label: str,
key: object,
item_id: str,
err: BaseException,
) -> None:
"""Log a warning for an item that failed inside an asyncio.gather batch."""
"""Log a warning for an item that failed inside bounded concurrent reads."""
logger.warning(
"Failed to read %s %r from item %s: %s",
label,
@@ -445,7 +445,7 @@ async def _read_band(
# Compute warp maps and apply, sharing maps across bands with identical geometry.
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None,
get_reproject_pool(),
lambda: _apply_bands_with_warp_cache(
band_rasters,
ctx.chunk_affine,
@@ -534,8 +534,9 @@ async def read_chunk_async(
same source geometry compute the reprojection warp map only once (via
:func:`_apply_bands_with_warp_cache`).

Items are processed in batches of ``max_concurrent_reads``. When all
per-band mosaic methods signal completion, remaining batches are skipped.
All item reads are scheduled up front, but execution is bounded by
``max_concurrent_reads`` via an ``asyncio.Semaphore``. When all per-band
mosaic methods signal completion, remaining pending reads are skipped.

Args:
items: List of STAC item dicts to mosaic. Processed in order.
@@ -603,7 +604,7 @@ def _done() -> bool:
return all(m.is_done for m in mosaic_methods.values())

def _error(idx: int, exc: BaseException) -> None:
_log_batch_failure("bands", bands, items[idx].get("id", "<unknown>"), exc)
_log_read_failure("bands", bands, items[idx].get("id", "<unknown>"), exc)

await _drain_in_order(task_list, _feed, _done, _error)

@@ -619,63 +620,3 @@ def _error(idx: int, exc: BaseException) -> None:
dtype=np.float32,
)
return output


def read_chunk(
items: list[dict],
bands: list[str],
chunk_affine: Affine,
dst_crs: CRS,
chunk_width: int,
chunk_height: int,
nodata: float | None = None,
mosaic_method_cls: type[MosaicMethodBase] | None = None,
store: Store | None = None,
max_concurrent_reads: int = 32,
warp_cache: dict | None = None,
path_fn: Callable[[str], str] | None = None,
) -> dict[str, np.ndarray]:
"""Run :func:`read_chunk_async` on the persistent per-thread background loop.

All arguments are identical to :func:`read_chunk_async`.

Args:
items: List of STAC item dicts to mosaic. Processed in order.
bands: Asset keys identifying the bands to read from each item.
chunk_affine: Affine transform of the destination chunk.
dst_crs: CRS of the destination chunk.
chunk_width: Width of the destination chunk in pixels.
chunk_height: Height of the destination chunk in pixels.
nodata: No-data fill value.
mosaic_method_cls: Mosaic method class instantiated once per band.
Defaults to :class:`~lazycogs._mosaic_methods.FirstMethod`.
store: Optional pre-configured :class:`async_geotiff.Store`
accepted by ``GeoTIFF.open``.
max_concurrent_reads: Maximum number of COG reads to run concurrently.
warp_cache: Optional cache shared across calls for reusing warp maps
from earlier time steps.
path_fn: Optional callable that takes an asset HREF and returns the
object path to use with *store*.

Returns:
``dict`` mapping each band name to an array of shape
``(cog_bands, chunk_height, chunk_width)`` with dtype matching the
source COGs.

"""
return _run_coroutine(
read_chunk_async(
items=items,
bands=bands,
chunk_affine=chunk_affine,
dst_crs=dst_crs,
chunk_width=chunk_width,
chunk_height=chunk_height,
nodata=nodata,
mosaic_method_cls=mosaic_method_cls,
store=store,
max_concurrent_reads=max_concurrent_reads,
warp_cache=warp_cache,
path_fn=path_fn,
),
)
12 changes: 6 additions & 6 deletions src/lazycogs/_core.py
@@ -16,7 +16,7 @@

from lazycogs._backend import MultiBandStacBackendArray
from lazycogs._cql2 import _extract_filter_fields, _sortby_fields
from lazycogs._executor import _run_coroutine
from lazycogs._executor import run_on_loop
from lazycogs._grid import compute_output_grid
from lazycogs._mosaic_methods import FirstMethod, MosaicMethodBase
from lazycogs._store import resolve
@@ -168,7 +168,7 @@ def _smoketest_store(

resolved_store, path = resolve(href, store=store, path_fn=path_from_href)
try:
_run_coroutine(_open_store_sample(path, store=resolved_store))
run_on_loop(_open_store_sample(path, store=resolved_store))
except Exception as e:
raise RuntimeError(
f"Store cannot open {href!r} through GeoTIFF.open: {e}. "
@@ -524,11 +524,11 @@ def open( # noqa: A001
credentials, custom endpoints, or non-default options are needed
without relying on automatic store resolution from each HREF. When
``None`` (default), each asset URL is parsed to create or reuse a
per-thread cached obstore-backed store.
shared, lock-protected cached obstore-backed store.
max_concurrent_reads: Maximum number of COG reads to run concurrently
per chunk. Items are processed in batches of this size, which
bounds peak in-flight memory when a chunk overlaps many files.
Methods that support early exit (e.g. the default
per chunk. Concurrency is bounded to this size with an
``asyncio.Semaphore``, which bounds peak in-flight memory when a
chunk overlaps many files. Methods that support early exit (e.g. the default
:class:`~lazycogs._mosaic_methods.FirstMethod`) will stop
reading once every output pixel is filled, so lower values also
reduce unnecessary I/O on dense datasets. Defaults to 32.
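
The `max_concurrent_reads` behaviour described in this docstring (all reads scheduled up front, concurrency capped by an `asyncio.Semaphore`, pending reads skipped once the mosaic is complete) can be sketched roughly as below. This is an illustration of the general technique, not the code in `_chunk_reader.py`: `bounded_reads`, `is_done`, and the demo readers are hypothetical, and the per-item failure logging is omitted.

```python
import asyncio


async def bounded_reads(read_fns, max_concurrent_reads, is_done):
    """Schedule every read up front, cap concurrency, stop early when done."""
    sem = asyncio.Semaphore(max_concurrent_reads)

    async def _guarded(fn):
        async with sem:  # at most max_concurrent_reads run at once
            return await fn()

    tasks = [asyncio.create_task(_guarded(fn)) for fn in read_fns]
    results = []
    try:
        for task in tasks:  # consume results in item order
            results.append(await task)
            if is_done(results):
                break  # early exit: skip whatever is still pending
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished
        await asyncio.gather(*tasks, return_exceptions=True)
    return results


async def _demo():
    started = []

    def make(i):
        async def read():
            started.append(i)
            await asyncio.sleep(0.01)  # stand-in for a COG read
            return i

        return read

    out = await bounded_reads(
        [make(i) for i in range(10)],
        max_concurrent_reads=2,
        is_done=lambda r: len(r) >= 3,
    )
    return out, started


results, started = asyncio.run(_demo())
```

With a lower `max_concurrent_reads`, fewer reads have already started by the time `is_done` returns true, which is why smaller values can reduce unnecessary I/O on dense datasets.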