Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions dev-docs/specs/2026-05-19-concurrency-limiter-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ class LimiterMiddleware implements SourceMiddleware {

`LimiterMiddleware` is also internal: the limiter is wired by `GeoTIFF.fromUrl` (where chunkd's `Source` / `SourceView` types are already in scope), so callers don't need to compose middleware themselves.

> **Implementation note (temporary).** The `SourceMiddleware` shape above is the intended design, but it doesn't work yet: chunkd's `SourceView` doesn't forward the request `signal` to middleware, so a middleware can't observe an abort (only the underlying source receives the read options via `SourceView`'s terminal handler). Until that's fixed upstream ([chunkd#1697](https://github.com/blacha/chunkd/pull/1697)), the limiter is implemented as an internal **source wrapper** — `LimitedSource` — composed *beneath* `SourceChunk` / `SourceCache` (as the `SourceView`'s source) rather than as a middleware on top. Cache hits still short-circuit in `SourceCache` before reaching it. This is internal-only; the public API is unchanged either way. Revert to `LimiterMiddleware` once chunkd ships the fix — tracked in [#565](https://github.com/developmentseed/deck.gl-raster/issues/565).

## Integration

### `@developmentseed/geotiff`
Expand Down
6 changes: 3 additions & 3 deletions packages/geotiff/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@
"extends": "../../package.json"
},
"dependencies": {
"@chunkd/middleware": "^11.3.0",
"@chunkd/source": "^11.4.0",
"@chunkd/source-http": "^11.4.0",
"@chunkd/middleware": "^11.3.1",
"@chunkd/source": "^11.4.1",
"@chunkd/source-http": "^11.4.1",
"@chunkd/source-memory": "^11.2.0",
"@cogeotiff/core": "^9.5.0",
"@developmentseed/affine": "workspace:^",
Expand Down
42 changes: 22 additions & 20 deletions packages/geotiff/src/geotiff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { parseGDALMetadata } from "./gdal-metadata.js";
import type { CachedTags, GeoKeyDirectory } from "./ifd.js";
import { extractGeoKeyDirectory, prefetchTags } from "./ifd.js";
import type { ConcurrencyLimiter, Priority } from "./limiter.js";
import { LimitedSource } from "./limiter.js";
import { LimiterMiddleware } from "./limiter.js";
import { Overview } from "./overview.js";
import type { DecoderPool } from "./pool/pool.js";
import type { Tile } from "./tile.js";
Expand Down Expand Up @@ -327,30 +327,32 @@ export class GeoTIFF {
// unbounded length. Remove once the upstream fix lands.
source.metadata = { size: Number.POSITIVE_INFINITY };

// When a limiter is supplied, gate every network read through it by
// wrapping the raw source. The header `SourceView` composes SourceChunk +
// SourceCache *on top* of this wrapped source, so a cache hit
// short-circuits in SourceCache and never reaches — never burns a slot on
// — the limiter; only reads that escape the cache (and every data read,
// which bypasses the cache) are gated. The same wrapped source backs both
// the header view and the data source, so both share one per-origin pool.
//
// Gating here as a source wrapper rather than a chunkd SourceMiddleware is
// a workaround for chunkd not forwarding the abort signal to middleware;
// see LimitedSource. Once that's fixed upstream this can become a
// middleware again. Tracked in
// https://github.com/developmentseed/deck.gl-raster/issues/565
const limitedSource = concurrencyLimiter
? new LimitedSource(source, { limiter: concurrencyLimiter, getPriority })
: source;

const view = new SourceView(limitedSource, [
// When a limiter is supplied, slot a LimiterMiddleware into both
// sources' middleware stacks. On the header source, it sits *after*
// SourceChunk + SourceCache so a cache hit short-circuits and never
// consumes a slot — only network reads that escape the cache are gated.
// On the data source (no caching), every fetch is gated. Both stacks
// share one LimiterMiddleware instance, so they share one per-origin pool.
const limiter = concurrencyLimiter
? new LimiterMiddleware({
url: new URL(url),
limiter: concurrencyLimiter,
getPriority,
})
: null;

const view = new SourceView(source, [
new SourceChunk({ size: chunkSize }),
new SourceCache({ size: cacheSize }),
...(limiter ? [limiter] : []),
]);

const dataSource: Pick<Source, "fetch"> = limiter
? new SourceView(source, [limiter])
: source;

return await GeoTIFF.open({
dataSource: limitedSource,
dataSource,
headerSource: view,
signal,
debug,
Expand Down
87 changes: 33 additions & 54 deletions packages/geotiff/src/limiter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import type { Source, SourceMetadata } from "@chunkd/source";
import type {
SourceCallback,
SourceMiddleware,
SourceRequest,
} from "@chunkd/source";

/**
* Numeric priority used to order waiters in a {@link Semaphore}'s queue. Lower
Expand Down Expand Up @@ -63,7 +67,7 @@ interface Waiter {

/**
* Counting semaphore with abort-aware acquire and dynamic priority. Internal
* primitive used by {@link PerOriginSemaphore} and {@link LimitedSource}.
* primitive used by {@link PerOriginSemaphore} and {@link LimiterMiddleware}.
*
* Hands out up to `maxRequests` concurrent slots. Further `acquire()`s queue.
* On every slot-open, the queue is searched for the lowest-priority waiter
Expand Down Expand Up @@ -222,91 +226,66 @@ export class PerOriginSemaphore implements ConcurrencyLimiter {
}
}

/** Options for {@link LimitedSource}. */
interface LimitedSourceOptions {
/** The {@link ConcurrencyLimiter} to gate through. The wrapped source's
* own `url` is passed to `limiter.acquire` for per-origin routing. */
/** Options for {@link LimiterMiddleware}. */
interface LimiterMiddlewareOptions {
/** The URL the wrapped source is reading from. Passed to
* `limiter.acquire(url, signal?)` on every fetch — the limiter uses it for
* per-origin routing. */
url: URL;
/** The {@link ConcurrencyLimiter} to gate through. */
limiter: ConcurrencyLimiter;
/** Optional dynamic priority for every fetch through this source. The
/** Optional dynamic priority for every fetch through this middleware. The
* limiter re-invokes this callback on each slot-open, so closures over
* dynamic state (e.g. layer viewport center) re-sort the queue when that
* state changes. Lower = serviced sooner. */
getPriority?: () => Priority;
}

/**
* Wraps a {@link Source} so every `fetch` holds a {@link ConcurrencyLimiter}
* slot for its duration — acquiring before the read, releasing when it settles
* (resolve or reject). Forwards the read's `signal` to `limiter.acquire`, so a
* request whose caller aborts while it is still queued for a slot is dropped
* before any network I/O fires.
* chunkd middleware that holds a {@link ConcurrencyLimiter} slot for the
* duration of each underlying `fetch` — releasing on resolve, on reject, and
* never otherwise interfering. Forwards the request's `signal` to
* `limiter.acquire`, so if the caller aborts while the call is queued the
* request is dropped before any network I/O fires.
*
* Compose this *beneath* `SourceChunk` / `SourceCache` (i.e. as the
* `SourceView`'s underlying source), so a cache hit short-circuits in
* `SourceCache` and never reaches — never burns a slot on — the limiter:
* Composed into a {@link SourceView}'s middleware list alongside the chunkd
* middlewares (`SourceChunk`, `SourceCache`, …). Place it after caching so
* cache hits don't burn a slot.
*
* @example
* ```ts
* import { SourceView } from "@chunkd/source";
* import { SourceCache, SourceChunk } from "@chunkd/middleware";
*
* const limited = new LimitedSource(source, { limiter });
* const view = new SourceView(limited, [
* const view = new SourceView(source, [
* new SourceChunk({ size: 64 * 1024 }),
* new SourceCache({ size: 8 * 1024 * 1024 }),
* new LimiterMiddleware({ url, limiter }),
* ]);
* ```
*
* **Why a source wrapper and not a chunkd `SourceMiddleware`** (which would
* compose more naturally): chunkd's `SourceView` does not forward the request
* `signal` to its middleware, so a middleware cannot observe an abort — only
* the underlying source receives the read options (incl. `signal`) via
* `SourceView`'s terminal handler. Wrapping the source is therefore the only
* layer that can drop a queued request on abort. Revert to a `SourceMiddleware`
* once chunkd forwards the signal (https://github.com/blacha/chunkd/pull/1697);
* tracked in https://github.com/developmentseed/deck.gl-raster/issues/565.
*
* @internal
*/
export class LimitedSource implements Source {
private readonly source: Source;
export class LimiterMiddleware implements SourceMiddleware {
readonly name = "limiter";
private readonly url: URL;
private readonly limiter: ConcurrencyLimiter;
private readonly getPriority?: () => Priority;

constructor(source: Source, opts: LimitedSourceOptions) {
this.source = source;
constructor(opts: LimiterMiddlewareOptions) {
this.url = opts.url;
this.limiter = opts.limiter;
this.getPriority = opts.getPriority;
}

get type(): string {
return this.source.type;
}

get url(): URL {
return this.source.url;
}

get metadata(): SourceMetadata | undefined {
return this.source.metadata;
}

head(options?: { signal: AbortSignal }): Promise<SourceMetadata> {
return this.source.head(options);
}

async fetch(
offset: number,
length?: number,
options?: { signal: AbortSignal },
): Promise<ArrayBuffer> {
async fetch(req: SourceRequest, next: SourceCallback): Promise<ArrayBuffer> {
const release = await this.limiter.acquire(
this.source.url,
options?.signal,
this.url,
req.signal,
this.getPriority,
);
try {
return await this.source.fetch(offset, length, options);
return await next(req);
} finally {
release();
}
Expand Down
Loading