Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 224 additions & 0 deletions dev-docs/specs/2026-05-12-getTileData-coalescing-design.md

Large diffs are not rendered by default.

177 changes: 177 additions & 0 deletions packages/deck.gl-raster/src/raster-tile-layer/tile-batcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/**
* Options handed to a {@link TileBatcher} dispatch call: a combined signal that
* aborts only once *every* item in the group has been aborted upstream.
*/
export interface BatchDispatchOptions {
readonly signal: AbortSignal;
}

interface PendingItem<TItem, TResult> {
readonly item: TItem;
readonly signal?: AbortSignal;
readonly resolve: (value: TResult) => void;
readonly reject: (reason?: unknown) => void;
}

/** Options for constructing a {@link TileBatcher}. */
export interface TileBatcherOptions<TItem, TResult> {
/** Compute the batch key for an item; items with the same key share a dispatch. */
groupKey(item: TItem): string;
/**
* Fetch a whole group at once. Returns one entry per `items` element, in
* order — a value, or an `Error` for that single item. May reject to fail
* the whole group.
*/
dispatch(
key: string,
items: TItem[],
opts: BatchDispatchOptions,
): Promise<Array<TResult | Error>>;
}

/**
* Coalesces a burst of per-item `fetch()` calls (e.g. deck.gl's per-tile
* `getTileData`) into one `dispatch` per group key. The burst is collected on
* a `setTimeout(_, 0)`, which deterministically fires after the synchronous
* burst + its microtask tail (see the design doc's "Timing" section).
*/
export class TileBatcher<TItem, TResult> {
private readonly groupKey: (item: TItem) => string;
private readonly dispatch: TileBatcherOptions<TItem, TResult>["dispatch"];
private buffer: Array<PendingItem<TItem, TResult>> = [];
private timer: ReturnType<typeof setTimeout> | null = null;
private finalized = false;

constructor(opts: TileBatcherOptions<TItem, TResult>) {
this.groupKey = opts.groupKey;
this.dispatch = opts.dispatch;
}

/** Buffer an item; resolves to the dispatch's result (or rejects). */
fetch(
item: TItem,
{ signal }: { signal?: AbortSignal } = {},
): Promise<TResult> {
if (this.finalized) {
return Promise.reject(new Error("TileBatcher has been finalized"));
}
return new Promise<TResult>((resolve, reject) => {
this.buffer.push({ item, signal, resolve, reject });
if (this.timer === null) {
this.timer = setTimeout(() => this.flush(), 0);
}
});
}

/** Reject everything still buffered; subsequent `fetch` calls reject too. */
finalize(): void {
this.finalized = true;
if (this.timer !== null) {
clearTimeout(this.timer);
this.timer = null;
}
const pending = this.buffer;
this.buffer = [];
for (const p of pending) {
p.reject(new Error("TileBatcher finalized before flush"));
}
}

private flush(): void {
this.timer = null;
const pending = this.buffer;
this.buffer = [];

// Drop already-aborted items.
const alive: Array<PendingItem<TItem, TResult>> = [];
for (const p of pending) {
if (p.signal?.aborted) {
p.reject(p.signal.reason);
} else {
alive.push(p);
}
}

// Group by key.
const groups = new Map<string, Array<PendingItem<TItem, TResult>>>();
for (const p of alive) {
const key = this.groupKey(p.item);
const group = groups.get(key);
if (group) {
group.push(p);
} else {
groups.set(key, [p]);
}
}

for (const [key, group] of groups) {
void this.dispatchGroup(key, group);
}
}

private async dispatchGroup(
key: string,
group: Array<PendingItem<TItem, TResult>>,
): Promise<void> {
const composite = compositeAbortSignal(group.map((p) => p.signal));
let results: Array<TResult | Error>;
try {
results = await this.dispatch(
key,
group.map((p) => p.item),
{ signal: composite },
);
} catch (err) {
for (const p of group) {
p.reject(err);
}
return;
}
for (let i = 0; i < group.length; i++) {
const p = group[i]!;
const r = results[i];
if (p.signal?.aborted) {
p.reject(p.signal.reason);
} else if (r instanceof Error) {
p.reject(r);
} else {
p.resolve(r as TResult);
}
}
}
}

/**
* An `AbortSignal` that fires only once *every* input signal has aborted. Any
* `undefined` in the input means "never aborts" so the composite never aborts.
*/
function compositeAbortSignal(
signals: Array<AbortSignal | undefined>,
): AbortSignal {
if (signals.length === 0 || signals.some((s) => s === undefined)) {
return new AbortController().signal;
}
const real = signals as AbortSignal[];
const controller = new AbortController();
let remaining = real.length;
for (const s of real) {
if (s.aborted) {
remaining--;
} else {
s.addEventListener(
"abort",
() => {
remaining--;
if (remaining === 0) {
controller.abort(s.reason);
}
},
{ once: true },
);
}
}
if (remaining === 0) {
controller.abort(real[0]!.reason);
}
return controller.signal;
}
122 changes: 122 additions & 0 deletions packages/deck.gl-raster/tests/tile-batcher.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { describe, expect, it, vi } from "vitest";
import { TileBatcher } from "../src/raster-tile-layer/tile-batcher.js";

type FakeTile = { x: number; y: number; z: number };

function makeBatcher(
dispatch: (
key: string,
items: FakeTile[],
opts: { signal: AbortSignal },
) => Promise<Array<number | Error>>,
) {
return new TileBatcher<FakeTile, number>({
groupKey: (t) => `z${t.z}`,
dispatch,
});
}

const flushTick = () => new Promise((r) => setTimeout(r, 0));

describe("TileBatcher", () => {
it("coalesces one tick of fetch() calls into one dispatch per group key", async () => {
const seen: Array<[string, FakeTile[]]> = [];
const b = makeBatcher(async (key, items) => {
seen.push([key, items]);
return items.map((_, i) => i);
});
const a = b.fetch({ x: 0, y: 0, z: 1 });
const c = b.fetch({ x: 1, y: 0, z: 1 });
const d = b.fetch({ x: 0, y: 0, z: 2 });
expect(await Promise.all([a, c, d])).toEqual([0, 1, 0]);
expect(seen).toEqual([
[
"z1",
[
{ x: 0, y: 0, z: 1 },
{ x: 1, y: 0, z: 1 },
],
],
["z2", [{ x: 0, y: 0, z: 2 }]],
]);
});

it("distributes per-item Errors only to the failing item", async () => {
const b = makeBatcher(async (_key, items) =>
items.map((_t, i) => (i === 1 ? new Error("bad tile") : i)),
);
const r0 = b.fetch({ x: 0, y: 0, z: 1 });
const r1 = b.fetch({ x: 1, y: 0, z: 1 });
const r2 = b.fetch({ x: 2, y: 0, z: 1 });
expect(await r0).toBe(0);
await expect(r1).rejects.toThrow("bad tile");
expect(await r2).toBe(2);
});

it("rejects every item in a group when the dispatch itself rejects", async () => {
const b = makeBatcher(async () => {
throw new Error("whole batch failed");
});
const r0 = b.fetch({ x: 0, y: 0, z: 1 });
const r1 = b.fetch({ x: 1, y: 0, z: 1 });
await expect(r0).rejects.toThrow("whole batch failed");
await expect(r1).rejects.toThrow("whole batch failed");
});

it("drops an item whose signal is already aborted before flush", async () => {
const dispatch = vi.fn(async (_k: string, items: FakeTile[]) =>
items.map((_, i) => i as number | Error),
);
const b = makeBatcher(dispatch);
const ac = new AbortController();
ac.abort(new Error("scrolled off"));
const aborted = b.fetch({ x: 0, y: 0, z: 1 }, { signal: ac.signal });
const ok = b.fetch({ x: 1, y: 0, z: 1 });
await expect(aborted).rejects.toThrow("scrolled off");
expect(await ok).toBe(0); // re-indexed within the surviving group
expect(dispatch).toHaveBeenCalledOnce();
expect(dispatch.mock.calls[0]![1]).toEqual([{ x: 1, y: 0, z: 1 }]);
});

it("composite signal aborts only when every member aborts", async () => {
let captured!: AbortSignal;
const b = new TileBatcher<FakeTile, number>({
groupKey: (t) => `z${t.z}`,
dispatch: async (_k, items, opts) => {
captured = opts.signal;
// Hold the dispatch open long enough to observe abort state.
await new Promise((r) => setTimeout(r, 30));
return items.map((_, i) => i);
},
});
const a = new AbortController();
const c = new AbortController();
const ra = b.fetch({ x: 0, y: 0, z: 1 }, { signal: a.signal });
const rc = b.fetch({ x: 1, y: 0, z: 1 }, { signal: c.signal });
await flushTick();
expect(captured.aborted).toBe(false);
a.abort();
expect(captured.aborted).toBe(false); // not all aborted yet
c.abort();
expect(captured.aborted).toBe(true);
// Both tiles' promises reject (their per-tile signals are aborted at distribute time).
await expect(ra).rejects.toBeDefined();
await expect(rc).rejects.toBeDefined();
});

it("finalize() rejects everything still buffered and dispatches nothing more", async () => {
const dispatch = vi.fn(async () => [] as number[]);
const b = makeBatcher(dispatch);
const r = b.fetch({ x: 0, y: 0, z: 1 });
b.finalize();
await expect(r).rejects.toThrow();
await flushTick();
expect(dispatch).not.toHaveBeenCalled();
});

it("fetch() after finalize() rejects immediately", async () => {
const b = makeBatcher(async (_k, items) => items.map((_, i) => i));
b.finalize();
await expect(b.fetch({ x: 0, y: 0, z: 1 })).rejects.toThrow();
});
});
Loading