feat(aggregator): Records Jetstream DO with PDS-bound ingestor#972
Conversation
Wires up the long-lived Jetstream subscription that feeds verification jobs into the Records Queue. All loop / cursor / backoff logic lives in a constructor-injected `JetstreamIngestor` so unit tests run it directly against `MockJetstream` + an in-memory queue + a Map-backed storage — no DO/D1/Queue runtime needed. Components: - `JetstreamClient` interface + `RealJetstreamClient` wrapping `@atcute/jetstream`'s `JetstreamSubscription`. The interface narrows to commit events and exposes the cursor; non-commit events (identity/account) are filtered upstream of the ingestor. - `JetstreamIngestor` owns the lifecycle: connect → consume → cursor persist → reconnect with exponential backoff (capped, ±jitter, reset on each successful event so a flaky upstream doesn't spiral). Cursor is persisted after each successful enqueue; a crash between enqueue and persist replays the latest event, which the downstream consumer's idempotency rules absorb. - `RecordsJetstreamDO` is the thin Cloudflare wrapper: wires bindings into the ingestor, fires `run()` as a fire-and-forget promise on construction (the run loop is meant to live as long as the DO), and exposes a status fetch handler. - Worker entrypoint adds `/_admin/start` — hit it once after deploy (e.g. `wrangler deploy && curl https://api.emdashcms.com/_admin/start`) to bootstrap the DO. The DO's outbound WebSocket then keeps it alive on its own; if the WS drops, the ingestor's reconnect logic handles it. Tests: 8 unit tests covering event-to-job conversion, cursor persistence + resume, delete operations, defence-in-depth filtering, stop semantics, backoff with progress reset, and exponential growth + cap. Tests import MockJetstream via the new `@emdash-cms/atproto-test-utils/jetstream` and `/nsid` subpaths so workerd doesn't try to load `@atproto/repo` (Node- crypto only) transitively from the package's main entry. Workspace: - `@emdash-cms/atproto-test-utils` package adds `/jetstream` and `/nsid` subpath exports. The NSID constants moved to their own `nsid.ts` file so the subpath doesn't need to drag in fake-publisher.ts and its @atproto/repo transitive deps. - Regenerated `worker-configuration.d.ts` after wrangler.jsonc change.
Self-review found 3 critical and 2 important issues; fixing all 5 plus
two related correctness items.
Critical:
1. RealJetstreamClient.close() was a no-op — it flipped a local flag the
iterator only checked AFTER `inner.next()` resolved. A quiescent
stream (no events arriving) would hang `stop()` indefinitely. Tests
passed only because MockJetstream actively resolves pending awaiters
on close; production EventIterator does not. Fix: hoist the inner
iterator outside the factory so close() can call `inner.return()`,
which destroys the WebSocket and resolves any pending next() to
`{done: true}`. The loop's `closed` flag is now redundant — removed
along with the `while (!closed)` (was a lint false-positive trigger
anyway).
2. C2 regression test (jetstream-client.test.ts): drives the wrapper
against a stub subscription whose next() only resolves when return()
is called. Asserts that calling close() on the wrapped handle
terminates the for-await within 100ms. Without the C1 fix, this test
times out.
3. DO eviction during long Jetstream outage: during 60s backoff sleeps
the DO has no active WebSocket, so CF can evict it. Nothing was
waking it back up. Switch the cron from 6h to */5min and have
scheduled() ping the DO via its stub.fetch — the DO's constructor
resumes the ingestor from the persisted cursor on reinstantiation.
Important:
4. run() doc claimed queue.send failures bubble; the catch block
actually absorbed them. Updated the doc to match reality and explain
the choice (transient queue failures retry; the cron liveness pump
recovers a wedged DO either way).
5. Fire-and-forget runPromise.catch silently masked ingestor death.
Track `state: "running" | "crashed"` on the DO, set "crashed" on
uncaught run() rejection, expose via fetch handler so external
monitoring can see it.
Worth fixing:
6. Cleaner JetstreamClient input typing. `wrapAtcuteSubscription` now
takes a generic `RawJetstreamSubscription<E>` — both the real
JetstreamSubscription and test stubs satisfy it without casts.
7. computeBackoff: defensive Math.max floor so a future caller passing
failures=0 doesn't fall below initialDelayMs.
8. madeProgress reset moved from run-loop iteration top to
connectAndConsume() top — the flag's lifetime now matches one
connection attempt exactly.
Tests: 13 (was 8 in the previous PR 2 commit; +2 wrapper tests, +3
backoff/cursor regression tests already present). 0 lint, 0 typecheck.
Second adversarial review pass caught that the previous fix didn't
actually unblock production. `@mary-ext/event-iterator`'s `return()`
drops the resolver reference WITHOUT invoking it (lib/index.ts:55-67),
so calling `inner.return()` on a quiescent EventIterator does NOT
resolve the pending `next()` Promise. The first-pass test passed only
because its stub explicitly resolved the awaiter — the production path
still wedged.
Real fix: race `inner.next()` against a `closedSignal` Promise. When
`close()` is called, resolve the signal to a synthetic `{done: true}`,
which unblocks the for-await regardless of the inner iterator's
shutdown behaviour. The orphaned `it.next()` Promise is either resolved
later (when an event arrives, harmless) or leaks forever (if Jetstream
stays quiescent, but no resources are held — only the Promise object
itself).
Updated the regression test's stub to mimic EventIterator's actual
behaviour: `next()` returns a Promise that NEVER resolves, even after
`return()` is called. Test now fails against the previous fix and
passes against the race-based one.
Also dropped `state: "running" | "crashed"` from the DO. The same
review noted it was dead code: `run()` swallows every error path
internally, so `runPromise.catch` never fires, so `state` never flips.
Removed the field entirely. The DO's fetch handler now reports just
`cursor` and `consecutiveFailures`, which IS the real liveness signal.
If we ever introduce a non-recoverable rejection path, we can add the
state back with the same shape.
|
Deploying with
|
| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs |
emdash-perf-coordinator | fa8da07 | May 09 2026, 04:35 PM |
Deploying with
|
| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs |
emdash-i18n | fa8da07 | May 09 2026, 04:35 PM |
PR template validation failedPlease fix the following issues by editing your PR description:
See CONTRIBUTING.md for the full contribution policy. |
Deploying with
|
| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs |
docs | fa8da07 | May 09 2026, 04:36 PM |
Scope checkThis PR changes 1,001 lines across 10 files. Large PRs are harder to review and more likely to be closed without review. If this scope is intentional, no action needed. A maintainer will review it. If not, please consider splitting this into smaller PRs. See CONTRIBUTING.md for contribution guidelines. |
Deploying with
|
| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs |
emdash-playground | fa8da07 | May 09 2026, 04:36 PM |
@emdash-cms/admin
@emdash-cms/auth
@emdash-cms/blocks
@emdash-cms/cloudflare
emdash
create-emdash
@emdash-cms/gutenberg-to-portable-text
@emdash-cms/x402
@emdash-cms/plugin-ai-moderation
@emdash-cms/plugin-atproto
@emdash-cms/plugin-audit-log
@emdash-cms/plugin-color
@emdash-cms/plugin-embeds
@emdash-cms/plugin-forms
@emdash-cms/plugin-webhook-notifier
commit: |
Deploying with
|
| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs |
emdash-demo-cache | fa8da07 | May 09 2026, 04:36 PM |
There was a problem hiding this comment.
Pull request overview
Adds the aggregator’s long-lived Jetstream subscription by introducing a Records Durable Object that maintains an outbound WebSocket, converts Jetstream commit events into RecordsJobs, persists a cursor, and reconnects with backoff; plus unit tests and a bootstrap/liveness mechanism to keep the DO running across outages/evictions.
Changes:
- Introduce
JetstreamClient+RealJetstreamClientwrapper andJetstreamIngestor(cursor persistence + reconnect/backoff + stop semantics). - Add
RecordsJetstreamDOthat wires real Worker bindings into the ingestor and exposes a status surface used for bootstrap/liveness. - Add subpath exports in
@emdash-cms/atproto-test-utilsto allow workerd-safe imports, and add new ingestor/wrapper tests.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/atproto-test-utils/src/nsid.ts | Extracts NSID constants into a workerd-safe module. |
| packages/atproto-test-utils/src/fake-publisher.ts | Reuses and re-exports NSIDs from the new module. |
| packages/atproto-test-utils/package.json | Adds subpath exports (./jetstream, ./nsid) for workerd-safe imports. |
| apps/aggregator/wrangler.jsonc | Switches cron trigger to a 5-minute DO liveness ping. |
| apps/aggregator/test/jetstream-ingestor.test.ts | Adds unit coverage for event→job conversion, cursor persistence, reconnect/backoff, and stop semantics. |
| apps/aggregator/test/jetstream-client.test.ts | Adds regression coverage for wrapAtcuteSubscription shutdown semantics. |
| apps/aggregator/src/records-do.ts | Implements the Records DO wrapper that starts the ingestor and exposes status via fetch. |
| apps/aggregator/src/jetstream-ingestor.ts | Implements the core connect/consume/cursor/backoff loop with injected deps. |
| apps/aggregator/src/jetstream-client.ts | Adds the Jetstream client abstraction and cancellation-safe subscription wrapper. |
| apps/aggregator/src/index.ts | Adds /_admin/start bootstrap route and scheduled liveness ping. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| try { | ||
| await this.connectAndConsume(); | ||
| // Subscription ended cleanly (Jetstream closed the socket | ||
| // without error). Treat as a soft failure for backoff | ||
| // purposes — but if we successfully consumed events during | ||
| // the connection, reset the counter first so the backoff | ||
| // reflects the latest streak, not historical failures. | ||
| if (this.madeProgress) this._consecutiveFailures = 0; | ||
| this._consecutiveFailures += 1; | ||
| } catch (err) { | ||
| if (this.madeProgress) this._consecutiveFailures = 0; | ||
| this._consecutiveFailures += 1; | ||
| this.logger.warn?.("jetstream subscription failed", { | ||
| error: err instanceof Error ? err.message : String(err), | ||
| consecutiveFailures: this._consecutiveFailures, | ||
| }); | ||
| } | ||
| if (this.stopped) break; |
There was a problem hiding this comment.
Fixed in fa8da07 — counter now increments only when no progress was made, resets to 0 (without increment) on cycles that consumed events. Added a regression test that runs three connect→emit→close cycles and asserts consecutiveFailures === 0 after each.
| export default { | ||
| async fetch(_request: Request, _env: Env, _ctx: ExecutionContext): Promise<Response> { | ||
| async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> { | ||
| const url = new URL(request.url); | ||
| if (url.pathname === BOOTSTRAP_PATH) { | ||
| return bootstrapRecordsDo(env); | ||
| } |
There was a problem hiding this comment.
Fixed in fa8da07 — /_admin/start now fires the DO fetch via ctx.waitUntil and returns 204 with no body. Probing callers can't tell whether the DO was already running, just woke up, or is mid-startup. The DO's status body is still returned by its own fetch handler but is consumed only internally (cron pump fires-and-forgets too).
| }, | ||
| close: () => { | ||
| fireClosed(); | ||
| void inner?.return?.(); |
There was a problem hiding this comment.
Fixed in fa8da07 — chained .catch(() => {}) on inner?.return?.(). EventIterator's return shouldn't reject today, but a future change could; this prevents an unhandled-promise warning in workerd.
| * failure count, which is the real liveness signal: 0 means the most | ||
| * recent connection attempt produced events; a high value indicates | ||
| * Jetstream is unreachable or the wantedCollections filter is wrong. |
There was a problem hiding this comment.
Fixed in fa8da07 along with the same issue in jetstream-ingestor.ts. Counter semantics now match the doc.
Four findings on PR #972; addressing all. 1. `consecutiveFailures` semantic mismatch (jetstream-ingestor.ts + records-do.ts). Docstring claimed "0 means the most recent attempt produced events", but the run loop unconditionally incremented after each connectAndConsume return. So the counter was always ≥ 1 after any disconnect, including ones that successfully streamed events. Fix: increment only when no progress was made; reset to 0 (without increment) when progress was. Also added a regression test that asserts the counter stays 0 across three connect-disconnect cycles that all produced events. 2. `/_admin/start` returned the DO's status body (cursor + failure count). Even an idempotent admin endpoint shouldn't leak operational data to anonymous callers. Fix: route now fires the DO fetch via `ctx.waitUntil` and returns a fixed 204 — caller learns nothing about whether the DO was already running, just woke up, or is mid-startup. The DO's fetch handler still returns the status body (used internally by the cron liveness pump, which doesn't proxy it either). 3. Unhandled rejection in `wrapAtcuteSubscription.close()`. `void inner?.return?.()` suppressed the value but did NOT catch rejections. If the inner iterator's cleanup ever rejects (today it shouldn't, but a future EventIterator change could), workerd would surface an unhandled-promise warning. Fix: chain `.catch(() => {})`. Tests: 14 (was 13; added counter-semantics regression). 0 lint, 0 typecheck.
What does this PR do?
Adds the long-lived Jetstream subscription that feeds the aggregator's record verification queue. This is the next slice on top of the merged scaffold (#971): the Worker now has a Records DO that holds an outbound WebSocket to Jetstream, converts commit events into
RecordsJobmessages, and enqueues them. PDS-verified ingest (the Records Queue consumer) lands next.Three commits — the foundational PR plus two rounds of adversarial-review fixes:
feat(aggregator): Records Jetstream DO with PDS-bound ingestor— the substantive PR.JetstreamClientinterface +RealJetstreamClientwrapping@atcute/jetstream'sJetstreamSubscription.JetstreamIngestorowns the connect → consume → cursor-persist → reconnect loop (exponential backoff with progress-reset, jitter, capped).RecordsJetstreamDOis a thinDurableObjectwrapper that wires real bindings into the ingestor and firesrun()as a fire-and-forget on construction. Worker entrypoint adds/_admin/startfor post-deploy bootstrap.fix(aggregator): adversarial review fixes for Jetstream DO— first review pass found 3 critical + 2 important issues. Fixed: wedged-await deadlock (hoist inner iterator soclose()reaches it), DO eviction during long Jetstream outages (5-minute cron pings the DO viastub.fetchso it resurrects after potential eviction),runPromise.catchmasking ingestor death (track state field),madeProgressflag lifetime, defensive backoff floor.fix(aggregator): C1 fix was incomplete — switch to closed-signal race— second review pass caught that the first fix didn't actually work.@mary-ext/event-iterator'sreturn()drops its resolver reference WITHOUT invoking it, soinner.return()does NOT unblock a pendingnext(). Real fix: raceinner.next()against aclosedSignalPromise thatclose()resolves. Test stub now mirrors the actualEventIteratorsemantics (returnsnew Promise(() => {})with no resolver captured). Also dropped thestate: "running" | "crashed"field from the DO — it was dead code becauserun()swallows every error path internally, sorunPromise.catchnever fired. The fetch handler now reportscursor+consecutiveFailures, which is the actual liveness signal.A third adversarial-review pass converged with no further issues.
Closes #
Type of change
Checklist
pnpm typecheckpassespnpm lintpasses (no new diagnostics)pnpm testpasses (13 aggregator + 17 atproto-test-utils + the rest of the workspace unchanged)pnpm formathas been runapps/aggregatorandpackages/atproto-test-utilsare private)AI-generated code disclosure
Test output
Notes for review
The two iteration commits are kept separate rather than squashed because each one captures a real bug that the previous design missed. The commit messages document the failure modes (in particular:
EventIterator.return()not unblocking awaiters is a genuine pitfall worth preserving as project-knowledge, since anyone reaching forinner.return()to cancel a pending await on EventIterator-backed code will hit the same wall)./_admin/startis unauthenticated. The action is idempotent — anyone hitting it just bootstraps the DO if it isn't already running. The DO's status response leaks the current Jetstream cursor (microsecond timestamp of last event), which is essentially "when did we last see traffic." Trivial information; flagged for the day this Worker fronts user-facing endpoints, when we'd add a shared-secret header check.