diff --git a/dev-docs/specs/2026-05-19-concurrency-limiter-design.md b/dev-docs/specs/2026-05-19-concurrency-limiter-design.md new file mode 100644 index 00000000..fc846e17 --- /dev/null +++ b/dev-docs/specs/2026-05-19-concurrency-limiter-design.md @@ -0,0 +1,217 @@ +# Per-origin concurrency limiter for tile-source HTTP requests + +- **Date:** 2026-05-19 +- **Issues:** [#273](https://github.com/developmentseed/deck.gl-raster/issues/273) +- **Status:** Design — supersedes [`2026-05-12-getTileData-coalescing-design.md`](2026-05-12-getTileData-coalescing-design.md), which was scoped to *both* coalescing and gating; this spec narrows to gating only and explicitly defers coalescing. + +## Background + +Most COGs `deck.gl-raster` targets live on AWS S3 or similar object stores, which serve HTTP/1.1 only. Chrome (and other major browsers) cap concurrent HTTP/1.1 connections per origin at ~6. Over-scheduling above that point means the browser queues the excess, and queued requests stick around even when the viewport pans — so stale-after-pan requests block fresh ones for the new viewport. The browser cap is **global to the page**, not per-layer. + +deck.gl's `Tileset2D` ships an internal `loaders.gl/RequestScheduler({ maxRequests: 6 })` but it's *per-`TileLayer` instance*: two `COGLayer`s targeting the same S3 bucket each get 6 slots, so the browser sees 12+ requests and queues them. That scheduler also counts `getTileData` calls (≈ tiles), not HTTP requests, and one COG tile fetch issues several requests (metadata + data + mask) — so the per-tile cap is a poor proxy for the actual network-cap that matters. + +The fix is a concurrency limiter at the *source* layer (between `Source.fetch` and the network), per-origin, **shared across layers** targeting the same host. This spec specifies that limiter and its integration into `@developmentseed/geotiff` and `@developmentseed/deck.gl-geotiff`. 
+ +### How `Tileset2D._pruneRequests` interacts with `maxRequests` + +deck.gl's `Tileset2D` fires a tile's abort signal in exactly one place: `_pruneRequests`, which only triggers when ongoing requests exceed `maxRequests`. So setting `maxRequests = 0` disables that pruning entirely — stale tiles' signals never fire, and the source-level limiter never sees a cancellation. We therefore keep `maxRequests` at its current pass-through behavior (deck.gl default 6) and accept that the per-layer cap and the source-level cap coexist as two independent (slightly redundant) gates. A future change that wants per-tile pruning *without* a per-layer cap will need to subclass `Tileset2D`; that's out of scope here. + +## Goals + +1. Cap concurrent HTTP requests **per origin**, **shared across all layers** (and source formats — COG today, Zarr or similar tomorrow) targeting that origin. +2. Signal-aware queueing: when a queued request's `signal` aborts (e.g. user panned away), the request is dropped without firing a network call. +3. **Dynamic priority**: callers attach a `getPriority` callback to each fetch; the limiter re-evaluates it on every slot-open so the queue re-orders when the viewport pans, surfacing newly-central sources ahead of stale edge sources. +4. Zero-config default that works out of the box (cross-layer per-origin gating on, `maxRequests = 6`), with explicit opt-out and explicit override per layer. +5. No new dependency added; no implicit module-level state hidden inside `@developmentseed/geotiff`. + +## Non-goals (deferred, not removed from consideration) + +- **Multi-tile request coalescing** (`TileBatcher`, `getMultiTileData`, `fetchTilesSettled` used from a layer): the user-facing trade-off between batching shape (row/box/single) and time-to-first-pixel is real, and integrating with deck.gl's pruning is nontrivial. Decided to ship gating first and revisit coalescing as a follow-up; the API here doesn't preclude it. 
+- Subclassing `Tileset2D` to fire abort signals from `onTileUnload` independent of `maxRequests` — needed eventually if a future batcher wants `maxRequests = 0` with working cancellation. +- Pluggable batching strategy (when a batcher is added). +- Upstream deck.gl proposals (`getTileDataBatched`, exposing `_requestScheduler`, signalling abort on unload). +- Extracting the limiter to a new shared package (e.g. `@developmentseed/concurrency`). Lives in `@developmentseed/geotiff` for now; revisit if a non-geotiff source-type ever wants to share an instance. + +## Architecture + +Five types, all in `@developmentseed/geotiff`: + +```ts +/** Priority for one queued request. Numbers compare numerically; arrays + * compare lexicographically, with missing trailing elements treated as 0 + * (so `[1]` and `[1, 0, 0]` are equivalent, and `[1, -1]` jumps ahead of + * `[1]`). Lower = serviced sooner. */ +export type Priority = number | readonly number[]; + +/** The public contract a layer / source can accept. */ +export interface ConcurrencyLimiter { + /** Acquire a slot to perform one fetch to `url`. Resolves to a release + * function (call it once when the fetch settles). If `signal` aborts while + * the call is queued, the promise rejects with the signal's reason and no + * slot is consumed. + * + * `getPriority` is re-invoked on every slot-open: on each release, the + * limiter scans pending waiters, evaluates each one's `getPriority`, and + * resumes the waiter with the lowest priority value. Pan the viewport between releases + * and the next scan sees the new values — no explicit re-queue. Omit it + * (or have it return a constant) for FIFO behavior. */ + acquire( + url: URL, + signal?: AbortSignal, + getPriority?: () => Priority, + ): Promise<() => void>; +} + +/** Default implementation. Maintains one Semaphore per URL origin; new origins + * mint a new Semaphore lazily with the same `maxRequests`. 
Two layers on the + * same origin share one cap; two layers on different origins don't compete. */ +export class PerOriginSemaphore implements ConcurrencyLimiter { + constructor(opts: { maxRequests: number }); + acquire( + url: URL, + signal?: AbortSignal, + getPriority?: () => Priority, + ): Promise<() => void>; +} + +// Internal (not exported from index.ts): + +/** Counting semaphore with signal-aware acquire and dynamic priority. On each + * release, linear-scans the pending list, evaluates each waiter's + * `getPriority`, and resumes the min. Waiters without a `getPriority` are + * treated as `+Infinity` so explicit-priority waiters always run first; ties + * break by insertion order (so it degrades to FIFO when nobody supplies + * priorities). Linear scan rather than a heap because priorities change + * under the structure between releases — a heap would have to be rebuilt + * each time anyway. */ +class Semaphore { + constructor(opts: { maxRequests: number }); + acquire( + signal?: AbortSignal, + getPriority?: () => Priority, + ): Promise<() => void>; +} + +/** chunkd `SourceMiddleware` class, shape matches `SourceChunk` / `SourceCache`. + * Gates one `Source.fetch` through `limiter.acquire(url, signal, getPriority)`, + * forwarding the call's signal so a queued abort drops the request. Composes + * with the header source's existing middleware stack (chunking + cache → then + * limiter), so cache hits never burn a slot. */ +class LimiterMiddleware implements SourceMiddleware { + name: "limiter"; + constructor(opts: { + url: URL; + limiter: ConcurrencyLimiter; + getPriority?: () => Priority; + }); + fetch(req: SourceRequest, next: SourceFetch): Promise; +} +``` + +`Semaphore` is internal because users have no reason to construct one directly — `PerOriginSemaphore` is the public class. Keeping it internal also avoids the "which one do I use?" question. Promote later if someone wants a flat (single-pool) limiter. 
+ +`LimiterMiddleware` is also internal: the limiter is wired by `GeoTIFF.fromUrl` (where chunkd's `Source` / `SourceView` types are already in scope), so callers don't need to compose middleware themselves. + +> **Implementation note (temporary).** The `SourceMiddleware` shape above is the intended design, but it doesn't work yet: chunkd's `SourceView` doesn't forward the request `signal` to middleware, so a middleware can't observe an abort (only the underlying source receives the read options via `SourceView`'s terminal handler). Until that's fixed upstream ([chunkd#1697](https://github.com/blacha/chunkd/pull/1697)), the limiter is implemented as an internal **source wrapper** — `LimitedSource` — composed *beneath* `SourceChunk` / `SourceCache` (as the `SourceView`'s source) rather than as a middleware on top. Cache hits still short-circuit in `SourceCache` before reaching it. This is internal-only; the public API is unchanged either way. Revert to `LimiterMiddleware` once chunkd ships the fix — tracked in [#565](https://github.com/developmentseed/deck.gl-raster/issues/565). + +## Integration + +### `@developmentseed/geotiff` + +- `GeoTIFF.fromUrl(url, { …, concurrencyLimiter?, getPriority? })` — `concurrencyLimiter: ConcurrencyLimiter | null | undefined`, `getPriority: () => Priority`. When `concurrencyLimiter` is non-null, constructs `new LimiterMiddleware({ url, limiter, getPriority })` and slots it into both the header source's `SourceView` (after `SourceChunk` + `SourceCache`, so cache hits short-circuit before the limiter) and a tiny `SourceView` wrapping the raw data source. When `null` or `undefined`, no gating; `getPriority` is then a no-op. (`fromUrl` does *not* default to a shared limiter — that's a layer-level concern; see below.) +- `GeoTIFF.open({ … })` — unchanged. Users wanting gating with `open` wire the middleware themselves before calling. 
+ +Both header and data fetches gate so the limiter has full visibility into per-origin demand; cache hits in the header path short-circuit before reaching the limiter (the middleware order in the `SourceView` puts `SourceCache` ahead of `LimiterMiddleware`). + +### `@developmentseed/deck.gl-geotiff` + +A module-level default instance lives here (not in `@developmentseed/geotiff`, so consumers of `geotiff` that don't use layers don't get a stray module-load semaphore): + +```ts +// packages/deck.gl-geotiff/src/default-concurrency-limiter.ts (or top of cog-layer.ts) +import { PerOriginSemaphore } from "@developmentseed/geotiff"; + +/** Shared by every COGLayer / MultiCOGLayer that doesn't override its + * concurrencyLimiter prop, so multiple layers on the same origin share one + * HTTP/1.1 connection pool. */ +export const DEFAULT_CONCURRENCY_LIMITER = new PerOriginSemaphore({ maxRequests: 6 }); +``` + +`COGLayer`: + +```ts +class COGLayer extends RasterTileLayer { + static override defaultProps = { + ...RasterTileLayer.defaultProps, + concurrencyLimiter: DEFAULT_CONCURRENCY_LIMITER, + }; +} + +// props type: +type COGLayerProps = … & { + /** Caps concurrent HTTP requests to each origin this layer fetches from. + * Defaults to a module-level shared `PerOriginSemaphore({ maxRequests: 6 })` + * so two layers on the same bucket share one cap. Pass your own to override; + * pass `null` to disable gating. */ + concurrencyLimiter?: ConcurrencyLimiter | null; +}; +``` + +The layer threads its prop into `fetchGeoTIFF(url, { concurrencyLimiter })` → `GeoTIFF.fromUrl(url, { concurrencyLimiter })`. When `props.geotiff` is a pre-opened `GeoTIFF` instance, the prop is ignored (doc note: "you already wired the limiter at `fromUrl`/`open` time"). + +Same module-level default is reused by `MultiCOGLayer` and `MosaicLayer` (and any other layer that opens a `GeoTIFF`) so cross-layer-type sharing works out of the box. 
+ +#### `MosaicLayer` — viewport-aware priority + +`MosaicLayer`'s `getTileData` builds a `getPriority` closure per source: euclidean distance from `source.bbox` center to `this.context.viewport.{longitude, latitude}` in degree-space (just an ordering key — great-circle isn't needed). The closure reads `this.context.viewport` lazily, so each invocation sees the *current* viewport, not the one at queue time. + +The closure plus the layer's `concurrencyLimiter` (resolved as `defaultProps`-fill with explicit `null` preserved as opt-out) are passed through `getSource(source, opts)`. A consumer's `getSource` can spread `opts` straight into `GeoTIFF.fromUrl`, so a typical usage is one line: + +```ts +new MosaicLayer({ + sources, + getSource: async (source, opts) => + getCachedGeoTIFF(source.assets.image.href, opts), + renderSource: …, +}); +``` + +`COGLayer` / `MultiCOGLayer` don't (yet) thread a `getPriority` — they only have a single source per layer, so dynamic priority doesn't reorder anything across layer boundaries. If consumers want priority across many `COGLayer`s sharing one limiter, they can construct their `GeoTIFF` outside the layer and pass `getPriority` to `fromUrl` themselves. + +`RasterTileLayer.props.maxRequests` is unchanged — still passed through to deck.gl's `Tileset2D`. Independent cap from the source-level one; users typically leave it at deck.gl's default 6 so `_pruneRequests` keeps firing. + +## Cancellation flow + +1. User pans. deck.gl's `Tileset2D._pruneRequests` fires `tile.abort()` for unselected in-flight tiles (because `ongoing > maxRequests`). +2. The tile's `AbortController.signal` aborts. `getTileData(tile, { signal })` (already awaiting our chain) sees it. +3. The signal threads through `fetchTile(image, { x, y, signal })` → `dataSource.fetch(offset, length, { signal })`. +4. `LimiterMiddleware` passes the signal to `limiter.acquire(url, signal, getPriority)`: + - Already aborted on entry → reject immediately, no slot consumed. 
+ - Aborted while queued in the inner `Semaphore` → splice from the queue, reject, no slot consumed. + - Aborted in-flight (after acquiring the slot) → the underlying `fetch` itself aborts via its own signal handling; the `finally` releases the slot. + +## Priority flow + +1. Layer builds a `getPriority` closure that reads the current viewport when invoked. +2. Closure threads through `getSource(opts)` → `GeoTIFF.fromUrl({ getPriority })` → `LimiterMiddleware` → `limiter.acquire(url, signal, getPriority)` → `Semaphore.acquire(signal, getPriority)`. +3. On each release, `Semaphore` linear-scans pending waiters, calls each `getPriority()` (catching synchronous throws → treat as `+Infinity` so a broken closure can't deadlock the queue), picks the min, and resumes it. Panning between releases mutates the values the next scan sees — no explicit re-queue or external "reprioritize" call. +4. We don't use a priority queue / heap here because priorities are *not* stable under the structure: a heap built from one snapshot would have to be rebuilt each time the viewport moves. A linear scan is O(N) per release and trivially correct; queue depths in practice are bounded by the number of overlapping sources visible at once (single digits to low hundreds), so the constant factor is fine. + +## Testing + +- `Semaphore` (unit): FIFO ordering when no priority is given; `maxRequests` honored; `acquire(signal)` rejects on already-aborted; aborts while queued splice cleanly without consuming a slot; release is idempotent; explicit priorities serviced lowest-first; dynamic priority — a waiter's `getPriority` return value can change between releases and the next release picks the new min; ties break by insertion order; a `getPriority` that throws is treated as `+Infinity` (no deadlock). 
+- `PerOriginSemaphore` (unit): two different-origin `acquire`s don't compete; two same-origin acquires share one pool; per-origin Semaphores are minted lazily; `getPriority` is forwarded to the right per-origin Semaphore. +- `comparePriorities` (unit): numbers compared numerically; arrays compared lex; missing trailing elements treated as 0; mixed number / array comparisons. +- `LimiterMiddleware` (unit): forwards `offset`/`length`/`options` unmodified to `next`; releases on resolve and on throw; forwards `options.signal` and the constructor's `getPriority` to `limiter.acquire`. +- `GeoTIFF.fromUrl({ concurrencyLimiter, getPriority })` (integration, with a recording counting limiter wrapping a fixture file source): with `maxRequests: 1`, `peak in-flight` never exceeds 1; both header (cache miss) and data source `.fetch` are gated; cache hits in the header path don't burn a slot; `getPriority` is forwarded to every `acquire`. +- `COGLayer.defaultProps.concurrencyLimiter` (unit): `COGLayer`, `MultiCOGLayer`, and `MosaicLayer` instances without explicit prop end up with the same limiter instance. + +## Future work (for design context, not built here) + +- **Coalescing**: `TileBatcher` / `getMultiTileData` / `fetchTilesSettled` from a layer-side dispatcher. The tension with `_pruneRequests` (which only fires when `ongoing > maxRequests`) means the batcher either accepts small (per-wave) coalescing windows or requires a `Tileset2D` subclass that fires aborts on `onTileUnload`. Pluggable batching strategy — row vs box vs single — exposed via a structured `groupKey: (tile) => { z, y }` (or similar) on the batcher. +- **Upstream deck.gl proposals** (likely worth opening issues for): + - Make `Tileset2D.pruneRequests` a *public* method (currently `_pruneRequests`) so callers can trigger cancellation of unselected in-flight tiles imperatively — e.g. 
our source-level limiter could ask the tileset to drop stale tiles when its queue grows past a threshold, instead of relying on the implicit "ongoing > maxRequests" trigger. + - Fire tile abort signals on `onTileUnload` (cache eviction) independent of `maxRequests`, so cancellation works when `maxRequests = 0`. + - Expose `_requestScheduler` as `requestScheduler` (or an interface) so callers can inspect / replace it. + - Add a native `getTileDataBatched` prop (the original request behind this whole design). +- **Shared concurrency package**: if a non-geotiff source format (Zarr, etc.) ever wants the same `ConcurrencyLimiter` *instance* a `COGLayer` is using, the limiter primitives extract cleanly to a new package and both packages depend on it. diff --git a/examples/naip-mosaic/src/App.tsx b/examples/naip-mosaic/src/App.tsx index 5ab7bcc5..6f0dc050 100644 --- a/examples/naip-mosaic/src/App.tsx +++ b/examples/naip-mosaic/src/App.tsx @@ -11,7 +11,7 @@ import { LinearRescale, } from "@developmentseed/deck.gl-raster/gpu-modules"; import colormapsPngUrl from "@developmentseed/deck.gl-raster/gpu-modules/colormaps.png"; -import type { Overview } from "@developmentseed/geotiff"; +import type { GeoTIFFFromUrlOptions, Overview } from "@developmentseed/geotiff"; import { GeoTIFF } from "@developmentseed/geotiff"; import type { Device, Texture } from "@luma.gl/core"; import type { ShaderModule } from "@luma.gl/shadertools"; @@ -85,10 +85,13 @@ type TextureDataT = { */ const geotiffCache = new Map>(); -function getCachedGeoTIFF(url: string, signal?: AbortSignal): Promise { +function getCachedGeoTIFF( + url: string, + opts: GeoTIFFFromUrlOptions, +): Promise { let promise = geotiffCache.get(url); if (!promise) { - promise = GeoTIFF.fromUrl(url, { signal }).catch((err) => { + promise = GeoTIFF.fromUrl(url, opts).catch((err) => { geotiffCache.delete(url); throw err; }); @@ -351,7 +354,6 @@ export default function App() { async function wrappedFetchSTACItems() { try { const data = 
STAC_DATA as unknown as STACFeatureCollection; - (window as any).data = data; setStacItems(data.features); } catch (err) { console.error("Error fetching STAC items:", err); @@ -410,8 +412,8 @@ export default function App() { // the MosaicLayer's TileLayer cache so we can keep cheap header metadata // around indefinitely without pinning every parent tile (and its inner // COGLayer's in-flight tile requests) in memory. - getSource: async (source, { signal }) => - getCachedGeoTIFF(source.assets.image.href, signal), + getSource: async (source, opts) => + getCachedGeoTIFF(source.assets.image.href, opts), renderSource: (source, { data, signal }) => { const url = source.assets.image.href; return new COGLayer({ @@ -434,8 +436,9 @@ export default function App() { signal, }); }, - // Smaller cache for MosaicLayer cache, since it caches full COGLayer - // instances + // Disable the MosaicLayer tile cache: each cached tile is a full + // COGLayer instance, and opened GeoTIFFs are already kept in the + // module-level `geotiffCache`, so there's nothing cheap to retain here. maxCacheSize: 0, // @ts-expect-error beforeId is injected by @deck.gl/mapbox; LayerProps // doesn't know about it. 
diff --git a/packages/deck.gl-geotiff/src/cog-layer.ts b/packages/deck.gl-geotiff/src/cog-layer.ts index e6d3822c..bf68f090 100644 --- a/packages/deck.gl-geotiff/src/cog-layer.ts +++ b/packages/deck.gl-geotiff/src/cog-layer.ts @@ -1,4 +1,4 @@ -import type { UpdateParameters } from "@deck.gl/core"; +import type { LayerContext, UpdateParameters } from "@deck.gl/core"; import type { MinimalTileData, GetTileDataOptions as RasterTileGetTileDataOptions, @@ -7,7 +7,12 @@ import type { RenderTileResult, } from "@developmentseed/deck.gl-raster"; import { RasterTileLayer } from "@developmentseed/deck.gl-raster"; -import type { DecoderPool, GeoTIFF, Overview } from "@developmentseed/geotiff"; +import type { + ConcurrencyLimiter, + DecoderPool, + GeoTIFF, + Overview, +} from "@developmentseed/geotiff"; import { defaultDecoderPool } from "@developmentseed/geotiff"; import type { EpsgResolver, ProjectionDefinition } from "@developmentseed/proj"; import { @@ -18,6 +23,7 @@ import { } from "@developmentseed/proj"; import type { Texture } from "@luma.gl/core"; import proj4 from "proj4"; +import { DEFAULT_CONCURRENCY_LIMITER } from "./default-concurrency-limiter.js"; import { fetchGeoTIFF, getGeographicBounds } from "./geotiff/geotiff.js"; import type { TextureDataT } from "./geotiff/render-pipeline.js"; import { inferRenderPipeline } from "./geotiff/render-pipeline.js"; @@ -133,6 +139,19 @@ export type COGLayerProps = Omit< * automatically aborted. */ signal?: AbortSignal; + + /** + * Caps concurrent HTTP requests for this layer's source fetches. + * + * Defaults to a maximum of 6 concurrent requests per origin, which aligns + * with browser limits of 6 HTTP/1.1 requests per origin. If your sources + * support HTTP/2 or HTTP/3, you may want to increase this limit or disable + * it entirely by passing `null`. + * + * Ignored when `geotiff` is a pre-opened `GeoTIFF` instance — wire the + * limiter via {@link GeoTIFF.fromUrl} at construction time instead. 
+ */ + concurrencyLimiter?: ConcurrencyLimiter | null; }; /** @@ -150,6 +169,7 @@ export class COGLayer< static override defaultProps = { ...RasterTileLayer.defaultProps, epsgResolver, + concurrencyLimiter: DEFAULT_CONCURRENCY_LIMITER, } as typeof RasterTileLayer.defaultProps; declare state: { @@ -157,10 +177,19 @@ export class COGLayer< tilesetDescriptor?: RasterTilesetDescriptor; defaultGetTileData?: COGLayerProps["getTileData"]; defaultRenderTile?: COGLayerProps["renderTile"]; + /** Aborts the in-flight header read when the layer is removed. One + * controller per layer lifetime: aborted only in `finalizeState`, not when + * the `geotiff` prop changes. */ + abortController?: AbortController; }; override initializeState(): void { - this.setState({}); + this.setState({ abortController: new AbortController() }); + } + + override finalizeState(context: LayerContext): void { + this.state.abortController?.abort(); + super.finalizeState(context); } override updateState(params: UpdateParameters) { @@ -189,13 +218,31 @@ export class COGLayer< } async _parseGeoTIFF(): Promise { - const geotiff = await fetchGeoTIFF(this.props.geotiff); + const signal = this.state.abortController?.signal; + + let geotiff: GeoTIFF; + try { + geotiff = await fetchGeoTIFF(this.props.geotiff, { + concurrencyLimiter: this.props.concurrencyLimiter, + signal, + }); + } catch (err) { + // Layer removed mid-open (finalizeState aborted the signal); drop it. + if (signal?.aborted) { + return; + } + throw err; + } const crs = geotiff.crs; const sourceProjection = typeof crs === "number" ? 
await this.props.epsgResolver!(crs) : parseWkt(crs); + if (signal?.aborted) { + return; + } + // @ts-expect-error - proj4 typings are incomplete and don't support // wkt-parser input const converter4326 = proj4(sourceProjection, "EPSG:4326"); diff --git a/packages/deck.gl-geotiff/src/default-concurrency-limiter.ts b/packages/deck.gl-geotiff/src/default-concurrency-limiter.ts new file mode 100644 index 00000000..52755645 --- /dev/null +++ b/packages/deck.gl-geotiff/src/default-concurrency-limiter.ts @@ -0,0 +1,12 @@ +import { PerOriginSemaphore } from "@developmentseed/geotiff"; + +/** + * Shared default concurrency limiter for every COGLayer / MultiCOGLayer / + * MosaicLayer that doesn't override its `concurrencyLimiter` prop. A single + * module-level `PerOriginSemaphore({ maxRequests: 6 })` so two layers fetching + * from the same origin (e.g. the same S3 bucket) share *one* HTTP/1.1 + * connection pool. The cap matches Chrome's default per-origin HTTP/1.1 limit. + */ +export const DEFAULT_CONCURRENCY_LIMITER = new PerOriginSemaphore({ + maxRequests: 6, +}); diff --git a/packages/deck.gl-geotiff/src/geotiff/geotiff.ts b/packages/deck.gl-geotiff/src/geotiff/geotiff.ts index 6a3cbe15..1f128373 100644 --- a/packages/deck.gl-geotiff/src/geotiff/geotiff.ts +++ b/packages/deck.gl-geotiff/src/geotiff/geotiff.ts @@ -1,6 +1,10 @@ // Utilities for interacting with a GeoTIFF -import type { RasterArray } from "@developmentseed/geotiff"; +import type { + ConcurrencyLimiter, + Priority, + RasterArray, +} from "@developmentseed/geotiff"; import { GeoTIFF } from "@developmentseed/geotiff"; import type { Converter } from "proj4"; @@ -54,9 +58,14 @@ export function addAlphaChannel(rgbImage: RasterArray): RasterArray { export async function fetchGeoTIFF( input: GeoTIFF | string | URL | ArrayBuffer, + options: { + concurrencyLimiter?: ConcurrencyLimiter | null; + getPriority?: () => Priority; + signal?: AbortSignal; + } = {}, ): Promise { if (typeof input === "string" || input instanceof URL) { - 
return await GeoTIFF.fromUrl(input); + return await GeoTIFF.fromUrl(input, options); } if (input instanceof ArrayBuffer) { diff --git a/packages/deck.gl-geotiff/src/index.ts b/packages/deck.gl-geotiff/src/index.ts index 1955ad55..9a6a59f1 100644 --- a/packages/deck.gl-geotiff/src/index.ts +++ b/packages/deck.gl-geotiff/src/index.ts @@ -3,6 +3,7 @@ export type { GetTileDataOptions, } from "./cog-layer.js"; export { COGLayer } from "./cog-layer.js"; +export { DEFAULT_CONCURRENCY_LIMITER } from "./default-concurrency-limiter.js"; export { addAlphaChannel } from "./geotiff/geotiff.js"; export * as texture from "./geotiff/texture.js"; export type { MosaicLayerProps } from "./mosaic-layer/mosaic-layer.js"; diff --git a/packages/deck.gl-geotiff/src/mosaic-layer/mosaic-layer.ts b/packages/deck.gl-geotiff/src/mosaic-layer/mosaic-layer.ts index 0d9b76e0..79084286 100644 --- a/packages/deck.gl-geotiff/src/mosaic-layer/mosaic-layer.ts +++ b/packages/deck.gl-geotiff/src/mosaic-layer/mosaic-layer.ts @@ -4,11 +4,18 @@ import type { LayerContext, LayersList, UpdateParameters, + Viewport, +} from "@deck.gl/core"; +import { + _GlobeViewport, + CompositeLayer, + WebMercatorViewport, } from "@deck.gl/core"; -import { CompositeLayer } from "@deck.gl/core"; import type { TileLayerProps } from "@deck.gl/geo-layers"; import { TileLayer } from "@deck.gl/geo-layers"; +import type { ConcurrencyLimiter, Priority } from "@developmentseed/geotiff"; import Flatbush from "flatbush"; +import { DEFAULT_CONCURRENCY_LIMITER } from "../default-concurrency-limiter.js"; import type { MosaicSource } from "./mosaic-tileset-2d.js"; import { MosaicTileset2D } from "./mosaic-tileset-2d.js"; @@ -18,7 +25,8 @@ export type MosaicLayerProps< > = CompositeLayerProps & Pick< TileLayerProps, - | "debounceTime" + // NOTE: `debounceTime` is intentionally not exposed. 
+ // See https://github.com/developmentseed/deck.gl-raster/issues/562 | "extent" | "maxCacheByteSize" | "maxCacheSize" @@ -47,10 +55,36 @@ export type MosaicLayerProps< */ sources: MosaicT[]; + /** + * Caps concurrent HTTP requests for this layer's source fetches. + * + * Defaults to a maximum of 6 concurrent requests per origin, which aligns + * with browser limits of 6 HTTP/1.1 requests per origin. If your sources + * support HTTP/2 or HTTP/3, you may want to increase this limit or disable + * it entirely by passing `null`. + */ + concurrencyLimiter?: ConcurrencyLimiter | null; + /** Fetch data for this source. */ getSource?: ( source: MosaicT, - opts: { signal?: AbortSignal }, + opts: { + signal?: AbortSignal; + /** + * The layer's current `concurrencyLimiter` prop. Forward to + * {@link GeoTIFF.fromUrl}'s `concurrencyLimiter` option so this + * source's fetches join the shared per-origin queue. + */ + concurrencyLimiter?: ConcurrencyLimiter | null; + /** + * Callback that provides dynamic priority for fetches related to this + * source. + * + * This is designed to re-sort the limiter's queue on viewport pan, + * preferring sources closer to the viewport center. + */ + getPriority?: () => Priority; + }, ) => Promise; /** Render a source */ @@ -89,9 +123,48 @@ export type MosaicLayerProps< }; const defaultProps: Partial = { + concurrencyLimiter: DEFAULT_CONCURRENCY_LIMITER, sources: [], }; +/** + * Build the limiter `getPriority` callback for one mosaic source: euclidean + * distance from the source's bbox center to the current viewport center, in + * lon/lat degree-space (just an ordering key — great-circle isn't needed). + * + * `getViewport` is read on every call, so the limiter re-sorts its queue as + * the viewport pans, pulling newly-central sources ahead of edge sources. 
+ * + * Returns `undefined` for non-geographic viewports — where the source bbox and + * viewport center don't share a coordinate space — so the limiter falls back + * to FIFO instead of comparing mismatched units. The viewport type is checked + * once here; it isn't expected to change under the layer. + */ +function createGetPriorityCallback( + bbox: readonly [number, number, number, number], + getViewport: () => Viewport, +): (() => number) | undefined { + const viewport = getViewport(); + if ( + !(viewport instanceof WebMercatorViewport) && + !(viewport instanceof _GlobeViewport) + ) { + return undefined; + } + + const [minX, minY, maxX, maxY] = bbox; + const sourceCx = (minX + maxX) / 2; + const sourceCy = (minY + maxY) / 2; + + return (): number => { + // Geographic viewport (checked above); both types expose lon/lat. + const v = getViewport() as WebMercatorViewport | _GlobeViewport; + const dx = sourceCx - v.longitude; + const dy = sourceCy - v.latitude; + return Math.hypot(dx, dy); + }; +} + /** * A deck.gl layer for rendering a mosaic of raster sources. * @@ -146,13 +219,13 @@ export class MosaicLayer< ): TileLayer { const { id, - minZoom, - maxZoom, - debounceTime, + concurrencyLimiter, extent, maxCacheByteSize, maxCacheSize, maxRequests, + maxZoom, + minZoom, onSourceLoad, onSourceError, onSourceUnload, @@ -177,22 +250,31 @@ export class MosaicLayer< }>({ id: `mosaic-layer-${id}`, TilesetClass: MosaicTileset2DFactory, - minZoom, - maxZoom, - debounceTime, - extent, - ...(maxCacheByteSize !== undefined && { maxCacheByteSize }), - maxCacheSize, - maxRequests, + ...omitUndefined({ + minZoom, + maxZoom, + extent, + maxCacheByteSize, + maxCacheSize, + maxRequests, + }), getTileData: async (data) => { // We hard-cast this because TilesetClass is not generic. // MosaicTileset2D returns MosaicT in `index`, but TileLayer's typing // exposes only the plain `TileIndex` here. 
const index = data.index as unknown as MosaicT; const { signal } = data; + const getPriority = createGetPriorityCallback( + index.bbox, + () => this.context.viewport, + ); const userData = this.props.getSource && - (await this.props.getSource(index, { signal })); + (await this.props.getSource(index, { + signal, + concurrencyLimiter, + getPriority, + })); return { source: index, @@ -255,3 +337,18 @@ export class MosaicLayer< return this.renderTileLayer(renderSource); } } + +/** + * Drop keys whose value is `undefined`. + * + * Passing down an explicit `undefined` will override any default prop values. + */ +function omitUndefined(obj: T): Partial { + const result: Partial = {}; + for (const key in obj) { + if (obj[key] !== undefined) { + result[key] = obj[key]; + } + } + return result; +} diff --git a/packages/deck.gl-geotiff/src/multi-cog-layer.ts b/packages/deck.gl-geotiff/src/multi-cog-layer.ts index 7ff170e2..8472054e 100644 --- a/packages/deck.gl-geotiff/src/multi-cog-layer.ts +++ b/packages/deck.gl-geotiff/src/multi-cog-layer.ts @@ -1,6 +1,7 @@ import type { CompositeLayerProps, Layer, + LayerContext, UpdateParameters, } from "@deck.gl/core"; import type { @@ -32,13 +33,14 @@ import { CompositeBands, } from "@developmentseed/deck.gl-raster/gpu-modules"; import type { + ConcurrencyLimiter, DecoderPool, GeoTIFF, Overview, RasterArray, } from "@developmentseed/geotiff"; import { assembleTiles, defaultDecoderPool } from "@developmentseed/geotiff"; -import type { EpsgResolver } from "@developmentseed/proj"; +import type { EpsgResolver, ProjectionDefinition } from "@developmentseed/proj"; import { epsgResolver as defaultEpsgResolver, makeClampedForwardTo3857, @@ -47,6 +49,7 @@ import { } from "@developmentseed/proj"; import type { Device, Texture, TextureFormat } from "@luma.gl/core"; import proj4 from "proj4"; +import { DEFAULT_CONCURRENCY_LIMITER } from "./default-concurrency-limiter.js"; import { fetchGeoTIFF, getGeographicBounds } from "./geotiff/geotiff.js"; 
import { geoTiffToDescriptor } from "./geotiff-tileset.js"; @@ -257,14 +260,68 @@ export type MultiCOGLayerProps = CompositeLayerProps & * @default 1 */ debugLevel?: 1 | 2 | 3; + + /** + * Caps concurrent HTTP requests for this layer's source fetches. + * + * Defaults to a maximum of 6 concurrent requests per origin, which aligns + * with browser limits of 6 HTTP/1.1 requests per origin. If your sources + * support HTTP/2 or HTTP/3, you may want to increase this limit or disable + * it entirely by passing `null`. + */ + concurrencyLimiter?: ConcurrencyLimiter | null; }; const defaultProps = { ...RasterTileLayer.defaultProps, epsgResolver: { type: "accessor" as const, value: defaultEpsgResolver }, debugLevel: { type: "number" as const, value: 1 }, + concurrencyLimiter: DEFAULT_CONCURRENCY_LIMITER, +}; + +/** A source's opened GeoTIFF paired with its resolved projection. */ +type OpenedCogSource = { + name: string; + geotiff: GeoTIFF; + sourceProjection: ProjectionDefinition; }; +/** + * Open every configured source's GeoTIFF in parallel and resolve each one's + * projection. Returns `null` when `signal` aborts mid-open (the layer was + * removed), so the caller can bail without applying stale state. + */ +async function openCogSources( + entries: [string, MultiCOGSourceConfig][], + options: { + concurrencyLimiter?: ConcurrencyLimiter | null; + epsgResolver: EpsgResolver; + signal?: AbortSignal; + }, +): Promise { + const { concurrencyLimiter, epsgResolver, signal } = options; + try { + return await Promise.all( + entries.map(async ([name, config]) => { + const geotiff = await fetchGeoTIFF(config.url, { + concurrencyLimiter, + signal, + }); + const crs = geotiff.crs; + const sourceProjection = + typeof crs === "number" ? await epsgResolver(crs) : parseWkt(crs); + return { name, geotiff, sourceProjection }; + }), + ); + } catch (err) { + // Layer removed mid-open (finalizeState aborted the signal); bail. 
+ if (signal?.aborted) { + return null; + } + throw err; + } +} + /** * A deck.gl {@link CompositeLayer} that opens multiple Cloud-Optimized GeoTIFFs * (COGs) in parallel, builds a {@link RasterTilesetDescriptor} for each, and groups @@ -292,15 +349,26 @@ export class MultiCOGLayer extends RasterTileLayer< declare state: { sources: Map | null; multiDescriptor: MultiRasterTilesetDescriptor | null; + /** Aborts the in-flight header reads when the layer is removed, freeing + * their limiter slots for fresh work. */ + abortController?: AbortController; }; override initializeState(): void { this.setState({ sources: null, multiDescriptor: null, + // One controller for the layer's lifetime; aborted in finalizeState so + // header reads still in flight when the layer is removed are cancelled. + abortController: new AbortController(), }); } + override finalizeState(context: LayerContext): void { + this.state.abortController?.abort(); + super.finalizeState(context); + } + override updateState({ changeFlags, props, @@ -331,18 +399,20 @@ export class MultiCOGLayer extends RasterTileLayer< const { sources } = this.props; const entries = Object.entries(sources); - // Open all COGs in parallel - const cogSources = await Promise.all( - entries.map(async ([name, config]) => { - const geotiff = await fetchGeoTIFF(config.url); - const crs = geotiff.crs; - const sourceProjection = - typeof crs === "number" - ? 
await this.props.epsgResolver!(crs) - : parseWkt(crs); - return { name, geotiff, sourceProjection }; - }), - ); + if (entries.length === 0) { + return; + } + + const signal = this.state.abortController?.signal; + + const cogSources = await openCogSources(entries, { + concurrencyLimiter: this.props.concurrencyLimiter, + epsgResolver: this.props.epsgResolver!, + signal, + }); + if (cogSources === null) { + return; + } // Use the first source's projection for shared projection functions // (all sources must share the same CRS) @@ -396,6 +466,12 @@ export class MultiCOGLayer extends RasterTileLayer< const multiDescriptor = createMultiRasterTilesetDescriptor(tilesetMap); + // Layer was removed while we resolved projections; don't setState on a + // finalized layer. + if (signal?.aborted) { + return; + } + this.setState({ sources: sourceMap, multiDescriptor, diff --git a/packages/deck.gl-geotiff/tests/concurrency-limiter.test.ts b/packages/deck.gl-geotiff/tests/concurrency-limiter.test.ts new file mode 100644 index 00000000..b4660732 --- /dev/null +++ b/packages/deck.gl-geotiff/tests/concurrency-limiter.test.ts @@ -0,0 +1,38 @@ +import { describe, expect, it } from "vitest"; +import { COGLayer } from "../src/cog-layer.js"; +import { DEFAULT_CONCURRENCY_LIMITER } from "../src/default-concurrency-limiter.js"; +import { MosaicLayer } from "../src/mosaic-layer/mosaic-layer.js"; +import { MultiCOGLayer } from "../src/multi-cog-layer.js"; + +describe("COGLayer default concurrencyLimiter", () => { + it("defaultProps.concurrencyLimiter is the shared module-level instance", () => { + // @ts-expect-error — defaultProps is cast to the base type at the + // declaration site, so the field isn't visible on its static type. The + // *value* is still the one we want. 
+ expect(COGLayer.defaultProps.concurrencyLimiter).toBe( + DEFAULT_CONCURRENCY_LIMITER, + ); + }); +}); + +describe("MultiCOGLayer default concurrencyLimiter", () => { + it("defaultProps.concurrencyLimiter is the same shared instance as COGLayer's", () => { + // @ts-expect-error — see COGLayer test above + expect(MultiCOGLayer.defaultProps.concurrencyLimiter).toBe( + DEFAULT_CONCURRENCY_LIMITER, + ); + // @ts-expect-error + expect(MultiCOGLayer.defaultProps.concurrencyLimiter).toBe( + // @ts-expect-error + COGLayer.defaultProps.concurrencyLimiter, + ); + }); +}); + +describe("MosaicLayer default concurrencyLimiter", () => { + it("defaultProps.concurrencyLimiter is the same shared instance", () => { + expect(MosaicLayer.defaultProps.concurrencyLimiter).toBe( + DEFAULT_CONCURRENCY_LIMITER, + ); + }); +}); diff --git a/packages/geotiff/src/fetch.ts b/packages/geotiff/src/fetch.ts index 88760897..299abc2d 100644 --- a/packages/geotiff/src/fetch.ts +++ b/packages/geotiff/src/fetch.ts @@ -326,6 +326,7 @@ async function findBandSeparateTileByteRanges( self: HasTiffReference, x: number, y: number, + options?: { signal?: AbortSignal }, ): Promise { // TODO: error here if user-provided band-indexes are out of bounds const { x: tilesPerRow, y: tilesPerColumn } = self.image.tileCount; @@ -333,7 +334,7 @@ async function findBandSeparateTileByteRanges( const numBands = self.cachedTags.samplesPerPixel; const tileSizes = [...Array(numBands).keys()].map((band) => { const bandIdx = band * tilesPerBand + y * tilesPerRow + x; - return self.image.getTileSize(bandIdx); + return self.image.getTileSize(bandIdx, options); }); return Promise.all(tileSizes); } @@ -342,16 +343,15 @@ async function fetchBandSeparateTileBytes( self: HasTiffReference, x: number, y: number, - { - signal, - }: { + options: { signal?: AbortSignal; } = {}, ): Promise { + const { signal } = options; const debug: DebugTag | undefined = self._debug ? 
{ label: "data" } : undefined; - const byteRanges = await findBandSeparateTileByteRanges(self, x, y); + const byteRanges = await findBandSeparateTileByteRanges(self, x, y, options); const buffers = byteRanges.map(async ({ offset, imageSize }) => { const tile = await getBytes( self.image, @@ -418,18 +418,17 @@ async function fetchCogBytesMultiple( async function fetchBandSeparateTileBytesMultiple( self: HasTiffReference, xy: Array<[number, number]>, - { - signal, - }: { + options: { signal?: AbortSignal; } = {}, ): Promise { + const { signal } = options; const debug: DebugTag | undefined = self._debug ? { label: "data" } : undefined; const numBands = self.cachedTags.samplesPerPixel; const perTileRanges = await Promise.all( - xy.map(([x, y]) => findBandSeparateTileByteRanges(self, x, y)), + xy.map(([x, y]) => findBandSeparateTileByteRanges(self, x, y, options)), ); const flatRanges = perTileRanges.flatMap((ranges) => ranges.map(({ offset, imageSize }) => ({ @@ -505,8 +504,11 @@ async function getTile( // image.getTileSize() reads TileOffsets[idx] and TileByteCounts[idx] from // the header source (cogeotiff's lazy per-entry path, served by the chunk // cache). It does NOT read tile data — only the 4–8 byte offset/count - // entries. - const { offset, imageSize } = await image.getTileSize(idx); + // entries. Thread the signal so a cache-miss read aborts (and releases its + // limiter slot) alongside the data fetch. + const { offset, imageSize } = await image.getTileSize(idx, { + signal: options?.signal, + }); // The actual tile bytes go through dataSource (uncached HTTP). 
return getBytes(image, offset, imageSize, dataSource, options); @@ -687,7 +689,9 @@ export async function getTiles( return idx; }); - const sizes = await Promise.all(indices.map((i) => image.getTileSize(i))); + const sizes = await Promise.all( + indices.map((i) => image.getTileSize(i, { signal: options?.signal })), + ); return getMultipleBytes( image, sizes.map((s) => ({ offset: s.offset, byteCount: s.imageSize })), diff --git a/packages/geotiff/src/geotiff.ts b/packages/geotiff/src/geotiff.ts index 3c427c0f..c209e5e3 100644 --- a/packages/geotiff/src/geotiff.ts +++ b/packages/geotiff/src/geotiff.ts @@ -12,11 +12,42 @@ import type { BandStatistics, GDALMetadata } from "./gdal-metadata.js"; import { parseGDALMetadata } from "./gdal-metadata.js"; import type { CachedTags, GeoKeyDirectory } from "./ifd.js"; import { extractGeoKeyDirectory, prefetchTags } from "./ifd.js"; +import type { ConcurrencyLimiter, Priority } from "./limiter.js"; +import { LimitedSource } from "./limiter.js"; import { Overview } from "./overview.js"; import type { DecoderPool } from "./pool/pool.js"; import type { Tile } from "./tile.js"; import { createTransform, index, xy } from "./transform.js"; +/** Options for {@link GeoTIFF.fromUrl}. */ +export interface GeoTIFFFromUrlOptions { + /** Bytes per chunk for the header cache. Defaults to 64 KiB (matches + * geotiff.js's BlockedSource). */ + chunkSize?: number; + /** Total cache size in bytes. Defaults to 8 MiB (~128 blocks at the default + * chunk size). */ + cacheSize?: number; + /** An optional {@link AbortSignal} to cancel the header reads. */ + signal?: AbortSignal; + /** When true, the returned GeoTIFF logs each tile/mask data fetch to the + * console with offset/length and a `data`/`mask` label. Off by default. */ + debug?: boolean; + /** Caps concurrent HTTP requests for both the header/metadata and tile-data + * paths. 
Header reads go through the cached `SourceView`, so cache hits + * short-circuit before the limiter and never consume a slot — only network + * reads gate. Pass `null` to explicitly disable; omit (or pass `undefined`) + * for the same effect — `GeoTIFF.fromUrl` does *not* default to a shared + * limiter on its own. The deck.gl-geotiff layers default to a shared + * {@link PerOriginSemaphore} via their `defaultProps`. */ + concurrencyLimiter?: ConcurrencyLimiter | null; + /** Optional dynamic priority for every fetch through this GeoTIFF's sources. + * Re-invoked by the limiter on each slot-open, so closures over dynamic + * state (e.g. layer viewport center, tile bbox) re-sort the queue when that + * state changes. Lower = serviced sooner. Only meaningful when + * `concurrencyLimiter` is set. */ + getPriority?: () => Priority; +} + /** * A high-level GeoTIFF abstraction built on * {@link https://github.com/blacha/cogeotiff | @cogeotiff/core}'s `Tiff` and @@ -264,11 +295,7 @@ export class GeoTIFF { * bypass the cache and go straight to the raw HTTP source. * * @param url The URL of the GeoTIFF to open. - * @param options Optional parameters. - * @param options.chunkSize Bytes per chunk for the header cache. Defaults to 64 KiB (matches geotiff.js's BlockedSource). - * @param options.cacheSize Total cache size in bytes. Defaults to 8 MiB (~128 blocks at the default chunk size). - * @param options.signal An optional {@link AbortSignal} to cancel the header reads. - * @param options.debug When true, the returned GeoTIFF logs each tile/mask data fetch to the console with offset/length and a `data`/`mask` label. Off by default. + * @param options Optional parameters; see {@link GeoTIFFFromUrlOptions}. * @returns A Promise that resolves to a GeoTIFF instance. 
*/ static async fromUrl( @@ -278,12 +305,9 @@ export class GeoTIFF { cacheSize = 8 * 1024 * 1024, signal, debug, - }: { - chunkSize?: number; - cacheSize?: number; - signal?: AbortSignal; - debug?: boolean; - } = {}, + concurrencyLimiter, + getPriority, + }: GeoTIFFFromUrlOptions = {}, ): Promise { const source = new SourceHttp(url, {}); @@ -303,14 +327,30 @@ export class GeoTIFF { // unbounded length. Remove once the upstream fix lands. source.metadata = { size: Number.POSITIVE_INFINITY }; - const view = new SourceView(source, [ + // When a limiter is supplied, gate every network read through it by + // wrapping the raw source. The header `SourceView` composes SourceChunk + + // SourceCache *on top* of this wrapped source, so a cache hit + // short-circuits in SourceCache and never reaches — never burns a slot on + // — the limiter; only reads that escape the cache (and every data read, + // which bypasses the cache) are gated. The same wrapped source backs both + // the header view and the data source, so both share one per-origin pool. + // + // Gating here as a source wrapper rather than a chunkd SourceMiddleware is + // a workaround for chunkd not forwarding the abort signal to middleware; + // see LimitedSource. Once that's fixed upstream this can become a + // middleware again. Tracked in + // https://github.com/developmentseed/deck.gl-raster/issues/565 + const limitedSource = concurrencyLimiter + ? new LimitedSource(source, { limiter: concurrencyLimiter, getPriority }) + : source; + + const view = new SourceView(limitedSource, [ new SourceChunk({ size: chunkSize }), new SourceCache({ size: cacheSize }), ]); return await GeoTIFF.open({ - // Tile data reads bypass the header cache (raw source). 
- dataSource: source, + dataSource: limitedSource, headerSource: view, signal, debug, diff --git a/packages/geotiff/src/index.ts b/packages/geotiff/src/index.ts index 30fc2ab9..8f2b7a1f 100644 --- a/packages/geotiff/src/index.ts +++ b/packages/geotiff/src/index.ts @@ -16,8 +16,11 @@ export type { DecoderMetadata, } from "./decode.js"; export { DECODER_REGISTRY } from "./decode.js"; +export type { GeoTIFFFromUrlOptions } from "./geotiff.js"; export { GeoTIFF } from "./geotiff.js"; export type { CachedTags, GeoKeyDirectory } from "./ifd.js"; +export type { ConcurrencyLimiter, Priority } from "./limiter.js"; +export { PerOriginSemaphore } from "./limiter.js"; export { Overview } from "./overview.js"; export type { DecoderPoolOptions } from "./pool/pool.js"; export { DecoderPool, defaultDecoderPool } from "./pool/pool.js"; diff --git a/packages/geotiff/src/limiter.ts b/packages/geotiff/src/limiter.ts new file mode 100644 index 00000000..06083473 --- /dev/null +++ b/packages/geotiff/src/limiter.ts @@ -0,0 +1,314 @@ +import type { Source, SourceMetadata } from "@chunkd/source"; + +/** + * Numeric priority used to order waiters in a {@link Semaphore}'s queue. Lower + * = serviced sooner. A single `number` is equivalent to a 1-tuple; arrays are + * compared lexicographically (element-wise), with missing trailing elements + * treated as 0. Returning `0` (or omitting `getPriority` entirely) makes the + * waiter effectively un-prioritized — FIFO arrival order wins among ties. + */ +export type Priority = number | readonly number[]; + +/** + * Normalize priority value: `undefined` or `NaN` becomes 0, so it sorts as un-prioritized. + * + * Coercing `NaN` matters because `NaN < x` and `NaN > x` both resolve to false, + * so a `getPriority` that returns `NaN` (e.g. a distance from a degenerate + * viewport) would otherwise compare as a tie with everything and silently + * mis-sort, rather than falling back to FIFO. 
+ */ +function normalizePriority(value: number | undefined): number { + return value === undefined || Number.isNaN(value) ? 0 : value; +} + +/** + * Compare two priorities. Returns negative if `a` should be serviced before + * `b`, positive if `b` should go first, 0 on tie (queue then breaks the tie + * by FIFO arrival order). Both shapes are normalised to arrays for compare. + */ +function comparePriorities(a: Priority, b: Priority): number { + const arrA = typeof a === "number" ? [a] : a; + const arrB = typeof b === "number" ? [b] : b; + const len = Math.max(arrA.length, arrB.length); + for (let i = 0; i < len; i++) { + const ai = normalizePriority(arrA[i]); + const bi = normalizePriority(arrB[i]); + if (ai < bi) { + return -1; + } + if (ai > bi) { + return 1; + } + } + return 0; +} + +/** A pending acquire parked in {@link Semaphore.queue}, waiting for a slot. */ +interface Waiter { + /** Settles the caller's `acquire(...)` promise with a release function. */ + resolve(release: () => void): void; + /** Settles the caller's `acquire(...)` promise as rejected (e.g. on abort). */ + reject(reason: unknown): void; + /** Optional caller-supplied signal. If it aborts while we're queued, the + * waiter is spliced out and {@link Waiter.reject reject}ed. */ + signal?: AbortSignal; + /** The listener installed on `signal` so we can later + * `removeEventListener("abort", onAbort)` when the slot is granted. */ + onAbort?: () => void; + /** Dynamic priority callback. Re-invoked by `_releaseOne` on every slot- + * open so the queue can re-sort if priorities have changed (e.g. viewport + * panned, distance-from-center changed). Omitted = priority 0. */ + getPriority?: () => Priority; +} + +/** + * Counting semaphore with abort-aware acquire and dynamic priority. Internal + * primitive used by {@link PerOriginSemaphore} and {@link LimitedSource}. + * + * Hands out up to `maxRequests` concurrent slots. Further `acquire()`s queue. 
+ * On every slot-open, the queue is searched for the lowest-priority waiter + * (re-evaluating `getPriority` on each — so panning the viewport re-sorts the + * queue if callers' priorities depend on viewport state). Ties break by FIFO + * arrival order. A `Semaphore` with no priorities is therefore equivalent to + * a plain FIFO queue. + * + * Acquires with an `AbortSignal` reject (and never consume a slot) if the + * signal aborts before the slot is granted — either because it's already + * aborted at call time, or because it aborts while queued. + * + * We use a single linear-scan find-min instead of a priority queue (heap) + * because priorities are *dynamic* — we have to re-evaluate every waiter's + * `getPriority` on each release anyway, which costs O(N). Linear scan + find- + * min in the same pass also costs O(N), with a smaller constant and simpler + * code; a heap would only win if we extracted multiple minima per release, + * which we don't (one slot opens at a time). + */ +export class Semaphore { + private active = 0; + private readonly maxRequests: number; + private readonly queue: Waiter[] = []; + + constructor(options: { maxRequests: number }) { + this.maxRequests = options.maxRequests; + } + + acquire( + signal?: AbortSignal, + getPriority?: () => Priority, + ): Promise<() => void> { + if (signal?.aborted) { + return Promise.reject(signal.reason); + } + if (this.active < this.maxRequests) { + this.active += 1; + return Promise.resolve(this._makeRelease()); + } + return new Promise<() => void>((resolve, reject) => { + const waiter: Waiter = { resolve, reject, signal, getPriority }; + if (signal) { + const onAbort = () => { + const idx = this.queue.indexOf(waiter); + if (idx >= 0) { + this.queue.splice(idx, 1); + reject(signal.reason); + } + }; + waiter.onAbort = onAbort; + signal.addEventListener("abort", onAbort, { once: true }); + } + this.queue.push(waiter); + }); + } + + /** Build a single-use release function for a freshly-granted slot. 
+ * Calls beyond the first are no-ops, so double-releasing is safe. */ + private _makeRelease(): () => void { + let released = false; + return () => { + if (released) { + return; + } + released = true; + this._releaseOne(); + }; + } + + /** Hand off one slot: pick the lowest-priority waiter (re-evaluating each + * waiter's `getPriority` for dynamic ordering), grant it the slot — or, if + * the queue is empty, decrement {@link Semaphore.active} so the next + * `acquire` can take the freed slot directly. FIFO break on ties. */ + private _releaseOne(): void { + if (this.queue.length === 0) { + this.active -= 1; + return; + } + // Linear scan find-min. `bestIdx === 0` initially gives the earliest + // arrival the implicit tiebreaker — only strictly-lower priorities can + // bump it. + let bestIdx = 0; + let bestPrio: Priority = this.queue[0]!.getPriority?.() ?? 0; + for (let i = 1; i < this.queue.length; i++) { + const p: Priority = this.queue[i]!.getPriority?.() ?? 0; + if (comparePriorities(p, bestPrio) < 0) { + bestIdx = i; + bestPrio = p; + } + } + const next = this.queue.splice(bestIdx, 1)[0]!; + if (next.signal && next.onAbort) { + next.signal.removeEventListener("abort", next.onAbort); + } + // Hand the slot directly to the next waiter — `active` stays the same + // because we're transferring ownership, not freeing and re-taking. + next.resolve(this._makeRelease()); + } +} + +/** + * Minimal contract for capping concurrent {@link Source.fetch} calls. An + * implementation hands out slots scoped however it likes; the default + * {@link PerOriginSemaphore} scopes per `url.origin`. + */ +export interface ConcurrencyLimiter { + /** + * Acquire a slot to perform one fetch to `url`. Resolves to a release + * function — call it exactly once when the fetch settles. If `signal` + * aborts while waiting in the queue, the returned promise rejects with the + * signal's reason and no slot is consumed. 
+ * + * `getPriority` is an optional callback re-evaluated by the limiter on + * every slot-open, so queued waiters can be re-ordered if their priority + * depends on dynamic state (e.g. distance from viewport center, which + * changes on pan). Lower-numeric = serviced sooner. A tuple sorts + * lexicographically. Omitted = priority 0, FIFO among ties. + */ + acquire( + url: URL, + signal?: AbortSignal, + getPriority?: () => Priority, + ): Promise<() => void>; +} + +/** + * Default {@link ConcurrencyLimiter}. Maintains a separate {@link Semaphore} + * per `url.origin`, minted lazily on first encounter. Multiple consumers (e.g. + * two `COGLayer`s on the same S3 bucket) targeting one origin share that + * origin's slot pool; consumers targeting different origins don't compete. + * + * The browser's HTTP/1.1 per-origin connection cap (~6 on Chrome) is the + * reason the cap is *per origin*, shared across layers — exceeding it just + * makes the browser queue requests, blocking fresh ones behind stale ones. + */ +export class PerOriginSemaphore implements ConcurrencyLimiter { + private readonly maxRequests: number; + private readonly byOrigin = new Map(); + + constructor(options: { maxRequests: number }) { + this.maxRequests = options.maxRequests; + } + + acquire( + url: URL, + signal?: AbortSignal, + getPriority?: () => Priority, + ): Promise<() => void> { + const { origin } = url; + let sem = this.byOrigin.get(origin); + if (!sem) { + sem = new Semaphore({ maxRequests: this.maxRequests }); + this.byOrigin.set(origin, sem); + } + return sem.acquire(signal, getPriority); + } +} + +/** Options for {@link LimitedSource}. */ +interface LimitedSourceOptions { + /** The {@link ConcurrencyLimiter} to gate through. The wrapped source's + * own `url` is passed to `limiter.acquire` for per-origin routing. */ + limiter: ConcurrencyLimiter; + /** Optional dynamic priority for every fetch through this source. 
The + * limiter re-invokes this callback on each slot-open, so closures over + * dynamic state (e.g. layer viewport center) re-sort the queue when that + * state changes. Lower = serviced sooner. */ + getPriority?: () => Priority; +} + +/** + * Wraps a {@link Source} so every `fetch` holds a {@link ConcurrencyLimiter} + * slot for its duration — acquiring before the read, releasing when it settles + * (resolve or reject). Forwards the read's `signal` to `limiter.acquire`, so a + * request whose caller aborts while it is still queued for a slot is dropped + * before any network I/O fires. + * + * Compose this *beneath* `SourceChunk` / `SourceCache` (i.e. as the + * `SourceView`'s underlying source), so a cache hit short-circuits in + * `SourceCache` and never reaches — never burns a slot on — the limiter: + * + * @example + * ```ts + * import { SourceView } from "@chunkd/source"; + * import { SourceCache, SourceChunk } from "@chunkd/middleware"; + * + * const limited = new LimitedSource(source, { limiter }); + * const view = new SourceView(limited, [ + * new SourceChunk({ size: 64 * 1024 }), + * new SourceCache({ size: 8 * 1024 * 1024 }), + * ]); + * ``` + * + * **Why a source wrapper and not a chunkd `SourceMiddleware`** (which would + * compose more naturally): chunkd's `SourceView` does not forward the request + * `signal` to its middleware, so a middleware cannot observe an abort — only + * the underlying source receives the read options (incl. `signal`) via + * `SourceView`'s terminal handler. Wrapping the source is therefore the only + * layer that can drop a queued request on abort. Revert to a `SourceMiddleware` + * once chunkd forwards the signal (https://github.com/blacha/chunkd/pull/1697); + * tracked in https://github.com/developmentseed/deck.gl-raster/issues/565. 
+ * + * @internal + */ +export class LimitedSource implements Source { + private readonly source: Source; + private readonly limiter: ConcurrencyLimiter; + private readonly getPriority?: () => Priority; + + constructor(source: Source, opts: LimitedSourceOptions) { + this.source = source; + this.limiter = opts.limiter; + this.getPriority = opts.getPriority; + } + + get type(): string { + return this.source.type; + } + + get url(): URL { + return this.source.url; + } + + get metadata(): SourceMetadata | undefined { + return this.source.metadata; + } + + head(options?: { signal: AbortSignal }): Promise { + return this.source.head(options); + } + + async fetch( + offset: number, + length?: number, + options?: { signal: AbortSignal }, + ): Promise { + const release = await this.limiter.acquire( + this.source.url, + options?.signal, + this.getPriority, + ); + try { + return await this.source.fetch(offset, length, options); + } finally { + release(); + } + } +} diff --git a/packages/geotiff/tests/geotiff-concurrency-limiter.test.ts b/packages/geotiff/tests/geotiff-concurrency-limiter.test.ts new file mode 100644 index 00000000..b7d98b92 --- /dev/null +++ b/packages/geotiff/tests/geotiff-concurrency-limiter.test.ts @@ -0,0 +1,185 @@ +/** + * Verifies that GeoTIFF.fromUrl gates network reads through a + * ConcurrencyLimiter when one is supplied. Both the header reads (cache + * misses through the SourceView) and the tile-data reads are gated; cache + * hits short-circuit in SourceCache and never reach the limiter. + * + * The SourceHttp stubbing pattern mirrors fromurl.test.ts. 
+ */ + +import { readFileSync } from "node:fs"; +import { SourceHttp } from "@chunkd/source-http"; +import { afterEach, describe, expect, it } from "vitest"; +import { GeoTIFF } from "../src/geotiff.js"; +import type { ConcurrencyLimiter } from "../src/limiter.js"; +import { PerOriginSemaphore } from "../src/limiter.js"; +import { fixturePath } from "./helpers.js"; + +const FIXTURE = readFileSync( + fixturePath("uint8_rgb_deflate_block64_cog", "rasterio"), +); + +function makeResponse(body: Uint8Array) { + return { + ok: true, + status: 206, + statusText: "", + headers: { + get: (key: string) => + key.toLowerCase() === "content-length" ? String(body.byteLength) : null, + }, + body: null, + arrayBuffer: async () => + body.buffer.slice( + body.byteOffset, + body.byteOffset + body.byteLength, + ) as ArrayBuffer, + }; +} + +function staticFetch(file: Uint8Array) { + return async ( + _url: string | URL, + init?: { method?: string; headers?: Record }, + ) => { + const method = (init?.method ?? "GET").toUpperCase(); + if (method === "HEAD") { + return { + ok: true, + status: 200, + statusText: "", + headers: { + get: (key: string) => + key.toLowerCase() === "content-length" + ? String(file.byteLength) + : null, + }, + body: null, + arrayBuffer: async () => new ArrayBuffer(0), + }; + } + const range = init?.headers?.range ?? ""; + const match = /^bytes=(\d+)-(\d+)?$/.exec(range); + const start = match ? Number(match[1]) : 0; + const end = + match?.[2] != null + ? 
Math.min(Number(match[2]), file.byteLength - 1)
+        : file.byteLength - 1;
+    return makeResponse(file.subarray(start, end + 1));
+  };
+}
+
+describe("GeoTIFF.fromUrl({ concurrencyLimiter })", () => {
+  const realFetch = SourceHttp.fetch;
+  afterEach(() => {
+    SourceHttp.fetch = realFetch;
+  });
+
+  it("routes both header and tile-data fetches through the limiter (cache hits skip it)", async () => {
+    SourceHttp.fetch = staticFetch(FIXTURE) as typeof SourceHttp.fetch;
+
+    const acquired: URL[] = [];
+    const limiter: ConcurrencyLimiter = {
+      acquire: async (url) => {
+        acquired.push(url);
+        return () => {};
+      },
+    };
+    const url = "https://example.test/cog.tif";
+    const tiff = await GeoTIFF.fromUrl(url, { concurrencyLimiter: limiter });
+
+    // Opening the TIFF reads headers — those network reads (cache misses
+    // through the SourceView) go through the limiter too.
+    expect(acquired.length).toBeGreaterThan(0);
+    const headerCount = acquired.length;
+
+    await tiff.fetchTile(0, 0);
+
+    // The tile fetch added at least one more acquire (the data-source
+    // path). Every URL must be ours.
+    expect(acquired.length).toBeGreaterThan(headerCount);
+    for (const u of acquired) {
+      expect(u.href).toBe(url);
+    }
+  });
+
+  it("with concurrencyLimiter: null does not wrap (no acquires)", async () => {
+    SourceHttp.fetch = staticFetch(FIXTURE) as typeof SourceHttp.fetch;
+    const acquired: URL[] = [];
+    const limiter: ConcurrencyLimiter = {
+      acquire: async (url) => {
+        acquired.push(url);
+        return () => {};
+      },
+    };
+    // null = explicitly off.
+    const tiff = await GeoTIFF.fromUrl("https://example.test/cog.tif", {
+      concurrencyLimiter: null,
+    });
+    await tiff.fetchTile(0, 0);
+    // NOTE(review): `limiter` is never handed to `fromUrl`, so nothing can
+    // call it: `acquired` is empty by construction, so the check below only
+    // confirms that opening and fetching with `null` does not throw.
+    expect(acquired).toEqual([]);
+    // Reference `limiter` so it isn't flagged as unused.
+    expect(limiter.acquire).toBeDefined();
+  });
+
+  it("drops a queued header read when its signal aborts, without fetching it", async () => {
+    // One slot per origin, so the second open must queue behind the first.
+    const limiter = new PerOriginSemaphore({ maxRequests: 1 });
+
+    const fetched: string[] = [];
+    let firstHolds!: () => void;
+    const firstHolding = new Promise<void>((resolve) => {
+      firstHolds = resolve;
+    });
+    let releaseFirst!: () => void;
+    const firstReleased = new Promise<void>((resolve) => {
+      releaseFirst = resolve;
+    });
+
+    SourceHttp.fetch = (async (
+      url: string | URL,
+      init?: { method?: string; headers?: Record<string, string> },
+    ) => {
+      const href = String(url);
+      fetched.push(href);
+      if (href.includes("first.tif")) {
+        // The first open holds the only slot until we release it, so the
+        // second open's first read must queue in the limiter.
+        firstHolds();
+        await firstReleased;
+      }
+      const range = init?.headers?.range ?? "";
+      const match = /^bytes=(\d+)-(\d+)?$/.exec(range);
+      const start = match ? Number(match[1]) : 0;
+      const end =
+        match?.[2] != null
+          ? Math.min(Number(match[2]), FIXTURE.byteLength - 1)
+          : FIXTURE.byteLength - 1;
+      return makeResponse(FIXTURE.subarray(start, end + 1));
+    }) as typeof SourceHttp.fetch;
+
+    // First open acquires the only slot and parks in its first read.
+    const first = GeoTIFF.fromUrl("https://ex.test/first.tif", {
+      concurrencyLimiter: limiter,
+    });
+    await firstHolding;
+
+    // Second open queues behind the first; abort it while it waits.
+    const ac = new AbortController();
+    const second = GeoTIFF.fromUrl("https://ex.test/second.tif", {
+      concurrencyLimiter: limiter,
+      signal: ac.signal,
+    });
+    ac.abort(new Error("pan-away"));
+
+    await expect(second).rejects.toThrow();
+    // The queued read was dropped before any network fetch for second.tif.
+    expect(fetched.some((u) => u.includes("second.tif"))).toBe(false);
+
+    releaseFirst();
+    await first;
+  });
+});
diff --git a/packages/geotiff/tests/limiter.test.ts b/packages/geotiff/tests/limiter.test.ts
new file mode 100644
index 00000000..9319214e
--- /dev/null
+++ b/packages/geotiff/tests/limiter.test.ts
@@ -0,0 +1,453 @@
+import type { Source } from "@chunkd/source";
+import { describe, expect, it } from "vitest";
+import type { ConcurrencyLimiter, Priority } from "../src/limiter.js";
+import {
+  LimitedSource,
+  PerOriginSemaphore,
+  Semaphore,
+} from "../src/limiter.js";
+
+describe("Semaphore", () => {
+  it("allows up to maxRequests concurrent acquires; further acquires queue", async () => {
+    const sem = new Semaphore({ maxRequests: 2 });
+    const a = await sem.acquire();
+    const b = await sem.acquire();
+    let cResolved = false;
+    const cPromise = sem.acquire().then((release) => {
+      cResolved = true;
+      return release;
+    });
+    // give the microtask queue a chance — c must NOT resolve while a+b hold slots
+    await new Promise((r) => setTimeout(r, 0));
+    expect(cResolved).toBe(false);
+    a();
+    const c = await cPromise;
+    expect(cResolved).toBe(true);
+    b();
+    c();
+  });
+
+  it("waiters resolve in FIFO order", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const hold = await sem.acquire();
+    const order: number[] = [];
+    const p1 = sem.acquire().then((r) => {
+      order.push(1);
+      r();
+    });
+    const p2 = sem.acquire().then((r) => {
+      order.push(2);
+      r();
+    });
+    const p3 = sem.acquire().then((r) => {
+      order.push(3);
+      r();
+    });
+    hold();
+    await Promise.all([p1, p2, p3]);
+    expect(order).toEqual([1, 2, 3]);
+  });
+
+  it("acquire(signal) with already-aborted signal rejects and consumes no slot", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const ac = new AbortController();
+    ac.abort(new Error("nope"));
+    await expect(sem.acquire(ac.signal)).rejects.toThrow("nope");
+    // The slot was never consumed — a fresh acquire should resolve immediately.
+    const release = await sem.acquire();
+    expect(typeof release).toBe("function");
+    release();
+  });
+
+  it("aborting a queued acquire rejects it and frees its queue slot", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const hold = await sem.acquire();
+    const ac = new AbortController();
+    const queued = sem.acquire(ac.signal);
+    ac.abort(new Error("pan-away"));
+    await expect(queued).rejects.toThrow("pan-away");
+    // A fresh acquire (no signal) should be next-in-line, not blocked behind the aborted one.
+    let nextResolved = false;
+    const next = sem.acquire().then((r) => {
+      nextResolved = true;
+      return r;
+    });
+    hold();
+    await next;
+    expect(nextResolved).toBe(true);
+  });
+
+  it("orders queued waiters by priority (lower = sooner)", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const hold = await sem.acquire();
+    const order: number[] = [];
+    // Queue in arrival order [c, a, b] but priorities say a < b < c.
+    const c = sem
+      .acquire(undefined, () => 3)
+      .then((r) => {
+        order.push(3);
+        r();
+      });
+    const a = sem
+      .acquire(undefined, () => 1)
+      .then((r) => {
+        order.push(1);
+        r();
+      });
+    const b = sem
+      .acquire(undefined, () => 2)
+      .then((r) => {
+        order.push(2);
+        r();
+      });
+    hold();
+    await Promise.all([a, b, c]);
+    expect(order).toEqual([1, 2, 3]);
+  });
+
+  it("FIFO tiebreak among waiters with the same priority", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const hold = await sem.acquire();
+    const order: number[] = [];
+    const p1 = sem
+      .acquire(undefined, () => 5)
+      .then((r) => {
+        order.push(1);
+        r();
+      });
+    const p2 = sem
+      .acquire(undefined, () => 5)
+      .then((r) => {
+        order.push(2);
+        r();
+      });
+    const p3 = sem
+      .acquire(undefined, () => 5)
+      .then((r) => {
+        order.push(3);
+        r();
+      });
+    hold();
+    await Promise.all([p1, p2, p3]);
+    expect(order).toEqual([1, 2, 3]);
+  });
+
+  it("re-evaluates getPriority on every slot-open (dynamic priority)", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const hold = await sem.acquire();
+    // Three waiters; each reads its priority from shared mutable state.
+    const prio = { a: 10, b: 1, c: 5 };
+    const order: string[] = [];
+    const aPromise = sem
+      .acquire(undefined, () => prio.a)
+      .then((r) => {
+        order.push("a");
+        r();
+      });
+    const bPromise = sem
+      .acquire(undefined, () => prio.b)
+      .then((r) => {
+        order.push("b");
+        // Move a ahead of c while both are still queued, before b's release
+        // opens the next slot. A limiter that memoised priorities at acquire
+        // time would still serve c (5) before a (10).
+        prio.a = 0;
+        r();
+      });
+    const cPromise = sem
+      .acquire(undefined, () => prio.c)
+      .then((r) => {
+        order.push("c");
+        r();
+      });
+    // b's priority (1) is lowest right now, so the first release serves b.
+    hold();
+    await Promise.all([aPromise, bPromise, cPromise]);
+    expect(order).toEqual(["b", "a", "c"]);
+  });
+
+  it("sorts tuple priorities lexicographically with missing trailing elements as 0", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const hold = await sem.acquire();
+    const order: string[] = [];
+    // [5, 3] vs [5, 1] vs [5] — second-element decides; [5] = [5, 0] (smallest).
+    const p53 = sem
+      .acquire(undefined, () => [5, 3] as const)
+      .then((r) => {
+        order.push("[5,3]");
+        r();
+      });
+    const p51 = sem
+      .acquire(undefined, () => [5, 1] as const)
+      .then((r) => {
+        order.push("[5,1]");
+        r();
+      });
+    const p5 = sem
+      .acquire(undefined, () => [5] as const)
+      .then((r) => {
+        order.push("[5]");
+        r();
+      });
+    hold();
+    await Promise.all([p5, p51, p53]);
+    expect(order).toEqual(["[5]", "[5,1]", "[5,3]"]);
+  });
+
+  it("mixes number and tuple priorities — number is treated as 1-tuple", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const hold = await sem.acquire();
+    const order: string[] = [];
+    // priority 3 (= [3]) and [3, 5] tie on first element; second decides — [3] (= [3,0]) wins.
+    const p3tuple = sem
+      .acquire(undefined, () => [3, 5] as const)
+      .then((r) => {
+        order.push("[3,5]");
+        r();
+      });
+    const p3num = sem
+      .acquire(undefined, () => 3)
+      .then((r) => {
+        order.push("3");
+        r();
+      });
+    hold();
+    await Promise.all([p3num, p3tuple]);
+    expect(order).toEqual(["3", "[3,5]"]);
+  });
+
+  it("treats omitted getPriority as priority 0 (so unprio'd waiters lead the queue)", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const hold = await sem.acquire();
+    const order: string[] = [];
+    // priority 5 first arrival; no-priority second arrival. No-prio = 0 < 5 → wins.
+    const p5 = sem
+      .acquire(undefined, () => 5)
+      .then((r) => {
+        order.push("5");
+        r();
+      });
+    const pNone = sem.acquire().then((r) => {
+      order.push("none");
+      r();
+    });
+    hold();
+    await Promise.all([p5, pNone]);
+    expect(order).toEqual(["none", "5"]);
+  });
+});
+
+describe("PerOriginSemaphore", () => {
+  const A = new URL("https://a.example.com/file-1.tif");
+  const A2 = new URL("https://a.example.com/file-2.tif");
+  const B = new URL("https://b.example.com/file-1.tif");
+
+  it("implements ConcurrencyLimiter", () => {
+    const limiter: ConcurrencyLimiter = new PerOriginSemaphore({
+      maxRequests: 2,
+    });
+    expect(typeof limiter.acquire).toBe("function");
+  });
+
+  it("acquire/release works for one origin", async () => {
+    const limiter = new PerOriginSemaphore({ maxRequests: 1 });
+    const release = await limiter.acquire(A);
+    let secondResolved = false;
+    const second = limiter.acquire(A2).then((r) => {
+      secondResolved = true;
+      return r;
+    });
+    await new Promise((r) => setTimeout(r, 0));
+    expect(secondResolved).toBe(false); // same origin, queued
+    release();
+    (await second)();
+  });
+
+  it("different origins don't compete: saturating origin A doesn't block origin B", async () => {
+    const limiter = new PerOriginSemaphore({ maxRequests: 1 });
+    const holdA = await limiter.acquire(A);
+    // origin A is saturated. origin B should still grant immediately.
+    let bResolved = false;
+    const b = limiter.acquire(B).then((r) => {
+      bResolved = true;
+      return r;
+    });
+    await new Promise((r) => setTimeout(r, 0));
+    expect(bResolved).toBe(true);
+    holdA();
+    (await b)();
+  });
+
+  it("same origin URLs with different paths share one pool", async () => {
+    const limiter = new PerOriginSemaphore({ maxRequests: 1 });
+    const holdA1 = await limiter.acquire(A);
+    let a2Resolved = false;
+    const a2 = limiter.acquire(A2).then((r) => {
+      a2Resolved = true;
+      return r;
+    });
+    await new Promise((r) => setTimeout(r, 0));
+    expect(a2Resolved).toBe(false);
+    holdA1();
+    (await a2)();
+  });
+
+  it("mints a new per-origin Semaphore lazily on first acquire", async () => {
+    const limiter = new PerOriginSemaphore({ maxRequests: 1 });
+    // Saturate origin A.
+    const hold = await limiter.acquire(A);
+    // A brand-new origin C should resolve immediately even though A is full.
+    const C = new URL("https://c.example.com/file.tif");
+    const release = await limiter.acquire(C);
+    expect(typeof release).toBe("function");
+    release();
+    hold();
+  });
+
+  it("forwards getPriority to the per-origin Semaphore", async () => {
+    const limiter = new PerOriginSemaphore({ maxRequests: 1 });
+    const hold = await limiter.acquire(A);
+    const order: string[] = [];
+    const low = limiter
+      .acquire(A, undefined, () => 99)
+      .then((r) => {
+        order.push("low");
+        r();
+      });
+    const high = limiter
+      .acquire(A2, undefined, () => 1)
+      .then((r) => {
+        order.push("high");
+        r();
+      });
+    hold();
+    await Promise.all([low, high]);
+    expect(order).toEqual(["high", "low"]);
+  });
+});
+
+describe("LimitedSource", () => {
+  const URL_A = new URL("https://a.example.com/cog.tif");
+
+  /** A minimal recording {@link Source} for wrapping. */
+  function fakeSource(
+    fetchImpl: Source["fetch"] = async () => new ArrayBuffer(0),
+  ): Source {
+    return {
+      type: "test",
+      url: URL_A,
+      metadata: { size: 1024 },
+      head: async () => ({ size: 1024 }),
+      fetch: fetchImpl,
+    };
+  }
+
+  it("acquires a slot before fetching and releases after", async () => {
+    const order: string[] = [];
+    const limiter: ConcurrencyLimiter = {
+      acquire: async () => {
+        order.push("acquire");
+        return () => order.push("release");
+      },
+    };
+    const limited = new LimitedSource(
+      fakeSource(async () => {
+        order.push("fetch");
+        return new ArrayBuffer(0);
+      }),
+      { limiter },
+    );
+    await limited.fetch(0, 4);
+    expect(order).toEqual(["acquire", "fetch", "release"]);
+  });
+
+  it("forwards offset/length/options to the wrapped source's fetch", async () => {
+    const calls: Array<[number, number | undefined, unknown]> = [];
+    const limiter: ConcurrencyLimiter = { acquire: async () => () => {} };
+    const signal = new AbortController().signal;
+    const limited = new LimitedSource(
+      fakeSource(async (offset, length, options) => {
+        calls.push([offset, length, options]);
+        return new ArrayBuffer(0);
+      }),
+      { limiter },
+    );
+    await limited.fetch(100, 200, { signal });
+    expect(calls).toEqual([[100, 200, { signal }]]);
+  });
+
+  it("releases the slot when the wrapped fetch rejects (and propagates)", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const limiter: ConcurrencyLimiter = {
+      acquire: (_url, signal) => sem.acquire(signal),
+    };
+    const limited = new LimitedSource(
+      fakeSource(async () => {
+        throw new Error("network down");
+      }),
+      { limiter },
+    );
+    await expect(limited.fetch(0, 4)).rejects.toThrow("network down");
+    // Slot was released — a second fetch (with a source that resolves) must
+    // not hang.
+    const ok = new LimitedSource(fakeSource(), { limiter });
+    await ok.fetch(0, 4);
+  });
+
+  it("forwards the signal to limiter.acquire so a queued abort drops the read before fetching", async () => {
+    const sem = new Semaphore({ maxRequests: 1 });
+    const limiter: ConcurrencyLimiter = {
+      acquire: (_url, signal) => sem.acquire(signal),
+    };
+    // Saturate the semaphore so the next acquire queues.
+    const hold = await sem.acquire();
+    let fetched = false;
+    const limited = new LimitedSource(
+      fakeSource(async () => {
+        fetched = true;
+        return new ArrayBuffer(0);
+      }),
+      { limiter },
+    );
+    const ac = new AbortController();
+    const pending = limited.fetch(0, 8, { signal: ac.signal });
+    ac.abort(new Error("pan-away"));
+    await expect(pending).rejects.toThrow("pan-away");
+    expect(fetched).toBe(false);
+    hold();
+  });
+
+  it("delegates type/url/metadata/head and routes acquire on the source's url", async () => {
+    const acquired: URL[] = [];
+    const limiter: ConcurrencyLimiter = {
+      acquire: async (url) => {
+        acquired.push(url);
+        return () => {};
+      },
+    };
+    const source = fakeSource();
+    const limited = new LimitedSource(source, { limiter });
+    expect(limited.type).toBe(source.type);
+    expect(limited.url).toBe(source.url);
+    expect(limited.metadata).toBe(source.metadata);
+    expect(await limited.head()).toEqual({ size: 1024 });
+    await limited.fetch(0, 4);
+    expect(acquired).toEqual([source.url]);
+  });
+
+  it("threads getPriority through to limiter.acquire", async () => {
+    const priorities: Array<Priority | undefined> = [];
+    const limiter: ConcurrencyLimiter = {
+      acquire: async (_url, _signal, getPriority) => {
+        priorities.push(getPriority?.());
+        return () => {};
+      },
+    };
+    const limited = new LimitedSource(fakeSource(), {
+      limiter,
+      getPriority: () => [2, 7],
+    });
+    await limited.fetch(0, 4);
+    expect(priorities).toEqual([[2, 7]]);
+  });
+});