From 508714315cc69e9010f3435031b6c14420130a10 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 26 May 2026 14:12:53 -0400 Subject: [PATCH] fix(geotiff): revert ConcurrencyLimiter to a chunkd SourceMiddleware chunkd#1697 shipped (@chunkd/source 11.4.1, @chunkd/middleware 11.3.1), so SourceView now forwards the request signal to middleware and SourceChunk forwards it on multi-chunk sub-requests. That removes the reason for the LimitedSource source-wrapper workaround (#557): a middleware can now observe an abort, so a request whose caller aborts while queued for a slot is dropped before any network I/O. Replace the internal LimitedSource with LimiterMiddleware again, composed into the header SourceView *after* SourceChunk/SourceCache (so cache hits short-circuit and never burn a slot) and into a tiny SourceView wrapping the data source. Bump the @chunkd/source, @chunkd/source-http, and @chunkd/middleware floors to the fixed releases. Internal-only: GeoTIFF.fromUrl options, ConcurrencyLimiter, Priority, and PerOriginSemaphore are unchanged. The integration test that aborts a queued header read through the real fromUrl -> SourceView -> limiter path is unchanged and still passes, proving the signal now reaches the middleware end to end. Closes #565. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../2026-05-19-concurrency-limiter-design.md | 2 - packages/geotiff/package.json | 6 +- packages/geotiff/src/geotiff.ts | 42 ++--- packages/geotiff/src/limiter.ts | 87 ++++------ packages/geotiff/tests/limiter.test.ts | 157 +++++++++--------- pnpm-lock.yaml | 46 +++-- 6 files changed, 163 insertions(+), 177 deletions(-) diff --git a/dev-docs/specs/2026-05-19-concurrency-limiter-design.md b/dev-docs/specs/2026-05-19-concurrency-limiter-design.md index fc846e17..61d3404e 100644 --- a/dev-docs/specs/2026-05-19-concurrency-limiter-design.md +++ b/dev-docs/specs/2026-05-19-concurrency-limiter-design.md @@ -112,8 +112,6 @@ class LimiterMiddleware implements SourceMiddleware { `LimiterMiddleware` is also internal: the limiter is wired by `GeoTIFF.fromUrl` (where chunkd's `Source` / `SourceView` types are already in scope), so callers don't need to compose middleware themselves. -> **Implementation note (temporary).** The `SourceMiddleware` shape above is the intended design, but it doesn't work yet: chunkd's `SourceView` doesn't forward the request `signal` to middleware, so a middleware can't observe an abort (only the underlying source receives the read options via `SourceView`'s terminal handler). Until that's fixed upstream ([chunkd#1697](https://github.com/blacha/chunkd/pull/1697)), the limiter is implemented as an internal **source wrapper** — `LimitedSource` — composed *beneath* `SourceChunk` / `SourceCache` (as the `SourceView`'s source) rather than as a middleware on top. Cache hits still short-circuit in `SourceCache` before reaching it. This is internal-only; the public API is unchanged either way. Revert to `LimiterMiddleware` once chunkd ships the fix — tracked in [#565](https://github.com/developmentseed/deck.gl-raster/issues/565). - ## Integration ### `@developmentseed/geotiff` diff --git a/packages/geotiff/package.json b/packages/geotiff/package.json index 91f882c4..3a444ddc 100644 --- a/packages/geotiff/package.json +++ b/packages/geotiff/package.json @@ -50,9 +50,9 @@ "extends": "../../package.json" }, "dependencies": { - "@chunkd/middleware": "^11.3.0", - "@chunkd/source": "^11.4.0", - "@chunkd/source-http": "^11.4.0", + "@chunkd/middleware": "^11.3.1", + "@chunkd/source": "^11.4.1", + "@chunkd/source-http": "^11.4.1", "@chunkd/source-memory": "^11.2.0", "@cogeotiff/core": "^9.5.0", "@developmentseed/affine": "workspace:^", diff --git a/packages/geotiff/src/geotiff.ts b/packages/geotiff/src/geotiff.ts index c209e5e3..f23b0a17 100644 --- a/packages/geotiff/src/geotiff.ts +++ b/packages/geotiff/src/geotiff.ts @@ -13,7 +13,7 @@ import { parseGDALMetadata } from "./gdal-metadata.js"; import type { CachedTags, GeoKeyDirectory } from "./ifd.js"; import { extractGeoKeyDirectory, prefetchTags } from "./ifd.js"; import type { ConcurrencyLimiter, Priority } from "./limiter.js"; -import { LimitedSource } from "./limiter.js"; +import { LimiterMiddleware } from "./limiter.js"; import { Overview } from "./overview.js"; import type { DecoderPool } from "./pool/pool.js"; import type { Tile } from "./tile.js"; @@ -327,30 +327,32 @@ export class GeoTIFF { // unbounded length. Remove once the upstream fix lands. source.metadata = { size: Number.POSITIVE_INFINITY }; - // When a limiter is supplied, gate every network read through it by - // wrapping the raw source. The header `SourceView` composes SourceChunk + - // SourceCache *on top* of this wrapped source, so a cache hit - // short-circuits in SourceCache and never reaches — never burns a slot on - // — the limiter; only reads that escape the cache (and every data read, - // which bypasses the cache) are gated. The same wrapped source backs both - // the header view and the data source, so both share one per-origin pool. - // - // Gating here as a source wrapper rather than a chunkd SourceMiddleware is - // a workaround for chunkd not forwarding the abort signal to middleware; - // see LimitedSource. Once that's fixed upstream this can become a - // middleware again. Tracked in - // https://github.com/developmentseed/deck.gl-raster/issues/565 - const limitedSource = concurrencyLimiter - ? new LimitedSource(source, { limiter: concurrencyLimiter, getPriority }) - : source; - - const view = new SourceView(limitedSource, [ + // When a limiter is supplied, slot a LimiterMiddleware into both + // sources' middleware stacks. On the header source, it sits *after* + // SourceChunk + SourceCache so a cache hit short-circuits and never + // consumes a slot — only network reads that escape the cache are gated. + // On the data source (no caching), every fetch is gated. Both stacks + // share one LimiterMiddleware instance, so they share one per-origin pool. + const limiter = concurrencyLimiter + ? new LimiterMiddleware({ + url: new URL(url), + limiter: concurrencyLimiter, + getPriority, + }) + : null; + + const view = new SourceView(source, [ new SourceChunk({ size: chunkSize }), new SourceCache({ size: cacheSize }), + ...(limiter ? [limiter] : []), ]); + const dataSource: Pick = limiter + ? new SourceView(source, [limiter]) + : source; + return await GeoTIFF.open({ - dataSource: limitedSource, + dataSource, headerSource: view, signal, debug, diff --git a/packages/geotiff/src/limiter.ts b/packages/geotiff/src/limiter.ts index 06083473..93cb1bf0 100644 --- a/packages/geotiff/src/limiter.ts +++ b/packages/geotiff/src/limiter.ts @@ -1,4 +1,8 @@ -import type { Source, SourceMetadata } from "@chunkd/source"; +import type { + SourceCallback, + SourceMiddleware, + SourceRequest, +} from "@chunkd/source"; /** * Numeric priority used to order waiters in a {@link Semaphore}'s queue. Lower @@ -63,7 +67,7 @@ interface Waiter { /** * Counting semaphore with abort-aware acquire and dynamic priority. Internal - * primitive used by {@link PerOriginSemaphore} and {@link LimitedSource}. + * primitive used by {@link PerOriginSemaphore} and {@link LimiterMiddleware}. * * Hands out up to `maxRequests` concurrent slots. Further `acquire()`s queue. * On every slot-open, the queue is searched for the lowest-priority waiter @@ -222,12 +226,15 @@ export class PerOriginSemaphore implements ConcurrencyLimiter { } } -/** Options for {@link LimitedSource}. */ -interface LimitedSourceOptions { - /** The {@link ConcurrencyLimiter} to gate through. The wrapped source's - * own `url` is passed to `limiter.acquire` for per-origin routing. */ +/** Options for {@link LimiterMiddleware}. */ +interface LimiterMiddlewareOptions { + /** The URL the wrapped source is reading from. Passed to + * `limiter.acquire(url, signal?)` on every fetch — the limiter uses it for + * per-origin routing. */ + url: URL; + /** The {@link ConcurrencyLimiter} to gate through. */ limiter: ConcurrencyLimiter; - /** Optional dynamic priority for every fetch through this source. The + /** Optional dynamic priority for every fetch through this middleware. The * limiter re-invokes this callback on each slot-open, so closures over * dynamic state (e.g. layer viewport center) re-sort the queue when that * state changes. Lower = serviced sooner. */ @@ -235,78 +242,50 @@ interface LimitedSourceOptions { } /** - * Wraps a {@link Source} so every `fetch` holds a {@link ConcurrencyLimiter} - * slot for its duration — acquiring before the read, releasing when it settles - * (resolve or reject). Forwards the read's `signal` to `limiter.acquire`, so a - * request whose caller aborts while it is still queued for a slot is dropped - * before any network I/O fires. + * chunkd middleware that holds a {@link ConcurrencyLimiter} slot for the + * duration of each underlying `fetch` — releasing on resolve, on reject, and + * never otherwise interfering. Forwards the request's `signal` to + * `limiter.acquire`, so if the caller aborts while the call is queued the + * request is dropped before any network I/O fires. * - * Compose this *beneath* `SourceChunk` / `SourceCache` (i.e. as the - * `SourceView`'s underlying source), so a cache hit short-circuits in - * `SourceCache` and never reaches — never burns a slot on — the limiter: + * Composed into a {@link SourceView}'s middleware list alongside the chunkd + * middlewares (`SourceChunk`, `SourceCache`, …). Place it after caching so + * cache hits don't burn a slot. * * @example * ```ts * import { SourceView } from "@chunkd/source"; * import { SourceCache, SourceChunk } from "@chunkd/middleware"; * - * const limited = new LimitedSource(source, { limiter }); - * const view = new SourceView(limited, [ + * const view = new SourceView(source, [ * new SourceChunk({ size: 64 * 1024 }), * new SourceCache({ size: 8 * 1024 * 1024 }), + * new LimiterMiddleware({ url, limiter }), * ]); * ``` * - * **Why a source wrapper and not a chunkd `SourceMiddleware`** (which would - * compose more naturally): chunkd's `SourceView` does not forward the request - * `signal` to its middleware, so a middleware cannot observe an abort — only - * the underlying source receives the read options (incl. `signal`) via - * `SourceView`'s terminal handler. Wrapping the source is therefore the only - * layer that can drop a queued request on abort. Revert to a `SourceMiddleware` - * once chunkd forwards the signal (https://github.com/blacha/chunkd/pull/1697); - * tracked in https://github.com/developmentseed/deck.gl-raster/issues/565. - * * @internal */ -export class LimitedSource implements Source { - private readonly source: Source; +export class LimiterMiddleware implements SourceMiddleware { + readonly name = "limiter"; + private readonly url: URL; private readonly limiter: ConcurrencyLimiter; private readonly getPriority?: () => Priority; - constructor(source: Source, opts: LimitedSourceOptions) { - this.source = source; + constructor(opts: LimiterMiddlewareOptions) { + this.url = opts.url; this.limiter = opts.limiter; this.getPriority = opts.getPriority; } - get type(): string { - return this.source.type; - } - - get url(): URL { - return this.source.url; - } - - get metadata(): SourceMetadata | undefined { - return this.source.metadata; - } - - head(options?: { signal: AbortSignal }): Promise { - return this.source.head(options); - } - - async fetch( - offset: number, - length?: number, - options?: { signal: AbortSignal }, - ): Promise { + async fetch(req: SourceRequest, next: SourceCallback): Promise { const release = await this.limiter.acquire( - this.source.url, - options?.signal, + this.url, + req.signal, this.getPriority, ); try { - return await this.source.fetch(offset, length, options); + return await next(req); } finally { release(); } diff --git a/packages/geotiff/tests/limiter.test.ts b/packages/geotiff/tests/limiter.test.ts index 9319214e..3e5e56da 100644 --- a/packages/geotiff/tests/limiter.test.ts +++ b/packages/geotiff/tests/limiter.test.ts @@ -1,8 +1,8 @@ -import type { Source } from "@chunkd/source"; +import type { SourceCallback, SourceRequest } from "@chunkd/source"; import { describe, expect, it } from "vitest"; import type { ConcurrencyLimiter, Priority } from "../src/limiter.js"; import { - LimitedSource, + LimiterMiddleware, PerOriginSemaphore, Semaphore, } from "../src/limiter.js"; @@ -322,23 +322,15 @@ describe("PerOriginSemaphore", () => { }); }); -describe("LimitedSource", () => { +describe("LimiterMiddleware", () => { const URL_A = new URL("https://a.example.com/cog.tif"); + const REQ: SourceRequest = { + source: {} as never, + offset: 0, + length: 4, + }; - /** A minimal recording {@link Source} for wrapping. */ - function fakeSource( - fetchImpl: Source["fetch"] = async () => new ArrayBuffer(0), - ): Source { - return { - type: "test", - url: URL_A, - metadata: { size: 1024 }, - head: async () => ({ size: 1024 }), - fetch: fetchImpl, - }; - } - - it("acquires a slot before fetching and releases after", async () => { + it("only invokes `next` after acquiring a slot, and releases after", async () => { const order: string[] = []; const limiter: ConcurrencyLimiter = { acquire: async () => { @@ -346,104 +338,105 @@ describe("LimitedSource", () => { return () => order.push("release"); }, }; - const limited = new LimitedSource( - fakeSource(async () => { - order.push("fetch"); - return new ArrayBuffer(0); - }), - { limiter }, - ); - await limited.fetch(0, 4); - expect(order).toEqual(["acquire", "fetch", "release"]); + const mw = new LimiterMiddleware({ url: URL_A, limiter }); + const next: SourceCallback = async () => { + order.push("next"); + return new ArrayBuffer(0); + }; + await mw.fetch(REQ, next); + expect(order).toEqual(["acquire", "next", "release"]); }); - it("forwards offset/length/options to the wrapped source's fetch", async () => { - const calls: Array<[number, number | undefined, unknown]> = []; - const limiter: ConcurrencyLimiter = { acquire: async () => () => {} }; + it("forwards req to `next` unchanged", async () => { + const calls: SourceRequest[] = []; + const limiter: ConcurrencyLimiter = { + acquire: async () => () => {}, + }; + const mw = new LimiterMiddleware({ url: URL_A, limiter }); const signal = new AbortController().signal; - const limited = new LimitedSource( - fakeSource(async (offset, length, options) => { - calls.push([offset, length, options]); - return new ArrayBuffer(0); - }), - { limiter }, - ); - await limited.fetch(100, 200, { signal }); - expect(calls).toEqual([[100, 200, { signal }]]); + const req: SourceRequest = { + source: {} as never, + offset: 100, + length: 200, + signal, + }; + const next: SourceCallback = async (r) => { + calls.push(r); + return new ArrayBuffer(0); + }; + await mw.fetch(req, next); + expect(calls).toEqual([req]); }); - it("releases the slot when the wrapped fetch rejects (and propagates)", async () => { + it("releases the slot when `next` rejects (and propagates the error)", async () => { const sem = new Semaphore({ maxRequests: 1 }); const limiter: ConcurrencyLimiter = { acquire: (_url, signal) => sem.acquire(signal), }; - const limited = new LimitedSource( - fakeSource(async () => { + const mw = new LimiterMiddleware({ url: URL_A, limiter }); + await expect( + mw.fetch(REQ, async () => { throw new Error("network down"); }), - { limiter }, - ); - await expect(limited.fetch(0, 4)).rejects.toThrow("network down"); - // Slot was released — a second fetch (with a source that resolves) must - // not hang. - const ok = new LimitedSource(fakeSource(), { limiter }); - await ok.fetch(0, 4); + ).rejects.toThrow("network down"); + // Slot was released — a second fetch must not hang. + await mw.fetch(REQ, async () => new ArrayBuffer(0)); }); - it("forwards the signal to limiter.acquire so a queued abort drops the read before fetching", async () => { + it("forwards req.signal to limiter.acquire so a queued abort drops the call", async () => { const sem = new Semaphore({ maxRequests: 1 }); const limiter: ConcurrencyLimiter = { acquire: (_url, signal) => sem.acquire(signal), }; // Saturate the semaphore so the next acquire queues. const hold = await sem.acquire(); - let fetched = false; - const limited = new LimitedSource( - fakeSource(async () => { - fetched = true; - return new ArrayBuffer(0); - }), - { limiter }, - ); + let nextCalled = false; + const mw = new LimiterMiddleware({ url: URL_A, limiter }); const ac = new AbortController(); - const pending = limited.fetch(0, 8, { signal: ac.signal }); + const req: SourceRequest = { + source: {} as never, + offset: 0, + length: 8, + signal: ac.signal, + }; + const pending = mw.fetch(req, async () => { + nextCalled = true; + return new ArrayBuffer(0); + }); ac.abort(new Error("pan-away")); await expect(pending).rejects.toThrow("pan-away"); - expect(fetched).toBe(false); + expect(nextCalled).toBe(false); hold(); }); - it("delegates type/url/metadata/head and routes acquire on the source's url", async () => { - const acquired: URL[] = []; - const limiter: ConcurrencyLimiter = { - acquire: async (url) => { - acquired.push(url); - return () => {}; - }, - }; - const source = fakeSource(); - const limited = new LimitedSource(source, { limiter }); - expect(limited.type).toBe(source.type); - expect(limited.url).toBe(source.url); - expect(limited.metadata).toBe(source.metadata); - expect(await limited.head()).toEqual({ size: 1024 }); - await limited.fetch(0, 4); - expect(acquired).toEqual([source.url]); + it("has the expected SourceMiddleware shape (name + fetch)", () => { + const mw = new LimiterMiddleware({ + url: URL_A, + limiter: { acquire: async () => () => {} }, + }); + expect(mw.name).toBe("limiter"); + expect(typeof mw.fetch).toBe("function"); }); - it("threads getPriority through to limiter.acquire", async () => { - const priorities: Array = []; + it("threads getPriority from constructor through to limiter.acquire", async () => { + const calls: Array<{ + url: URL; + signal?: AbortSignal; + priority: Priority | undefined; + }> = []; const limiter: ConcurrencyLimiter = { - acquire: async (_url, _signal, getPriority) => { - priorities.push(getPriority?.()); + acquire: async (url, signal, getPriority) => { + calls.push({ url, signal, priority: getPriority?.() }); return () => {}; }, }; - const limited = new LimitedSource(fakeSource(), { + const mw = new LimiterMiddleware({ + url: URL_A, limiter, getPriority: () => [2, 7], }); - await limited.fetch(0, 4); - expect(priorities).toEqual([[2, 7]]); + await mw.fetch(REQ, async () => new ArrayBuffer(0)); + expect(calls).toHaveLength(1); + expect(calls[0]!.priority).toEqual([2, 7]); }); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 74eff44e..69f9e273 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1016,14 +1016,14 @@ importers: packages/geotiff: dependencies: '@chunkd/middleware': - specifier: ^11.3.0 - version: 11.3.0 + specifier: ^11.3.1 + version: 11.3.1 '@chunkd/source': - specifier: ^11.4.0 - version: 11.4.0 + specifier: ^11.4.1 + version: 11.4.1 '@chunkd/source-http': - specifier: ^11.4.0 - version: 11.4.0 + specifier: ^11.4.1 + version: 11.4.1 '@chunkd/source-memory': specifier: ^11.2.0 version: 11.2.0 @@ -1054,7 +1054,7 @@ importers: devDependencies: '@chunkd/source-file': specifier: ^11.2.0 - version: 11.2.0 + version: 11.2.1 '@developmentseed/geotiff': specifier: workspace:^ version: 'link:' @@ -1911,16 +1911,20 @@ packages: react: '>=18' react-dom: '>=18' - '@chunkd/middleware@11.3.0': - resolution: {integrity: sha512-9AzKHP4zX3DUawwJY4wOCCNSPJTmGQc6y1rABsLmoUQ6F5lN73/1u2FkQB0ihNIzzLYz2j86nwvdJSA6GArrtQ==} + '@chunkd/middleware@11.3.1': + resolution: {integrity: sha512-BMvxd2Zfb0zhcZUjLera/9ozt1CKcpS71LjekNtJwOxeMN4AJ4DJK8gev9erHX5ZnwBsjbq1VnZSm2TxEFhv1Q==} engines: {node: '>=16.0.0'} '@chunkd/source-file@11.2.0': resolution: {integrity: sha512-QQpVYMELqzjLfn2T+S3s2sVLt5KfI0zHas50Ce8g/o5hYiVraukjTTUCxgbKYJITwIcwARuTxY8rIbKMGdj2RA==} engines: {node: '>=16.0.0'} - '@chunkd/source-http@11.4.0': - resolution: {integrity: sha512-ZkekctSU+7m8SNWxvZNBJA4Uw+eOqhaQq9Sy6gYLMeoZCTWDleID21zjWSbh/4tuSoTN/jiA/U1N9uljWjEY/g==} + '@chunkd/source-file@11.2.1': + resolution: {integrity: sha512-+WV6kCcd4ehe0/9nd7VCUBFOJPtP+X0OHohlZlr1RsM8LnMBd0+Jo/CUwrlStmGAUgU4T3qSdbUE/cwCODXZNQ==} + engines: {node: '>=16.0.0'} + + '@chunkd/source-http@11.4.1': + resolution: {integrity: sha512-h/7tWxFh6EUh5k/pcYh66WSwiVbIWM6mCCLH0TZpBSWzgekjOXqXkEMRUBXrhekqrmtPQdTPro5e7tnknIdESg==} engines: {node: '>=18.0.0'} '@chunkd/source-memory@11.2.0': @@ -1931,6 +1935,10 @@ packages: resolution: {integrity: sha512-UVvihfAoGXYqH8sLJBHL7w3sKphIQpzoldfutZa7Y8UvaMqKxsIxOp8FbmxfCA2gd6EDLwBJaDA6HfkTIu1jmA==} engines: {node: '>=16.0.0'} + '@chunkd/source@11.4.1': + resolution: {integrity: sha512-R4/gIKbx059Am4UOYplEjKkE6g9jhDQF4M7jKNkMZR8Toeudr9vQl0z9ppb9fkTQIuvW8oKjO0Hc5/s4RCCCzg==} + engines: {node: '>=16.0.0'} + '@cogeotiff/core@9.5.0': resolution: {integrity: sha512-BsBlk8UGq9b8aIlUliKweS2V48KK8O3xORjd+IZLBxErr0kHP1WmWnqBRG+JcmNGoPMaE4lUHl/YVbY9UHLr+A==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} @@ -10276,24 +10284,30 @@ snapshots: react: 19.2.5 react-dom: 19.2.5(react@19.2.5) - '@chunkd/middleware@11.3.0': + '@chunkd/middleware@11.3.1': dependencies: - '@chunkd/source': 11.4.0 + '@chunkd/source': 11.4.1 '@chunkd/source-file@11.2.0': dependencies: '@chunkd/source': 11.4.0 - '@chunkd/source-http@11.4.0': + '@chunkd/source-file@11.2.1': dependencies: - '@chunkd/source': 11.4.0 + '@chunkd/source': 11.4.1 + + '@chunkd/source-http@11.4.1': + dependencies: + '@chunkd/source': 11.4.1 '@chunkd/source-memory@11.2.0': dependencies: - '@chunkd/source': 11.4.0 + '@chunkd/source': 11.4.1 '@chunkd/source@11.4.0': {} + '@chunkd/source@11.4.1': {} + '@cogeotiff/core@9.5.0': {} '@colors/colors@1.5.0':