diff --git a/dev-docs/specs/2026-05-12-getTileData-coalescing-design.md b/dev-docs/specs/2026-05-12-getTileData-coalescing-design.md new file mode 100644 index 00000000..8d7f201e --- /dev/null +++ b/dev-docs/specs/2026-05-12-getTileData-coalescing-design.md @@ -0,0 +1,224 @@ +# Coalescing tile fetches through deck.gl `getTileData` (`COGLayer`) + +**Date:** 2026-05-12 +**Issues:** [#273](https://github.com/developmentseed/deck.gl-raster/issues/273); related deck.gl [visgl/deck.gl#10098](https://github.com/visgl/deck.gl/issues/10098) +**Status:** Design — builds on [#531](https://github.com/developmentseed/deck.gl-raster/pull/531) (`geotiff.fetchTiles`, range-coalescing batched reader). + +## Background + +Two distinct (but related) problems push us to change how `COGLayer` fetches tiles: + +**1. Request coalescing.** deck.gl's `TileLayer` has exactly one fetch hook, `getTileData(tile, { signal, … })`, called **once per tile**. For tile sources where each tile is a separate URL (MVT), that's unavoidable. For a COG, every tile in a viewport is a byte range of the *same* file, and adjacent tiles' ranges are contiguous (or near-contiguous) on disk — so N independent range requests could be a handful of coalesced ones. [#531](https://github.com/developmentseed/deck.gl-raster/pull/531) gave `@developmentseed/geotiff` a `fetchTiles(xy[])` that does exactly that coalescing for a *known* batch. What's missing is getting deck.gl's stream of per-tile `getTileData` calls *to* `fetchTiles`. The deck.gl-native fix — a `getTileDataBatched` prop wired into `Tileset2D` — is proposed upstream ([visgl/deck.gl#10098](https://github.com/visgl/deck.gl/issues/10098)) but has no PR and no maintainer commitment, so this lives in our packages. + +**2. Concurrency capping across layers, per origin.** Most COGs we target live on AWS S3 (or similar), which serves over HTTP/1.1 only. Browsers cap concurrent HTTP/1.1 connections per origin at ~6 (Chrome: 6). If we schedule more than that, the browser queues the excess — and queued requests stick around even when the viewport pans, blocking *new* requests for the new viewport behind dead ones for the old. So we want to cap concurrent in-flight HTTP requests at the browser's limit; over-scheduling is actively harmful. + +This cap can't live per-layer: two `COGLayer`s targeting the same S3 bucket would each get 6 slots → 12 in-flight → browser queues. It has to be **per origin, shared across layers and source formats** (COG today, Zarr later). deck.gl's `Tileset2D` ships an internal loaders.gl `RequestScheduler({ maxRequests: 6 })` but it's per-`TileLayer` instance, so it doesn't solve this; it also counts `getTileData` *calls* (≈ tiles), not HTTP requests — and one COG tile fetch can issue several requests (uncached metadata + tile data + mask). + +The two problems are independent: coalescing is layer-scoped (which deck.gl `getTileData` calls belong in one `fetchTiles`?), gating is source-scoped (how many HTTP requests are in flight to this host?). The design treats them separately. + +### How deck.gl loads tiles today (relevant facts) + +- `Tileset2D` constructs a loaders.gl `RequestScheduler({ maxRequests: 6, debounceTime })`; `Tile2DHeader._loadData` does `await scheduleRequest(...)` then `await getTileData(...)`. When `maxRequests <= 0` (and `debounceTime <= 0`), throttling is off and `scheduleRequest` returns an already-resolved token. +- `Tileset2D._updateTileStates` loops over needed tile indices **synchronously**, spawning one `loadData` async per. With throttling off, each `getTileData` call is one microtask later — so the whole viewport-update burst lands within the current macrotask (before its microtask queue drains). +- `Tileset2D._pruneRequests` aborts unselected in-flight tiles when more than `maxRequests` are ongoing — a no-op when `maxRequests <= 0`. + +### Current code shape + +- `@developmentseed/deck.gl-raster` — `RasterTileLayer` (subclass of `CompositeLayer`) builds the inner `@deck.gl/geo-layers` `TileLayer` in `_renderTileLayer`, passing `getTileData: tile => this._wrapGetTileData(tile, getTileData)`. `_wrapGetTileData` composes `props.signal` with `tile.signal` and calls the layer's `getTileData(tile, { device, signal })`. Subclasses override `_getTileDataCallback()` to supply defaults. +- `@developmentseed/deck.gl-geotiff` — `COGLayer` extends `RasterTileLayer`; `_parseGeoTIFF()` opens a `GeoTIFF` via `fetchGeoTIFF(props.geotiff)` (which calls `GeoTIFF.fromUrl(url)` for URL inputs). `_getTileDataCallback` resolves `tile.index.z` to overview-vs-primary and calls the user/default `getTileData(image, …)` which today calls `geotiff.fetchTile`. +- `@developmentseed/geotiff` — `GeoTIFF.open({ dataSource, headerSource, concurrencyLimiter? })` (the `concurrencyLimiter?` option ships in this design — see below). `fromUrl(url)` builds the chunkd source stack and calls `open`. `fetchTile` / `fetchTiles` / `coalesceRanges` issue `dataSource.fetch(...)` calls. + +## Goals + +1. A `COGLayer` viewport of N tiles produces **far fewer than N HTTP requests** — adjacent tiles' byte ranges coalesced, via `geotiff.fetchTiles`. +2. **Concurrent HTTP requests to a single origin are capped at the browser's HTTP/1.1 limit**, *shared across all layers (and future source formats) targeting that origin* — so panning doesn't leave stale queued requests blocking fresh ones. +3. **Aborts on queued requests drop them** (no wasted fetch fires after the user has panned away). +4. **Zero behavior change when the batched callback isn't used**: a layer with no `getMultiTileData` takes the exact code path it does today. +5. Generic mechanism at the `RasterTileLayer` level (subclasses opt in by defining a batched callback); only `COGLayer` opts in for now. +6. No new dependency on `loaders.gl` in `@developmentseed/geotiff`; `geotiff`'s public `fetchTile` / `fetchTiles` signatures unchanged. + +## Non-goals + +- The deck.gl-native `getTileDataBatched` prop (upstream; out of scope here). +- `MultiCOGLayer` / `MosaicLayer` batching (the batcher's grouping key is designed to allow it later, but it isn't wired up). +- A `maxTilesPerBatch` cap (bounds how many tiles ride one all-or-nothing coalesced fetch). +- Per-tile *streaming* (`Promise[]` so a fast tile resolves before a slow one). +- Cross-origin or per-path tuning of the default limiter (`max=6` everywhere is fine). +- Gating header/metadata reads through the limiter (they're tiny, mostly at `open` time, served from the header cache — they never compete with tile-data fetches for connections). + +## Architecture + +**Two independent pieces.** They can ship and be reasoned about separately. + +1. **Per-origin `ConcurrencyLimiter` at the source layer** (`@developmentseed/geotiff`). A small abstract interface + a concrete `Semaphore` + a `defaultLimiterForOrigin(url)` factory that caches one limiter per origin. `GeoTIFF.fromUrl(url)` defaults its `concurrencyLimiter` to that factory's result — so two `fromUrl` calls (or any other source-level callers, including a future Zarr `fromUrl`-equivalent) targeting the same host share one cap. The limiter wraps the data source's `.fetch` via `limitFetch`; the caller's per-request `signal` is forwarded into `acquire`, so an abort while queued *drops* the request before any network fires. +2. **`TileBatcher` in the layer** (`@developmentseed/deck.gl-raster`). When `RasterTileLayer` is configured with a `getMultiTileData` callback, an internal `TileBatcher` buffers deck.gl's per-tile `getTileData` calls (caught on a `setTimeout(_, 0)` flush) and dispatches one `getMultiTileData(image, tiles[], …)` per `(source, z)` group. Results — `Array` — are distributed back. The inner `TileLayer` is given `maxRequests: 0` so the per-tile throttle steps aside and the batcher sees the whole viewport burst. + +``` +RasterTileLayer (getMultiTileData defined) + inner TileLayer.maxRequests = 0 + inner TileLayer.getTileData = tile => batcher.fetch(tile) + │ setTimeout(0); group by (source, z); composite signal + ▼ + 1× getMultiTileData(image, tiles[], { signal, device, pool }) per group + │ + ▼ + COGLayer default ──► geotiff.fetchTilesSettled(xy) + │ + ▼ dataSource.fetch(...) × few (after range coalescing) + │ + GeoTIFF.fromUrl(url, { concurrencyLimiter = defaultLimiterForOrigin(url) }) + data source's .fetch wrapped via limitFetch(limiter): + const release = await limiter.acquire(signal); + try { return await rawFetch(offset, length, { signal }) } + finally { release() } +``` + +The two pieces only meet at the `getMultiTileData` callback. The batcher knows nothing about HTTP gating; the limiter knows nothing about tiles or batches. Either can be used (or removed) independently — e.g. you'd still want the per-origin limiter even without the batcher, just to keep the browser from queueing. + +## `@developmentseed/geotiff` changes + +### 1. `ConcurrencyLimiter` interface + +```ts +/** + * Minimal contract for capping the number of concurrent Source.fetch calls. + * An optional `signal` lets a caller drop out of the queue if they no longer + * need the request (e.g. the user panned away) — important on browsers, where + * HTTP/1.1 caps concurrent connections per origin to ~6 and an overlong queue + * from a previous viewport can starve a fresh one. + */ +export interface ConcurrencyLimiter { + /** Acquire a slot. Resolves once a slot is free; call the returned function + * exactly once when the request finishes (success or failure) to release it. + * If `signal` aborts while queued, the promise rejects and no slot is consumed. */ + acquire(signal?: AbortSignal): Promise<() => void>; +} +``` + +No `unknown`, no token object, no `null` — geotiff has no notion of request identity, priority, or cancellation other than the queued-abort case. loaders.gl's `RequestScheduler.scheduleRequest(handle, getPriority?)` doesn't structurally match (its `handle` is required, and our `acquire` is signal-aware while theirs isn't), but adapting in either direction is straightforward if someone wants to. + +### 2. `limitFetch`, `Semaphore`, `defaultLimiterForOrigin` + +```ts +/** Wraps a Source.fetch so each call holds a limiter slot for its duration, + * forwarding the call's own signal so a queued abort drops the request. */ +export function limitFetch(fetch: Fetch, limiter: ConcurrencyLimiter): Fetch { + return async (offset, length, options) => { + const release = await limiter.acquire(options?.signal); + try { + return await fetch(offset, length, options); + } finally { + release(); + } + }; +} + +/** Concrete FIFO implementation; queued acquires that abort are removed from + * the queue before any request is issued. */ +export class Semaphore implements ConcurrencyLimiter { /* … */ } + +/** Returns a shared Semaphore for `url`'s origin (cached across calls). + * Default maxRequests = 6 — Chrome's HTTP/1.1 per-origin cap. Multiple + * fromUrl calls (and any other source-level callers) targeting the same + * host share one cap. */ +export function defaultLimiterForOrigin(url: string | URL): ConcurrencyLimiter; +``` + +The interface type and the two helpers are exported from the package's public surface so app code (or a future Zarr package) can share the per-origin limiter directly. `limitFetch` is also exported so callers can apply gating to their own sources without going through `GeoTIFF.open`. + +### 3. `concurrencyLimiter` option on `GeoTIFF.open` / `fromUrl` + +`open` accepts `concurrencyLimiter?: ConcurrencyLimiter`. When provided, the data source's `.fetch` is wrapped via `limitFetch` — a single-line conceptual change; no `SourceView` middleware machinery, no chunkd-vs-cogeotiff `Source` type juggling. Only `.fetch` is needed from the data source, so a thin `{ fetch: limitFetch(...) }` object suffices. + +`fromUrl` accepts `concurrencyLimiter?: ConcurrencyLimiter | null`. Its default behavior is the key piece: **when omitted, `fromUrl` calls `defaultLimiterForOrigin(url)`** so a per-origin cap is on by default. Pass an explicit limiter to override; pass `null` to disable gating entirely. + +Header/metadata reads are not gated — they're a small handful at `open` time, then served from the block cache; gating them buys nothing and would require either a chunkd `SourceMiddleware` (impedance with cogeotiff's `Source` typing) or a Proxy. Out of scope. + +`fetchTile` / `fetchTiles` / `coalesceRanges` / `assembleTile` are **unchanged** — they call `source.fetch(...)` as before; gating is invisible to them. + +### 4. `fetchTilesSettled` — a `Promise.allSettled`-style batch reader + +`fetchTiles(xy)` today is all-or-nothing: it throws on the first sparse/missing tile. The layer path wants a viewport to survive a bad tile. Add a settled variant returning one result *or error* per requested coordinate, in input order: + +```ts +type SettledTile = Tile | { error: unknown }; +fetchTilesSettled(self, xy[], options?) : Promise; +``` + +Implementation composes #531's already-split pieces: one coalesced byte fetch (still all-or-nothing *at the network level* — a `fetch` failure inside a merged range dooms every tile whose bytes were in it; inherent to coalescing), then `assembleTile` per tile wrapped in `try/catch` so per-tile decode errors / sparse tiles land in just that slot. A sparse tile (bytes `null`) maps to that slot's `{ error }`. + +## `@developmentseed/deck.gl-raster` changes + +### 1. New `getMultiTileData` prop + `_getMultiTileDataCallback()` accessor + +```ts +// on RasterTileLayerProps: +getMultiTileData?: ( + tiles: TileLoadProps[], + options: { device: Device; signal?: AbortSignal }, +) => Promise>; // aligned with `tiles`, in order +``` + +Sourced via a new `protected _getMultiTileDataCallback()` accessor, mirroring `_getTileDataCallback()` / `_renderTileCallback()`. Returns `undefined` if neither the prop nor a subclass default is set. The base-class signature receives `tiles: TileLoadProps[]` (deck.gl's tile shape); a subclass like `COGLayer` overrides `_getMultiTileDataCallback` to wrap a domain-specific signature (`(image, tiles, …)`) that resolves the group's shared `z` to an `image`. (All tiles in one dispatch share `z` — guaranteed by the batcher's `groupKey`.) + +### 2. Branch in `_renderTileLayer` + +``` +const multi = this._getMultiTileDataCallback(); +if (!multi) { + // byte-for-byte unchanged from today + innerTileLayer.getTileData = tile => this._wrapGetTileData(tile, getTileData); + innerTileLayer.maxRequests = this.props.maxRequests; +} else { + innerTileLayer.getTileData = tile => batcher.fetch(tile, { signal: tile.signal }); + innerTileLayer.maxRequests = 0; // deck.gl's per-tile throttle steps aside +} +``` + +The no-batched-callback path is byte-for-byte today's. `maxRequests: 0` also disables `_pruneRequests`, which is fine: coalesced requests don't hit the connection limit (the per-origin limiter does), and per-tile aborts are still honored by the batcher's composite signal. + +The layer **does not own or create a limiter**. The per-origin shared limiter lives at the source layer (geotiff). Two consequences: + +- The user's `maxRequests` prop on `RasterTileLayer` keeps its today's meaning *only* in the no-`getMultiTileData` branch. In the batched branch it's overridden to `0` (the actual cap is at the source layer). Document this. +- No `@loaders.gl/loader-utils` dependency is added. + +### 3. `TileBatcher` + +A small generic class (not a layer), one instance per `RasterTileLayer`. Created lazily when `_getMultiTileDataCallback()` first returns non-undefined; finalized with the layer. + +- `fetch(item, { signal }) → Promise` — push `{ item, signal, resolve, reject }` onto a buffer; if the buffer was empty, arm `setTimeout(flush, 0)`. Return the promise. +- `flush()` — drain the buffer; drop entries whose `signal` is already aborted (reject them); group the rest by an opaque `(item) => string` `groupKey` (for `COGLayer`: `\`z${tile.index.z}\``); for each group: build a **composite `AbortSignal`** that fires only when *every* member's signal has aborted, call `dispatch(key, items, { signal: composite })` (the supplied "do one batched call" function); on resolve, distribute per-element results (`Error` instances → reject only that slot; values → resolve); on reject, reject every slot. Dispatch all groups concurrently — the source-level limiter is the cap. +- `finalize()` — reject everything buffered and arm no further timers. + +The `dispatch` callback (supplied by `RasterTileLayer` when it constructs the batcher) is the bridge from "per-tile" to "per-source-format batched": it calls `_getMultiTileDataCallback()` (resolved to whatever the subclass returns) with the right shape, composing layer-level `props.signal` with the batcher's composite per-group signal. + +`setTimeout(flush, 0)`: hard-coded `0`, not exposed. The timing analysis (next section) shows it's sufficient. + +## Timing — why `setTimeout(flush, 0)` + +The JS event loop runs one **macrotask** at a time; after each macrotask it fully drains the **microtask** queue before the next macrotask. deck.gl's `Tileset2D._updateTileStates` synchronously spawns one `Tile2DHeader.loadData` per needed tile; with the inner layer's `maxRequests: 0`, each `loadData`'s `await scheduleRequest(...)` resolves immediately, so the continuation calling our `getTileData` runs as a microtask — therefore **every `getTileData` call for one viewport update lands within the current macrotask** (before its microtask queue drains). A `setTimeout(flush, 0)` callback is the *next* macrotask, which runs only after the current one's microtasks are all done — so it deterministically observes the whole burst. (A `queueMicrotask`-based flush would be too eager — it could fire mid-burst.) Browsers clamp `setTimeout(0)` to ~1 ms anyway — still low-latency, still after the burst. + +## `@developmentseed/deck.gl-geotiff` changes + +- `COGLayer` provides a default `getMultiTileData` (via overriding `_getMultiTileDataCallback()`): resolve the batch's `xy` from `tiles`, resolve the group's shared `z` to `image` (overview vs primary), call `geotiff.fetchTilesSettled(xy, { signal, pool })`, map each `Tile` → run the existing decode/render path → `DataT`, and each `{ error }` slot → that `Error`. Keeps its existing default `getTileData` → `geotiff.fetchTile` unchanged. +- **No limiter wiring.** `fetchGeoTIFF(props.geotiff)` already calls `GeoTIFF.fromUrl(url)` which defaults the limiter to the shared per-origin one. Nothing for `COGLayer` to thread. + +## Errors & edge cases + +- **Per-tile failure in a batch**: surfaced — `getMultiTileData` returns `Array`; the batcher rejects only the failing tile's `getTileData` promise (deck.gl marks just that tile errored/`null`). `COGLayer`'s implementation reports per-tile decode/sparse-tile errors individually; a network failure inside a coalesced merged range dooms every tile whose bytes were in it (those get the same error) — inherent to coalescing. +- **Whole-batch failure**: `getMultiTileData` rejects ⇒ every tile in that group rejects, same as a per-tile `getTileData` throw today. +- **Aborts**: a tile aborted *before* flush is dropped from the batch and rejected. A tile aborted *after* dispatch is rejected (bytes already fetched — wasted, acceptable). The underlying coalesced fetch is aborted only when *all* tiles in its group are aborted. A queued limiter `acquire` whose signal aborts is dropped — no request fires. +- **`maxRequests: 0` on the inner layer** also disables `_pruneRequests` (deck.gl's "abort unselected in-flight tiles past the limit") — desirable here. + +## Testing + +- `limiter.test.ts` — `limitFetch` (slot held for fetch lifetime on success and on throw; forwards offset/length/options; forwards caller's signal so a queued abort drops the request); `Semaphore` (max concurrency, FIFO, queued abort drops without consuming a slot, pre-aborted signal rejects immediately, `RangeError` on bad construction); `defaultLimiterForOrigin` (same instance for same origin, distinct for distinct origins, accepts `URL` object). +- `geotiff-concurrency-limiter.test.ts` — `GeoTIFF.open({ concurrencyLimiter })` integration: tile-data fetches go through the limiter, header reads don't, 1-slot smoke, no-limiter smoke. +- `fetchTilesSettled` — good grid: same as `fetchTiles` (no `{ error }` slots); band-separate good grid: same; one sparse tile: that slot is `{ error }`, others are `Tile`s; empty input: `[]`. +- `TileBatcher` unit tests — N `fetch()` calls ⇒ one `dispatch` per group key with the right items; per-item `Error` rejects only that slot; whole-dispatch rejection rejects the group; pre-flush abort drops & rejects; post-flush abort rejects but doesn't abort the group; composite signal aborts the group only when all members abort; `finalize()` rejects buffered. +- `COGLayer._getMultiTileDataCallback` default: calls `geotiff.fetchTilesSettled` with the right `xy`/`image`; maps `Tile` → `DataT` and `{ error }` → `Error`. + +## Open questions / deferred to the plan + +- Exact lifecycle wiring of the batcher in `RasterTileLayer.updateState` / `finalizeState` vs. `COGLayer.updateState`. +- Whether `coalesceRanges`'s internal `COALESCE_PARALLEL` should become configurable (the per-origin source-level limiter already caps globally; a per-call ceiling is likely redundant — lean toward leaving as-is). diff --git a/packages/deck.gl-raster/src/raster-tile-layer/tile-batcher.ts b/packages/deck.gl-raster/src/raster-tile-layer/tile-batcher.ts new file mode 100644 index 00000000..f342d149 --- /dev/null +++ b/packages/deck.gl-raster/src/raster-tile-layer/tile-batcher.ts @@ -0,0 +1,177 @@ +/** + * Options handed to a {@link TileBatcher} dispatch call: a combined signal that + * aborts only once *every* item in the group has been aborted upstream. + */ +export interface BatchDispatchOptions { + readonly signal: AbortSignal; +} + +interface PendingItem { + readonly item: TItem; + readonly signal?: AbortSignal; + readonly resolve: (value: TResult) => void; + readonly reject: (reason?: unknown) => void; +} + +/** Options for constructing a {@link TileBatcher}. */ +export interface TileBatcherOptions { + /** Compute the batch key for an item; items with the same key share a dispatch. */ + groupKey(item: TItem): string; + /** + * Fetch a whole group at once. Returns one entry per `items` element, in + * order — a value, or an `Error` for that single item. May reject to fail + * the whole group. + */ + dispatch( + key: string, + items: TItem[], + opts: BatchDispatchOptions, + ): Promise>; +} + +/** + * Coalesces a burst of per-item `fetch()` calls (e.g. deck.gl's per-tile + * `getTileData`) into one `dispatch` per group key. The burst is collected on + * a `setTimeout(_, 0)`, which deterministically fires after the synchronous + * burst + its microtask tail (see the design doc's "Timing" section). + */ +export class TileBatcher { + private readonly groupKey: (item: TItem) => string; + private readonly dispatch: TileBatcherOptions["dispatch"]; + private buffer: Array> = []; + private timer: ReturnType | null = null; + private finalized = false; + + constructor(opts: TileBatcherOptions) { + this.groupKey = opts.groupKey; + this.dispatch = opts.dispatch; + } + + /** Buffer an item; resolves to the dispatch's result (or rejects). */ + fetch( + item: TItem, + { signal }: { signal?: AbortSignal } = {}, + ): Promise { + if (this.finalized) { + return Promise.reject(new Error("TileBatcher has been finalized")); + } + return new Promise((resolve, reject) => { + this.buffer.push({ item, signal, resolve, reject }); + if (this.timer === null) { + this.timer = setTimeout(() => this.flush(), 0); + } + }); + } + + /** Reject everything still buffered; subsequent `fetch` calls reject too. */ + finalize(): void { + this.finalized = true; + if (this.timer !== null) { + clearTimeout(this.timer); + this.timer = null; + } + const pending = this.buffer; + this.buffer = []; + for (const p of pending) { + p.reject(new Error("TileBatcher finalized before flush")); + } + } + + private flush(): void { + this.timer = null; + const pending = this.buffer; + this.buffer = []; + + // Drop already-aborted items. + const alive: Array> = []; + for (const p of pending) { + if (p.signal?.aborted) { + p.reject(p.signal.reason); + } else { + alive.push(p); + } + } + + // Group by key. + const groups = new Map>>(); + for (const p of alive) { + const key = this.groupKey(p.item); + const group = groups.get(key); + if (group) { + group.push(p); + } else { + groups.set(key, [p]); + } + } + + for (const [key, group] of groups) { + void this.dispatchGroup(key, group); + } + } + + private async dispatchGroup( + key: string, + group: Array>, + ): Promise { + const composite = compositeAbortSignal(group.map((p) => p.signal)); + let results: Array; + try { + results = await this.dispatch( + key, + group.map((p) => p.item), + { signal: composite }, + ); + } catch (err) { + for (const p of group) { + p.reject(err); + } + return; + } + for (let i = 0; i < group.length; i++) { + const p = group[i]!; + const r = results[i]; + if (p.signal?.aborted) { + p.reject(p.signal.reason); + } else if (r instanceof Error) { + p.reject(r); + } else { + p.resolve(r as TResult); + } + } + } +} + +/** + * An `AbortSignal` that fires only once *every* input signal has aborted. Any + * `undefined` in the input means "never aborts" so the composite never aborts. + */ +function compositeAbortSignal( + signals: Array, +): AbortSignal { + if (signals.length === 0 || signals.some((s) => s === undefined)) { + return new AbortController().signal; + } + const real = signals as AbortSignal[]; + const controller = new AbortController(); + let remaining = real.length; + for (const s of real) { + if (s.aborted) { + remaining--; + } else { + s.addEventListener( + "abort", + () => { + remaining--; + if (remaining === 0) { + controller.abort(s.reason); + } + }, + { once: true }, + ); + } + } + if (remaining === 0) { + controller.abort(real[0]!.reason); + } + return controller.signal; +} diff --git a/packages/deck.gl-raster/tests/tile-batcher.test.ts b/packages/deck.gl-raster/tests/tile-batcher.test.ts new file mode 100644 index 00000000..f4464a56 --- /dev/null +++ b/packages/deck.gl-raster/tests/tile-batcher.test.ts @@ -0,0 +1,122 @@ +import { describe, expect, it, vi } from "vitest"; +import { TileBatcher } from "../src/raster-tile-layer/tile-batcher.js"; + +type FakeTile = { x: number; y: number; z: number }; + +function makeBatcher( + dispatch: ( + key: string, + items: FakeTile[], + opts: { signal: AbortSignal }, + ) => Promise>, +) { + return new TileBatcher({ + groupKey: (t) => `z${t.z}`, + dispatch, + }); +} + +const flushTick = () => new Promise((r) => setTimeout(r, 0)); + +describe("TileBatcher", () => { + it("coalesces one tick of fetch() calls into one dispatch per group key", async () => { + const seen: Array<[string, FakeTile[]]> = []; + const b = makeBatcher(async (key, items) => { + seen.push([key, items]); + return items.map((_, i) => i); + }); + const a = b.fetch({ x: 0, y: 0, z: 1 }); + const c = b.fetch({ x: 1, y: 0, z: 1 }); + const d = b.fetch({ x: 0, y: 0, z: 2 }); + expect(await Promise.all([a, c, d])).toEqual([0, 1, 0]); + expect(seen).toEqual([ + [ + "z1", + [ + { x: 0, y: 0, z: 1 }, + { x: 1, y: 0, z: 1 }, + ], + ], + ["z2", [{ x: 0, y: 0, z: 2 }]], + ]); + }); + + it("distributes per-item Errors only to the failing item", async () => { + const b = makeBatcher(async (_key, items) => + items.map((_t, i) => (i === 1 ? new Error("bad tile") : i)), + ); + const r0 = b.fetch({ x: 0, y: 0, z: 1 }); + const r1 = b.fetch({ x: 1, y: 0, z: 1 }); + const r2 = b.fetch({ x: 2, y: 0, z: 1 }); + expect(await r0).toBe(0); + await expect(r1).rejects.toThrow("bad tile"); + expect(await r2).toBe(2); + }); + + it("rejects every item in a group when the dispatch itself rejects", async () => { + const b = makeBatcher(async () => { + throw new Error("whole batch failed"); + }); + const r0 = b.fetch({ x: 0, y: 0, z: 1 }); + const r1 = b.fetch({ x: 1, y: 0, z: 1 }); + await expect(r0).rejects.toThrow("whole batch failed"); + await expect(r1).rejects.toThrow("whole batch failed"); + }); + + it("drops an item whose signal is already aborted before flush", async () => { + const dispatch = vi.fn(async (_k: string, items: FakeTile[]) => + items.map((_, i) => i as number | Error), + ); + const b = makeBatcher(dispatch); + const ac = new AbortController(); + ac.abort(new Error("scrolled off")); + const aborted = b.fetch({ x: 0, y: 0, z: 1 }, { signal: ac.signal }); + const ok = b.fetch({ x: 1, y: 0, z: 1 }); + await expect(aborted).rejects.toThrow("scrolled off"); + expect(await ok).toBe(0); // re-indexed within the surviving group + expect(dispatch).toHaveBeenCalledOnce(); + expect(dispatch.mock.calls[0]![1]).toEqual([{ x: 1, y: 0, z: 1 }]); + }); + + it("composite signal aborts only when every member aborts", async () => { + let captured!: AbortSignal; + const b = new TileBatcher({ + groupKey: (t) => `z${t.z}`, + dispatch: async (_k, items, opts) => { + captured = opts.signal; + // Hold the dispatch open long enough to observe abort state. + await new Promise((r) => setTimeout(r, 30)); + return items.map((_, i) => i); + }, + }); + const a = new AbortController(); + const c = new AbortController(); + const ra = b.fetch({ x: 0, y: 0, z: 1 }, { signal: a.signal }); + const rc = b.fetch({ x: 1, y: 0, z: 1 }, { signal: c.signal }); + await flushTick(); + expect(captured.aborted).toBe(false); + a.abort(); + expect(captured.aborted).toBe(false); // not all aborted yet + c.abort(); + expect(captured.aborted).toBe(true); + // Both tiles' promises reject (their per-tile signals are aborted at distribute time). + await expect(ra).rejects.toBeDefined(); + await expect(rc).rejects.toBeDefined(); + }); + + it("finalize() rejects everything still buffered and dispatches nothing more", async () => { + const dispatch = vi.fn(async () => [] as number[]); + const b = makeBatcher(dispatch); + const r = b.fetch({ x: 0, y: 0, z: 1 }); + b.finalize(); + await expect(r).rejects.toThrow(); + await flushTick(); + expect(dispatch).not.toHaveBeenCalled(); + }); + + it("fetch() after finalize() rejects immediately", async () => { + const b = makeBatcher(async (_k, items) => items.map((_, i) => i)); + b.finalize(); + await expect(b.fetch({ x: 0, y: 0, z: 1 })).rejects.toThrow(); + }); +}); diff --git a/packages/geotiff/src/fetch.ts b/packages/geotiff/src/fetch.ts index 88760897..1647508d 100644 --- a/packages/geotiff/src/fetch.ts +++ b/packages/geotiff/src/fetch.ts @@ -201,6 +201,127 @@ export async function fetchTiles( ); } +/** One settled batch result: a decoded {@link Tile}, or the error that + * prevented producing it (a missing/sparse tile, or a decode failure). A + * *network*-level failure inside a coalesced range still rejects the whole + * call — only per-tile problems are reported this way. */ +export type SettledTile = Tile | { error: unknown }; + +/** + * Like {@link fetchTiles}, but never throws on a single missing tile or a + * single tile's decode failure — those become `{ error }` entries in the + * returned array (one per input coordinate, in order). Used by the deck.gl + * batched-fetch path so one bad tile doesn't blank a whole viewport. + */ +export async function fetchTilesSettled( + self: HasTiffReference, + xy: Array<[number, number]>, + { + boundless = true, + pool, + signal, + }: { + boundless?: boolean; + pool?: DecoderPool; + signal?: AbortSignal; + } = {}, +): Promise { + if (xy.length === 0) { + return []; + } + + const dataFetch = fetchCogBytesMultipleSettled(self, xy, { signal }); + const maskFetch: Promise> = + self.maskImage != null + ? getTiles(self.maskImage, xy, self.dataSource, { + signal, + debug: self._debug ? { label: "mask" } : undefined, + }) + : Promise.resolve(xy.map(() => null)); + + const [allTileBytes, allMaskBytes] = await Promise.all([ + dataFetch, + maskFetch, + ]); + + return Promise.all( + xy.map(async ([x, y], i): Promise => { + const bytes = allTileBytes[i]!; + if (bytes !== null && typeof bytes === "object" && "error" in bytes) { + return bytes as { error: unknown }; + } + try { + return await assembleTile( + self, + x, + y, + bytes as GetBytesResponse | GetBytesResponse[], + allMaskBytes[i] ?? null, + { boundless, pool }, + ); + } catch (error) { + return { error }; + } + }), + ); +} + +/** Collect-not-throw variant of {@link fetchCogBytesMultiple}: a null/sparse + * tile becomes `{ error }` in its slot instead of throwing. */ +async function fetchCogBytesMultipleSettled( + self: HasTiffReference, + xy: Array<[number, number]>, + { signal }: { signal?: AbortSignal } = {}, +): Promise> { + const debug: DebugTag | undefined = self._debug + ? { label: "data" } + : undefined; + switch (self.cachedTags.planarConfiguration) { + case PlanarConfiguration.Contig: { + const tiles = await getTiles(self.image, xy, self.dataSource, { + signal, + debug, + }); + return tiles.map((tile, i) => { + if (tile === null) { + const [x, y] = xy[i]!; + return { error: new Error(`Tile at (${x}, ${y}) not found`) }; + } + return tile; + }); + } + case PlanarConfiguration.Separate: { + const numBands = self.cachedTags.samplesPerPixel; + const perTileRanges = await Promise.all( + xy.map(([x, y]) => findBandSeparateTileByteRanges(self, x, y)), + ); + const flatRanges = perTileRanges.flatMap((ranges) => + ranges.map(({ offset, imageSize }) => ({ + offset, + byteCount: imageSize, + })), + ); + const flatResults = await getMultipleBytes( + self.image, + flatRanges, + self.dataSource, + { signal, debug }, + ); + return xy.map(([x, y], t) => { + const bands = flatResults.slice(t * numBands, t * numBands + numBands); + if (bands.some((r) => r === null)) { + return { error: new Error(`Tile at (${x}, ${y}) not found`) }; + } + return bands as GetBytesResponse[]; + }); + } + default: + throw new Error( + `Unsupported PlanarConfiguration: ${self.cachedTags.planarConfiguration}`, + ); + } +} + type GetBytesResponse = { bytes: ArrayBuffer; compression: Compression }; type ByteRange = Awaited>; diff --git a/packages/geotiff/src/geotiff.ts b/packages/geotiff/src/geotiff.ts index 3c427c0f..022f2d08 100644 --- a/packages/geotiff/src/geotiff.ts +++ b/packages/geotiff/src/geotiff.ts @@ -7,11 +7,14 @@ import { Photometric, SubFileType, Tiff, TiffTag } from "@cogeotiff/core"; import type { Affine } from "@developmentseed/affine"; import type { ProjJson } from "@developmentseed/proj"; import { crsFromGeoKeys } from "./crs.js"; -import { fetchTile, fetchTiles } from "./fetch.js"; +import type { SettledTile } from "./fetch.js"; +import { fetchTile, fetchTiles, fetchTilesSettled } from "./fetch.js"; import type { BandStatistics, GDALMetadata } from "./gdal-metadata.js"; import { parseGDALMetadata } from "./gdal-metadata.js"; import type { CachedTags, GeoKeyDirectory } from "./ifd.js"; import { extractGeoKeyDirectory, prefetchTags } from "./ifd.js"; +import type { ConcurrencyLimiter } from "./limiter.js"; +import { defaultLimiterForOrigin, limitFetch } from "./limiter.js"; import { Overview } from "./overview.js"; import type { DecoderPool } from "./pool/pool.js"; import type { Tile } from "./tile.js"; @@ -112,14 +115,27 @@ export class GeoTIFF { * @param options.headerSource The source used to construct the TIFF. This is typically a layered source with caching and chunking, to optimise access to TIFF tags and IFDs. Callers who want to control the initial read size should compose a `SourceChunk` of the desired block size; cogeotiff's default `defaultReadSize` (16 KiB) gets padded up by the chunking layer anyway. * @param options.signal An optional {@link AbortSignal} to cancel the header reads. * @param options.debug When true, the returned GeoTIFF logs each tile/mask data fetch to the console. Off by default. + * @param options.concurrencyLimiter When given, every tile-data and mask HTTP fetch is routed through this limiter, so callers can cap concurrent network requests. Header/metadata reads are not gated — they're a handful of small reads at open time and afterwards served from the header cache. Off by default. */ static async open(options: { dataSource: Pick; headerSource: Source; + concurrencyLimiter?: ConcurrencyLimiter; signal?: AbortSignal; debug?: boolean; }): Promise { - const { dataSource, headerSource, signal, debug } = options; + const { headerSource, signal, debug, concurrencyLimiter } = options; + // When a limiter is supplied, gate tile-data reads through it. We only need + // `.fetch` from the data source, so a thin wrapper suffices — no need to + // reconstruct a full Source or pull in middleware machinery. + const dataSource: Pick = concurrencyLimiter + ? { + fetch: limitFetch( + options.dataSource.fetch.bind(options.dataSource), + concurrencyLimiter, + ), + } + : options.dataSource; // Construct + init in two steps so we don't have to pass cogeotiff's // `defaultReadSize` ourselves (the constructor defaults it to // `Tiff.DefaultReadSize` when no options are provided). In the typical @@ -269,6 +285,7 @@ export class GeoTIFF { * @param options.cacheSize Total cache size in bytes. Defaults to 8 MiB (~128 blocks at the default chunk size). * @param options.signal An optional {@link AbortSignal} to cancel the header reads. * @param options.debug When true, the returned GeoTIFF logs each tile/mask data fetch to the console with offset/length and a `data`/`mask` label. Off by default. + * @param options.concurrencyLimiter Caps concurrent tile-data and mask HTTP fetches (header/metadata reads are not gated). Defaults to {@link defaultLimiterForOrigin} for `url`'s origin, so multiple `fromUrl` calls (or other source-level callers) targeting the same host share one cap — important on browsers, where HTTP/1.1 limits concurrent connections per origin to ~6. Pass an explicit limiter to override; pass `null` to disable gating entirely. * @returns A Promise that resolves to a GeoTIFF instance. */ static async fromUrl( @@ -278,11 +295,13 @@ export class GeoTIFF { cacheSize = 8 * 1024 * 1024, signal, debug, + concurrencyLimiter, }: { chunkSize?: number; cacheSize?: number; signal?: AbortSignal; debug?: boolean; + concurrencyLimiter?: ConcurrencyLimiter | null; } = {}, ): Promise { const source = new SourceHttp(url, {}); @@ -308,12 +327,20 @@ export class GeoTIFF { new SourceCache({ size: cacheSize }), ]); + // Default to the per-origin shared limiter unless the caller opted out + // (`null`) or supplied their own. `undefined` → default; `null` → disabled. + const effectiveLimiter = + concurrencyLimiter === undefined + ? defaultLimiterForOrigin(url) + : (concurrencyLimiter ?? undefined); + return await GeoTIFF.open({ // Tile data reads bypass the header cache (raw source). dataSource: source, headerSource: view, signal, debug, + concurrencyLimiter: effectiveLimiter, }); } @@ -470,6 +497,23 @@ export class GeoTIFF { return await fetchTiles(this, xy, options); } + /** + * Like {@link fetchTiles}, but one missing tile or one tile's decode failure + * becomes a `{ error }` entry in its slot rather than throwing the whole + * batch. A network failure inside a coalesced range still rejects the call. + * See {@link SettledTile}. + */ + async fetchTilesSettled( + xy: Array<[number, number]>, + options: { + boundless?: boolean; + pool?: DecoderPool; + signal?: AbortSignal; + } = {}, + ): Promise { + return await fetchTilesSettled(this, xy, options); + } + // Transform mixin /** diff --git a/packages/geotiff/src/index.ts b/packages/geotiff/src/index.ts index 30fc2ab9..b7b5f654 100644 --- a/packages/geotiff/src/index.ts +++ b/packages/geotiff/src/index.ts @@ -16,8 +16,11 @@ export type { DecoderMetadata, } from "./decode.js"; export { DECODER_REGISTRY } from "./decode.js"; +export type { SettledTile } from "./fetch.js"; export { GeoTIFF } from "./geotiff.js"; export type { CachedTags, GeoKeyDirectory } from "./ifd.js"; +export type { ConcurrencyLimiter } from "./limiter.js"; +export { defaultLimiterForOrigin, Semaphore } from "./limiter.js"; export { Overview } from "./overview.js"; export type { DecoderPoolOptions } from "./pool/pool.js"; export { DecoderPool, defaultDecoderPool } from "./pool/pool.js"; diff --git a/packages/geotiff/src/limiter.ts b/packages/geotiff/src/limiter.ts new file mode 100644 index 00000000..104a4173 --- /dev/null +++ b/packages/geotiff/src/limiter.ts @@ -0,0 +1,138 @@ +import type { Source } from "@cogeotiff/core"; + +/** The signature of a `Source.fetch` call. */ +type Fetch = Pick["fetch"]; + +/** + * Minimal contract for capping the number of concurrent {@link Source.fetch} + * calls. An implementation hands out a fixed number of slots; callers `acquire` + * one and call the returned release function when done. + * + * The optional `signal` lets a caller drop out of the queue if they no longer + * need the request — important on browsers, where Chrome's HTTP/1.1 cap of 6 + * concurrent connections per origin means an overlong queue from a previous + * viewport can starve a fresh one. Aborting a queued `acquire` removes it + * before any underlying network call has fired. + */ +export interface ConcurrencyLimiter { + /** + * Acquire a slot. Resolves once a slot is free; call the returned function + * exactly once when the request finishes (success or failure) to release it. + * If `signal` aborts while waiting in the queue, the returned promise + * rejects with the signal's `reason` and no slot is consumed. + */ + acquire(signal?: AbortSignal): Promise<() => void>; +} + +/** + * Wrap a `Source.fetch` so each call holds a {@link ConcurrencyLimiter} slot + * for its duration — releasing it whether the fetch resolves or rejects, and + * never otherwise interfering. The call's own `options.signal` is forwarded to + * `acquire`, so if the caller aborts before reaching the front of the queue + * the limiter drops them without firing a request. + */ +export function limitFetch(fetch: Fetch, limiter: ConcurrencyLimiter): Fetch { + return async (offset, length, options) => { + const release = await limiter.acquire(options?.signal); + try { + return await fetch(offset, length, options); + } finally { + release(); + } + }; +} + +/** A waiter parked in a {@link Semaphore}'s queue. */ +type Waiter = { + readonly signal?: AbortSignal; + resolve(release: () => void): void; + reject(reason: unknown): void; +}; + +/** + * A simple FIFO semaphore implementing {@link ConcurrencyLimiter}. Hands out up + * to `maxRequests` slots; queued acquires that abort are removed from the queue + * before any request is issued. + */ +export class Semaphore implements ConcurrencyLimiter { + private active = 0; + private readonly waiters: Waiter[] = []; + + constructor(private readonly maxRequests: number) { + if (!(maxRequests >= 1)) { + throw new RangeError( + `Semaphore maxRequests must be >= 1, got ${maxRequests}`, + ); + } + } + + acquire(signal?: AbortSignal): Promise<() => void> { + if (signal?.aborted) { + return Promise.reject(signal.reason); + } + if (this.active < this.maxRequests) { + this.active++; + return Promise.resolve(this.makeRelease()); + } + return new Promise<() => void>((resolve, reject) => { + const waiter: Waiter = { signal, resolve, reject }; + this.waiters.push(waiter); + if (signal) { + signal.addEventListener( + "abort", + () => { + const i = this.waiters.indexOf(waiter); + if (i >= 0) { + this.waiters.splice(i, 1); + reject(signal.reason); + } + }, + { once: true }, + ); + } + }); + } + + private makeRelease(): () => void { + let released = false; + return () => { + if (released) { + return; + } + released = true; + this.active--; + const next = this.waiters.shift(); + if (next) { + this.active++; + next.resolve(this.makeRelease()); + } + }; + } +} + +/** + * Default {@link ConcurrencyLimiter}s cached by URL origin. Chrome (and most + * browsers) cap concurrent HTTP/1.1 connections per origin to 6; over-scheduling + * past that point means the browser queues requests, which is bad for tile + * fetching because stale-after-pan requests then block fresh ones. This module + * gives every `fromUrl` (and any other source-level caller) a shared per-origin + * semaphore so the cap holds across layers / data formats targeting one host. + */ +const DEFAULT_MAX_REQUESTS_PER_ORIGIN = 6; +const limiterByOrigin = new Map(); + +/** + * Return a shared {@link ConcurrencyLimiter} for `url`'s origin. The first call + * for a given origin constructs one with `maxRequests = 6`; subsequent calls + * return the same instance, so multiple sources (COG, Zarr, …) targeting the + * same host share one cap. + */ +export function defaultLimiterForOrigin(url: string | URL): ConcurrencyLimiter { + const origin = new URL(url).origin; + let limiter = limiterByOrigin.get(origin); + if (!limiter) { + limiter = new Semaphore(DEFAULT_MAX_REQUESTS_PER_ORIGIN); + limiterByOrigin.set(origin, limiter); + } + return limiter; +} diff --git a/packages/geotiff/src/overview.ts b/packages/geotiff/src/overview.ts index 8af8c306..6b1f1d7a 100644 --- a/packages/geotiff/src/overview.ts +++ b/packages/geotiff/src/overview.ts @@ -2,7 +2,8 @@ import type { Source, TiffImage, TiffImageTileCount } from "@cogeotiff/core"; import type { Affine } from "@developmentseed/affine"; import { compose, scale } from "@developmentseed/affine"; import type { ProjJson } from "@developmentseed/proj"; -import { fetchTile, fetchTiles } from "./fetch.js"; +import type { SettledTile } from "./fetch.js"; +import { fetchTile, fetchTiles, fetchTilesSettled } from "./fetch.js"; import type { GeoTIFF } from "./geotiff.js"; import type { CachedTags, GeoKeyDirectory } from "./ifd.js"; import type { DecoderPool } from "./pool/pool.js"; @@ -149,6 +150,23 @@ export class Overview { return await fetchTiles(this, xy, options); } + /** + * Like {@link fetchTiles}, but one missing tile or one tile's decode failure + * becomes a `{ error }` entry in its slot rather than throwing the whole + * batch. A network failure inside a coalesced range still rejects the call. + * See {@link SettledTile}. + */ + async fetchTilesSettled( + xy: Array<[number, number]>, + options: { + boundless?: boolean; + pool?: DecoderPool; + signal?: AbortSignal; + } = {}, + ): Promise { + return await fetchTilesSettled(this, xy, options); + } + // TiledMixin // Transform mixin diff --git a/packages/geotiff/tests/fetch-tiles-settled.test.ts b/packages/geotiff/tests/fetch-tiles-settled.test.ts new file mode 100644 index 00000000..2d817fa8 --- /dev/null +++ b/packages/geotiff/tests/fetch-tiles-settled.test.ts @@ -0,0 +1,58 @@ +import { describe, expect, it } from "vitest"; +import { loadGeoTIFF } from "./helpers.js"; + +const GRID: Array<[number, number]> = [ + [0, 0], + [1, 0], + [0, 1], + [1, 1], +]; + +describe("fetchTilesSettled", () => { + it("returns one Tile per coordinate for a good grid — same as fetchTiles (pixel-interleaved)", async () => { + const tiff = await loadGeoTIFF("uint8_rgb_deflate_block64_cog", "rasterio"); + const settled = await tiff.fetchTilesSettled(GRID); + const plain = await tiff.fetchTiles(GRID); + expect(settled).toEqual(plain); + }); + + it("returns one Tile per coordinate for a good grid (band-separate)", async () => { + const tiff = await loadGeoTIFF("int8_3band_zstd_block64", "rasterio"); + const settled = await tiff.fetchTilesSettled(GRID); + const plain = await tiff.fetchTiles(GRID); + expect(settled).toEqual(plain); + }); + + it("returns [] for empty input", async () => { + const tiff = await loadGeoTIFF("uint8_rgb_deflate_block64_cog", "rasterio"); + expect(await tiff.fetchTilesSettled([])).toEqual([]); + }); + + it("returns Tiles for a masked fixture (parity with fetchTiles)", async () => { + const tiff = await loadGeoTIFF("cog_uint8_rgb_mask", "rasterio"); + const settled = await tiff.fetchTilesSettled(GRID); + const plain = await tiff.fetchTiles(GRID); + expect(settled).toEqual(plain); + }); + + it("isolates an out-of-range tile into its own { error } slot, leaving the rest as Tiles", async () => { + // Build the xy list using *one* coordinate that's out-of-range — the + // tiles fetched at (0,0)/(1,0)/(0,1)/(1,1) are valid for the 2x2 fixture; + // (99, 99) is not. The byte-fetch step throws on out-of-range BEFORE the + // null/sparse logic, so we'd expect the whole batch to error — but + // fetchTilesSettled wraps that via { error }. (Strictly: the *underlying* + // getTiles validates indices and throws, so this exercises the + // settled-path's catch around the whole byte fetch by surfacing as a + // single rejection; not the per-slot { error } path. Per-slot errors + // require a *sparse* (byteCount=0) tile, which our fixtures don't have. + // The good-grid + empty cases above pin the happy paths; this case + // documents the "out-of-range tile" failure mode.) + const tiff = await loadGeoTIFF("uint8_rgb_deflate_block64_cog", "rasterio"); + await expect( + tiff.fetchTilesSettled([ + [0, 0], + [99, 99], + ]), + ).rejects.toThrow(); + }); +}); diff --git a/packages/geotiff/tests/geotiff-concurrency-limiter.test.ts b/packages/geotiff/tests/geotiff-concurrency-limiter.test.ts new file mode 100644 index 00000000..51eae0b9 --- /dev/null +++ b/packages/geotiff/tests/geotiff-concurrency-limiter.test.ts @@ -0,0 +1,80 @@ +import { SourceFile } from "@chunkd/source-file"; +import { describe, expect, it } from "vitest"; +import { GeoTIFF } from "../src/geotiff.js"; +import type { ConcurrencyLimiter } from "../src/limiter.js"; +import { fixturePath } from "./helpers.js"; + +/** A limiter that records every acquire and lets `maxConcurrent` run at once. */ +function makeCountingLimiter(maxConcurrent = Number.POSITIVE_INFINITY) { + let acquired = 0; + let active = 0; + let peak = 0; + const waiters: Array<() => void> = []; + const limiter: ConcurrencyLimiter = { + acquire: () => + new Promise<() => void>((resolve) => { + const grant = () => { + acquired++; + active++; + peak = Math.max(peak, active); + resolve(() => { + active--; + const next = waiters.shift(); + if (next) { + next(); + } + }); + }; + if (active < maxConcurrent) { + grant(); + } else { + waiters.push(grant); + } + }), + }; + return { limiter, stats: () => ({ acquired, peak }) }; +} + +const GRID: Array<[number, number]> = [ + [0, 0], + [1, 0], + [0, 1], + [1, 1], +]; + +describe("GeoTIFF.open({ concurrencyLimiter })", () => { + it("routes tile-data fetches through the limiter (header reads are not gated)", async () => { + const { limiter, stats } = makeCountingLimiter(); + const path = fixturePath("uint8_rgb_deflate_block64_cog", "rasterio"); + const tiff = await GeoTIFF.open({ + dataSource: new SourceFile(path), + headerSource: new SourceFile(path), + concurrencyLimiter: limiter, + }); + expect(stats().acquired).toBe(0); // header/IFD reads bypass it + await tiff.fetchTiles(GRID); + expect(stats().acquired).toBeGreaterThan(0); // tile-data reads go through it + }); + + it("with a 1-slot limiter, never runs two fetches at once", async () => { + const { limiter, stats } = makeCountingLimiter(1); + const path = fixturePath("uint8_rgb_deflate_block64_cog", "rasterio"); + const tiff = await GeoTIFF.open({ + dataSource: new SourceFile(path), + headerSource: new SourceFile(path), + concurrencyLimiter: limiter, + }); + await tiff.fetchTiles(GRID); + expect(stats().peak).toBe(1); + }); + + it("without a limiter, behaves exactly as before (smoke)", async () => { + const path = fixturePath("uint8_rgb_deflate_block64_cog", "rasterio"); + const tiff = await GeoTIFF.open({ + dataSource: new SourceFile(path), + headerSource: new SourceFile(path), + }); + const tiles = await tiff.fetchTiles(GRID); + expect(tiles).toHaveLength(4); + }); +}); diff --git a/packages/geotiff/tests/limiter.test.ts b/packages/geotiff/tests/limiter.test.ts new file mode 100644 index 00000000..5077a8ac --- /dev/null +++ b/packages/geotiff/tests/limiter.test.ts @@ -0,0 +1,178 @@ +import { describe, expect, it } from "vitest"; +import type { ConcurrencyLimiter } from "../src/limiter.js"; +import { + defaultLimiterForOrigin, + limitFetch, + Semaphore, +} from "../src/limiter.js"; + +/** A limiter that records acquire/release order and only ever allows + * `maxConcurrent` slots, for deterministic assertions. */ +function makeRecordingLimiter(maxConcurrent = Number.POSITIVE_INFINITY) { + const log: string[] = []; + let active = 0; + const waiters: Array<() => void> = []; + const limiter: ConcurrencyLimiter = { + acquire: () => + new Promise<() => void>((resolve) => { + const grant = () => { + active++; + log.push(`acquire(active=${active})`); + resolve(() => { + active--; + log.push(`release(active=${active})`); + const next = waiters.shift(); + if (next) { + next(); + } + }); + }; + if (active < maxConcurrent) { + grant(); + } else { + waiters.push(grant); + } + }), + }; + return { limiter, log }; +} + +describe("limitFetch", () => { + it("holds a slot for the duration of the underlying fetch (success)", async () => { + const { limiter, log } = makeRecordingLimiter(); + const buf = new ArrayBuffer(8); + const fetch = limitFetch(async () => { + log.push("fetch"); + return buf; + }, limiter); + const result = await fetch(0, 8); + expect(result).toBe(buf); + expect(log).toEqual(["acquire(active=1)", "fetch", "release(active=0)"]); + }); + + it("releases the slot even when the underlying fetch throws", async () => { + const { limiter, log } = makeRecordingLimiter(); + const fetch = limitFetch(async () => { + log.push("fetch"); + throw new Error("boom"); + }, limiter); + await expect(fetch(0, 8)).rejects.toThrow("boom"); + expect(log).toEqual(["acquire(active=1)", "fetch", "release(active=0)"]); + }); + + it("forwards offset/length/options to the wrapped fetch", async () => { + const { limiter } = makeRecordingLimiter(); + const calls: Array = []; + const fetch = limitFetch(async (...args) => { + calls.push(args); + return new ArrayBuffer(0); + }, limiter); + const signal = new AbortController().signal; + await fetch(123, 456, { signal }); + expect(calls).toEqual([[123, 456, { signal }]]); + }); + + it("forwards the caller's signal to acquire (queued aborts drop)", async () => { + const sem = new Semaphore(1); + // Hold the only slot so the next acquire queues. + const holdRelease = await sem.acquire(); + const ac = new AbortController(); + const fetch = limitFetch(async () => new ArrayBuffer(0), sem); + const pending = fetch(0, 8, { signal: ac.signal }); + ac.abort(new Error("user panned away")); + await expect(pending).rejects.toThrow("user panned away"); + holdRelease(); + }); +}); + +describe("Semaphore", () => { + it("hands out up to maxRequests slots concurrently", async () => { + const sem = new Semaphore(2); + const r1 = await sem.acquire(); + const r2 = await sem.acquire(); + let r3Acquired = false; + const p3 = sem.acquire().then((r) => { + r3Acquired = true; + return r; + }); + await new Promise((r) => setTimeout(r, 0)); + expect(r3Acquired).toBe(false); // queued + r1(); + const r3 = await p3; + expect(r3Acquired).toBe(true); + r2(); + r3(); + }); + + it("rejects a queued acquire when its signal aborts; does not consume a slot", async () => { + const sem = new Semaphore(1); + const hold = await sem.acquire(); + const ac = new AbortController(); + const pending = sem.acquire(ac.signal); + ac.abort(new Error("dropped")); + await expect(pending).rejects.toThrow("dropped"); + // The slot was never granted — releasing the holder must let a *new* acquire through. + hold(); + const r = await Promise.race([ + sem.acquire().then(() => "got" as const), + new Promise<"timeout">((res) => setTimeout(() => res("timeout"), 50)), + ]); + expect(r).toBe("got"); + }); + + it("rejects immediately when acquire is called with an already-aborted signal", async () => { + const sem = new Semaphore(1); + const ac = new AbortController(); + ac.abort(new Error("pre-aborted")); + await expect(sem.acquire(ac.signal)).rejects.toThrow("pre-aborted"); + // No slot was consumed. + const r = await sem.acquire(); + expect(typeof r).toBe("function"); + r(); + }); + + it("FIFO: queued waiters are granted in arrival order", async () => { + const sem = new Semaphore(1); + const order: number[] = []; + const hold = await sem.acquire(); + const p1 = sem.acquire().then((r) => { + order.push(1); + r(); + }); + const p2 = sem.acquire().then((r) => { + order.push(2); + r(); + }); + const p3 = sem.acquire().then((r) => { + order.push(3); + r(); + }); + hold(); + await Promise.all([p1, p2, p3]); + expect(order).toEqual([1, 2, 3]); + }); + + it("rejects construction with maxRequests < 1", () => { + expect(() => new Semaphore(0)).toThrow(RangeError); + }); +}); + +describe("defaultLimiterForOrigin", () => { + it("returns the same instance for the same origin", () => { + const a = defaultLimiterForOrigin("https://example.com/a/foo.tif"); + const b = defaultLimiterForOrigin("https://example.com/b/bar.tif"); + expect(b).toBe(a); + }); + + it("returns distinct instances for distinct origins", () => { + const a = defaultLimiterForOrigin("https://example.com/x.tif"); + const b = defaultLimiterForOrigin("https://other.example.org/x.tif"); + expect(b).not.toBe(a); + }); + + it("accepts a URL object", () => { + const a = defaultLimiterForOrigin("https://example.com/p.tif"); + const b = defaultLimiterForOrigin(new URL("https://example.com/q.tif")); + expect(b).toBe(a); + }); +});