Skip to content

Commit cbef616

Browse files
committed
refactor: centralize async runtime around shared loop and pools
1 parent e7dd077 commit cbef616

15 files changed

Lines changed: 504 additions & 204 deletions

ARCHITECTURE.md

Lines changed: 13 additions & 17 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,20 @@ event loop. Multiple concurrent chunk reads overlap naturally, so the
9797
async path can be faster than the synchronous `da.compute()` when
9898
reading many chunks inside an already-running loop.
9999

100+
### Concurrency notes
101+
102+
- Sync callers submit work to one shared persistent lazycogs event loop.
103+
- CPU-bound reprojection runs on one bounded shared thread pool. Set
104+
`LAZYCOGS_REPROJECT_WORKERS` before first use to change the default
105+
`min(os.cpu_count(), 4)` limit.
106+
- DuckDB queries yield the event loop by running on a small explicit
107+
executor instead of on the loop thread. On the local benchmark fixture,
108+
DuckDB stayed under 2% of per-date chunk wall time, so there is no
109+
separate per-thread DuckDB client pool today.
110+
- If you need to construct a loop-bound resource for lazycogs internals,
111+
use `lazycogs.run_on_loop(...)`.
112+
- Low-level callers should use `await lazycogs.read_chunk_async(...)`.
113+
100114
## Documentation
101115

102116
- [Home](https://developmentseed.github.io/lazycogs/) — quickstart and full usage guide

docs/api/utils.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
::: lazycogs.store_for
66

7-
::: lazycogs.set_reproject_workers
7+
::: lazycogs.run_on_loop
88

99
::: lazycogs.ExplainPlan
1010

docs/guides/chunking.md

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ vals = da.sel(x=299965, y=2653947, method="nearest").sel(time=slice("2025-06", "
1919

2020
## When to add chunks
2121

22-
Add `chunks={"time": 1}` when you want dask to distribute work across multiple workers. Without chunks, all time steps share one event loop and one reprojection thread pool. I/O is concurrent but CPU-bound reprojection is serialized within that pool. With temporal chunks, each time step becomes an independent dask task with its own event loop and thread pool, so reprojection work can run in true parallel across workers:
22+
Add `chunks={"time": 1}` when you want dask to distribute work across multiple workers. Without chunks, all time steps share one lazycogs event loop and one bounded reprojection pool. I/O is concurrent but CPU-bound reprojection is bounded by that shared pool. With temporal chunks, dask can run multiple chunk tasks in parallel across worker threads, all submitting to the same lazycogs loop while still sharing the same bounded reprojection pool:
2323

2424
```python
2525
da = lazycogs.open(
@@ -34,7 +34,7 @@ da.max(dim="time").compute() # each time step runs in its own dask task
3434

3535
## Spatial chunks
3636

37-
Avoid spatial chunks unless you are under memory pressure. lazycogs handles spatial I/O concurrency internally through its async event loop — adding spatial dask tasks layers DuckDB query overhead on top of I/O that was already happening concurrently.
37+
Avoid spatial chunks unless you are under memory pressure. lazycogs handles spatial I/O concurrency internally through its async event loop — adding spatial dask tasks layers extra DuckDB query overhead on top of I/O that was already happening concurrently.
3838

3939
The one case where spatial chunks are useful is when a single time step is too large to fit in memory even at `max_concurrent_reads=1`. In that case, small spatial chunks limit how many pixels are in flight at once.
4040

@@ -57,17 +57,16 @@ da = lazycogs.open(
5757
)
5858
```
5959

60-
## `set_reproject_workers`
60+
## `LAZYCOGS_REPROJECT_WORKERS`
6161

62-
Controls how many threads each chunk's event loop uses for CPU-bound reprojection (pyproj + numpy). The default is `min(os.cpu_count(), 4)`.
62+
Controls how many threads the shared reprojection pool uses for CPU-bound reprojection (pyproj + numpy). The default is `min(os.cpu_count(), 4)`.
6363

6464
Reprojection is memory-bandwidth-bound rather than compute-bound. Benchmarks show diminishing returns above 4 threads because concurrent large-array operations saturate the memory bus rather than adding throughput. Raising this beyond 4 is rarely useful.
6565

66-
Each chunk gets its own independent thread pool, so dask tasks do not queue behind each other for reprojection.
66+
Set the environment variable before the first lazycogs chunk read:
6767

68-
```python
69-
import lazycogs
70-
lazycogs.set_reproject_workers(2) # reduce from default if memory-constrained
68+
```bash
69+
export LAZYCOGS_REPROJECT_WORKERS=2
7170
```
7271

7372
See also: [API reference for open()](../api/open.md), [API reference for utilities](../api/utils.md), [Architecture](../architecture.md)

src/lazycogs/__init__.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
"""lazycogs: lazy xarray DataArrays from STAC COG collections."""
22

3-
from lazycogs._chunk_reader import read_chunk, read_chunk_async
3+
from lazycogs._chunk_reader import read_chunk_async
44
from lazycogs._core import open # noqa: A004
5-
from lazycogs._executor import set_reproject_workers
5+
from lazycogs._executor import run_on_loop
66
from lazycogs._explain import ( # noqa: F401 — registers da.lazycogs accessor
77
ChunkRead,
88
CogRead,
@@ -36,8 +36,7 @@
3636
"StdevMethod",
3737
"align_bbox",
3838
"open",
39-
"read_chunk",
4039
"read_chunk_async",
41-
"set_reproject_workers",
40+
"run_on_loop",
4241
"store_for",
4342
]

src/lazycogs/_backend.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616

1717
from lazycogs._chunk_reader import read_chunk_async
1818
from lazycogs._cql2 import _extract_filter_fields, _sortby_fields
19-
from lazycogs._executor import (
20-
_DUCKDB_EXECUTOR,
21-
_run_coroutine,
22-
)
19+
from lazycogs._executor import _run_coroutine, get_duckdb_pool
2320

2421
logger = logging.getLogger(__name__)
2522

@@ -223,7 +220,7 @@ async def _search_items_async(
223220
224221
"""
225222
loop = asyncio.get_running_loop()
226-
return await loop.run_in_executor(_DUCKDB_EXECUTOR, _search_items_sync, plan, date)
223+
return await loop.run_in_executor(get_duckdb_pool(), _search_items_sync, plan, date)
227224

228225

229226
async def _run_one_date(

src/lazycogs/_chunk_reader.py

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from async_geotiff import GeoTIFF, Overview, RasterArray, Window
1313
from numpy import ma
1414

15-
from lazycogs._executor import _run_coroutine
15+
from lazycogs._executor import get_reproject_pool
1616
from lazycogs._mosaic_methods import FirstMethod, MosaicMethodBase
1717
from lazycogs._reproject import (
1818
WarpMap,
@@ -442,7 +442,7 @@ async def _read_band(
442442
# Compute warp maps and apply, sharing maps across bands with identical geometry.
443443
loop = asyncio.get_running_loop()
444444
return await loop.run_in_executor(
445-
None,
445+
get_reproject_pool(),
446446
lambda: _apply_bands_with_warp_cache(
447447
band_rasters,
448448
ctx.chunk_affine,
@@ -615,62 +615,3 @@ def _error(idx: int, exc: BaseException) -> None:
615615
dtype=np.float32,
616616
)
617617
return output
618-
619-
620-
def read_chunk(
621-
items: list[dict],
622-
bands: list[str],
623-
chunk_affine: Affine,
624-
dst_crs: CRS,
625-
chunk_width: int,
626-
chunk_height: int,
627-
nodata: float | None = None,
628-
mosaic_method_cls: type[MosaicMethodBase] | None = None,
629-
store: ObjectStore | None = None,
630-
max_concurrent_reads: int = 32,
631-
warp_cache: dict | None = None,
632-
path_fn: Callable[[str], str] | None = None,
633-
) -> dict[str, np.ndarray]:
634-
"""Run :func:`read_chunk_async` on the persistent per-thread background loop.
635-
636-
All arguments are identical to :func:`read_chunk_async`.
637-
638-
Args:
639-
items: List of STAC item dicts to mosaic. Processed in order.
640-
bands: Asset keys identifying the bands to read from each item.
641-
chunk_affine: Affine transform of the destination chunk.
642-
dst_crs: CRS of the destination chunk.
643-
chunk_width: Width of the destination chunk in pixels.
644-
chunk_height: Height of the destination chunk in pixels.
645-
nodata: No-data fill value.
646-
mosaic_method_cls: Mosaic method class instantiated once per band.
647-
Defaults to :class:`~lazycogs._mosaic_methods.FirstMethod`.
648-
store: Optional pre-configured obstore ``ObjectStore`` instance.
649-
max_concurrent_reads: Maximum number of COG reads to run concurrently.
650-
warp_cache: Optional cache shared across calls for reusing warp maps
651-
from earlier time steps.
652-
path_fn: Optional callable that takes an asset HREF and returns the
653-
object path to use with *store*.
654-
655-
Returns:
656-
``dict`` mapping each band name to an array of shape
657-
``(cog_bands, chunk_height, chunk_width)`` with dtype matching the
658-
source COGs.
659-
660-
"""
661-
return _run_coroutine(
662-
read_chunk_async(
663-
items=items,
664-
bands=bands,
665-
chunk_affine=chunk_affine,
666-
dst_crs=dst_crs,
667-
chunk_width=chunk_width,
668-
chunk_height=chunk_height,
669-
nodata=nodata,
670-
mosaic_method_cls=mosaic_method_cls,
671-
store=store,
672-
max_concurrent_reads=max_concurrent_reads,
673-
warp_cache=warp_cache,
674-
path_fn=path_fn,
675-
),
676-
)

0 commit comments

Comments
 (0)