feat(aggregator): records consumer with PDS-verified ingest #975
Conversation
First piece of the records-consumer slice.

Schema additions to 0001_init.sql (still pre-deploy, no follow-up migration needed):
- publishers + publisher_verifications for the publisher.* NSIDs
- dead_letters for verification-failure forensics (distinct from the configured Cloudflare DLQ, which is for transient retry exhaustion — different failure modes, different destinations)
- signing_key / signing_key_id columns on known_publishers so the table doubles as the DID-doc resolution cache; 24h TTL applied at query time

WANTED_COLLECTIONS extended with publisher.profile and publisher.verification.

DidResolver: pure constructor-injected class with a DidDocCache interface (Map-backed in tests, known_publishers-backed in prod via createD1DidDocCache); a sketch of the cache contract follows. Materialises cached multibase keys to PublicKey instances via @atcute/crypto, exhaustive on the p256/secp256k1 discriminated union. invalidate() for the re-resolve-after-key-rotation path the verification step needs.

14 tests covering cache hit/miss/expiry/invalidate, malformed-DID rejection, missing PDS / missing #atproto verification method rejection, and the D1 binding contract (round-trip, first_seen_at preserved across updates, end-to-end with the resolver).
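A minimal sketch of that cache contract, inferred from the `read()`/`upsert()` calls quoted later in this thread; field names beyond `pds`/`signingKey`/`signingKeyId` are assumptions:

```ts
// Hypothetical shape of the constructor-injected DidDocCache. The resolvedAt
// timestamp is what the 24h TTL check compares against at query time.
interface CachedDidDoc {
  pds: string;
  signingKey: string; // multibase-encoded public key, materialised via @atcute/crypto
  signingKeyId: string;
  resolvedAt: Date;
}

interface DidDocCache {
  read(did: string): Promise<CachedDidDoc | null>;
  upsert(
    did: string,
    doc: { pds: string; signingKey: string; signingKeyId: string },
    resolvedAt: Date,
  ): Promise<void>;
}
```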
Replaces the no-op queue() handler in index.ts with the full verification
+ ingest pipeline for the four record collections in scope (package.profile,
package.release, publisher.profile, publisher.verification).
Per-job pipeline:
1. Resolve PDS endpoint + signing key via DidResolver (cached in
known_publishers, 24h TTL).
2. Fetch CAR via com.atproto.sync.getRecord, hand to @atcute/repo's
verifyRecord (MST + signature in one call), capture carBytes for
record_blob storage.
3. Cross-check verified vs Jetstream-supplied bytes; verified always
wins; mismatch logs as `[aggregator] jetstream-discrepancy` for
future Jetstream-correctness monitoring.
4. Lexicon-validate via @atcute/lexicons safeParse against the schema
from @emdash-cms/registry-lexicons.
5. Per-collection structural checks:
- package.profile: record.id must equal at://did/collection/rkey
(the publisher could lie in the body even though MST verifies);
slug optional, falls back to rkey, must match rkey if present
- package.release: rkey must equal `<package>:<encoded-version>`,
  version must parse as semver, version_sort computed from a 10-digit
  zero-padded major.minor.patch + prerelease fragment (sketched after
  this list)
- publisher.profile: rkey must be 'self'; contact entries must have
at least one of url/email (lexicon can't express that constraint)
- publisher.verification: facts stored as observed; validity check
(current handle/displayName matches bound values) is read-time
6. Write to D1 with upserts on the appropriate primary key. Releases
use INSERT … DO NOTHING; on a different-content same-version retry,
audit a release_duplicate_attempts row with reason IMMUTABLE_VERSION.
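A sketch of the version_sort scheme from step 5, not the shipped code: `computeVersionSort` and `SEMVER_RE` are names used in this PR, but this regex and body are assumptions, and the `~` final-release sentinel reflects the fix adopted later in the thread (the original `.zzz` sentinel lost to prereleases like `zzzz`).

```ts
// Zero-pad each numeric component to 10 digits so lexicographic order on the
// resulting key matches numeric version order.
const SEMVER_RE = /^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z][0-9A-Za-z.-]*))?$/;

function computeVersionSort(version: string): string | null {
  const m = SEMVER_RE.exec(version);
  if (!m) return null; // caller maps null to INVALID_VERSION
  const [, major, minor, patch, prerelease] = m;
  if ([major, minor, patch].some((c) => c.length > 10)) return null; // pad-width ceiling
  const pad = (n: string) => n.padStart(10, "0");
  const core = [major, minor, patch].map(pad).join(".");
  if (!prerelease) return `${core}.~`; // `~` sorts above any prerelease character
  const padded: string[] = [];
  for (const id of prerelease.split(".")) {
    if (/^\d+$/.test(id)) {
      if (id.length > 10) return null; // numeric prerelease ids hit the same ceiling
      padded.push(pad(id));
    } else {
      padded.push(id); // alphanumeric ids compare lexically as-is
    }
  }
  return `${core}.${padded.join(".")}`;
}
```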
Delete handling (a sketch follows this list):
- Hard-delete for one-per-DID rows (package.profile, publisher.profile)
- Soft-delete (tombstoned_at) for everything else
- 0001_init.sql: releases FK now ON DELETE CASCADE so an out-of-order
delete (publisher deletes profile before releases drain) doesn't get
blocked by FK violation; the publisher's intent is the whole package
going away
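A minimal sketch of that hard-vs-soft split. `parseReleaseRkey` and `IngestError` are named in this PR (the throw-on-malformed-rkey behaviour folds in a fix from a later review round); table and column specifics beyond `tombstoned_at` and the (did, package, version) release key are assumptions.

```ts
declare function parseReleaseRkey(rkey: string): { pkg: string; version: string } | null;
declare class IngestError extends Error {
  constructor(reason: string);
}

async function applyDelete(db: D1Database, did: string, collection: string, rkey: string) {
  if (collection === "package.profile") {
    // Hard delete; the releases FK is ON DELETE CASCADE, so an out-of-order
    // profile delete takes the package's releases with it.
    await db.prepare("DELETE FROM packages WHERE did = ?1 AND slug = ?2").bind(did, rkey).run();
  } else if (collection === "publisher.profile") {
    await db.prepare("DELETE FROM publishers WHERE did = ?1").bind(did).run();
  } else if (collection === "package.release") {
    const parsed = parseReleaseRkey(rkey); // `<package>:<encoded-version>`
    if (!parsed) throw new IngestError("RKEY_MISMATCH"); // forensics row, then ack
    await db
      .prepare(
        "UPDATE releases SET tombstoned_at = ?1 WHERE did = ?2 AND package = ?3 AND version = ?4",
      )
      .bind(new Date().toISOString(), did, parsed.pkg, parsed.version)
      .run();
  }
  // publisher.verification tombstones the same way.
}
```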
Error policy (dispatcher sketch after this list):
- PDS network/timeout/5xx: message.retry()
- 404, oversized response, signature/MST/lexicon failure, structural
failure: write dead_letters row with structured reason + payload,
message.ack(). Never retry — these are malicious or broken upstream
and we know retrying won't help
- Unexpected programming errors: log loud, dead-letters, ack — never
crash the worker (would block the queue slot)
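The policy above as a dispatcher catch block, sketched under assumptions: error class names follow this thread (including the MissingDependencyError retry added in a later review round), but the shapes and the transient-error test are illustrative.

```ts
declare const message: { ack(): void; retry(): void };
declare function writeDeadLetter(reason: string, detail?: string): Promise<void>;

class IngestError extends Error {
  constructor(public reason: string, public detail?: string) {
    super(reason);
  }
}
class MissingDependencyError extends Error {}

async function dispatch(run: () => Promise<void>, isTransientPdsError: (e: unknown) => boolean) {
  try {
    await run();
    message.ack();
  } catch (err) {
    if (isTransientPdsError(err) || err instanceof MissingDependencyError) {
      // PDS network/timeout/5xx, or a release that arrived before its
      // profile: the condition can clear, so let the queue redeliver.
      message.retry();
    } else if (err instanceof IngestError) {
      await writeDeadLetter(err.reason, err.detail); // permanent: forensics, then ack
      message.ack();
    } else {
      console.error("[aggregator] unexpected ingest error", err);
      await writeDeadLetter("UNEXPECTED_ERROR", err instanceof Error ? err.message : String(err));
      message.ack(); // never crash the worker; that would block the queue slot
    }
  }
}
```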
Test coverage (37 new tests, 65 aggregator tests total):
- Per-collection writers: insert, upsert, lexicon failure, structural
rejection (RKEY_MISMATCH, CONTACT_VALIDATION_FAILED, INVALID_VERSION)
- Releases: version_sort computation; same-content replay is silent;
different-content replay audits release_duplicate_attempts
- Delete: hard-delete vs tombstone per collection; verification
upsert clears tombstone (re-publish recovers)
- Dispatcher: ack on success, retry on transient PDS error, retry on
network error, ack+dead_letters on permanent PDS error, delete
short-circuits PDS fetch
Known gaps (deferred):
- End-to-end happy-path test using FakePublisher + MockPds requires a
node-pool vitest project; left for a follow-up. Each layer is
independently tested; the wiring between them is the gap.
- releases.cts column mirrors verified_at because the lexicon doesn't
expose a creation timestamp and verifyRecord doesn't surface the
commit rev. Tracked in the writer with a TODO.
- Slice 3 artifact-mirror enqueue is a TODO comment in the release
writer; mirrored_artifacts stays empty for now.
Address findings from the adversarial review of the records consumer slice. Per-finding rationale:

BLOCKERS / CORRECTNESS
- B1: refreshPackageLatest() recomputes packages.latest_version + capabilities after release insert / un-tombstone / tombstone-on-delete. Before: the profile writer bound `null` for both columns with a comment promising the release writer would populate them; the release writer never did. Read APIs that read latest_version got NULL forever.
- B2: a same-content release on a tombstoned row clears tombstoned_at instead of silently no-op'ing through DO NOTHING. Otherwise a delete-then-republish-same-content round-trip leaves the release invisible to readers with no audit row to explain why.
- B3: the release.extensions field is now validated as a plain object containing a releaseExtension-keyed entry that passes PackageReleaseExtension.mainSchema. The emdash_extension column stores only the validated payload (not arbitrary record.extensions). Previously the consumer wrote whatever the publisher sent, including scalars like "lol" — read-API parsing would have thrown.
- M1: package.profile.security[] entries enforce "at least one of url|email" per the lexicon. Was only enforced for publisher.profile.
- Mi7: the release writer pre-checks parent-profile existence and throws the new MissingDependencyError → controller.retry() instead of letting the FK violation bubble up as UNEXPECTED_ERROR with no recovery path. Out-of-order Jetstream delivery (release event before its profile event) now retries until the profile arrives or hits max_retries → DLQ for the reconciliation pass.

SHOULD-FIX / SAFETY
- M3: version_sort rejects components or prerelease numerics longer than 10 digits as INVALID_VERSION (the pad-width ceiling).
- M5/M6/N1: the mapPdsReason parameter is typed via the imported VerificationFailureReason union, so a new reason added in pds-verify.ts becomes a compile-time error here. PDS_NETWORK_ERROR (the unreachable transient case) now throws "unreachable" instead of silently dead-lettering as UNEXPECTED_ERROR. An exhaustive `never` default catches future variants.
- M9: removed the JSON.stringify discrepancy comparison between the Jetstream copy and the verified PDS copy. The verified copy always wins, so the comparison is a monitoring signal only, but JSON.stringify isn't canonical (key order, undefined-vs-missing) and would fire false-positive warnings constantly in real traffic. Add a CBOR-canonical comparator when this monitoring becomes load-bearing.
- M2 (piggybacked): applyDelete for releases parses the rkey to use the PK index instead of the partial idx_releases_latest seek-then-scan.
- Mi5: applyDelete for unknown collections throws IngestError UNKNOWN_COLLECTION instead of silently warning + acking. The dispatcher's delete branch catches IngestError → forensics + ack, so unknown collections land in dead_letters instead of disappearing.
- Mi6: the record.package field is validated against the package-slug regex (`^[a-zA-Z][a-zA-Z0-9_-]*$`). Closes the ambiguous-rkey hole where package="foo:bar" + version="1.0.0" produced the same rkey as package="foo" + version="bar:1.0.0".
- B4: processBatch accepts an optional depsOverride parameter for testability. The production wiring path is now unit-test reachable.

LINT / CODE QUALITY
- jetstream-client.ts: replaced `event as unknown as JetstreamCommitEvent` with a type-predicate `isCommitEvent` so the narrowing is explicit. Same runtime semantics.
- records-do.ts: dropped the redundant `<Env>` type argument on `extends DurableObject` — `Cloudflare.Env` is the default.
- records-consumer.ts: introduced an isPlainObject() type guard (sketched below); dropped the redundant constructor on MissingDependencyError.

REVIEW FINDINGS NOT FIXED
- M4 (DID syntax allowing underscore): phantom finding. The original regex already allowed `_` everywhere — the reviewer misread the character class. Verified via xxd. No-op change reverted.
- M7 (cache invalidate race): not load-bearing under serial dispatch. Documented for the eventual parallel-dispatcher follow-up.
- M8 (release_duplicate_attempts noise during retry windows): observe first; the proposed mitigation (skip the audit if verified_at is younger than 10 min) adds complexity that may itself be wrong.
- Mi1, Mi2, N2, N3: cosmetic.

83 aggregator tests pass (was 65). Typecheck clean. Zero lint diagnostics in apps/aggregator.
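A minimal sketch of that guard; the body is an assumption, only the name comes from the commit:

```ts
// Rejects scalars, arrays, and null before the releaseExtension lookup
// (the B3 fix above), so a payload like "lol" never reaches JSON parsing.
function isPlainObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}
```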
Round-2 review found 8 byproduct bugs introduced by the round-1 fixes themselves. Per-finding rationale:

HIGH
- #1: refreshPackageLatest TOCTOU race. Two concurrent consumer invocations could each SELECT max version_sort, then race their UPDATEs, leaving packages.latest_version regressed (final write wins from the older snapshot). Replaced the SELECT-then-UPDATE pair with a single UPDATE using correlated subqueries; D1 serialises the read+write at write-commit time, so concurrent callers always compute against the actual current state. Capabilities now extracted via SQLite's json1 functions (json_each + json_group_array on $.declaredAccess).
- #2: stale latest_version on refresh failure. INSERT committed, refresh threw, dispatcher acked → permanent inconsistency, because subsequent same-content retries hit DO NOTHING and the prior conditional-only refresh skipped. Now insert + refresh run in db.batch() — D1 wraps the batch in one transaction, so either both commit or both roll back (sketched after this commit message). Same fix in applyDelete (tombstone + refresh batched). Refresh runs unconditionally on every release ingest path (idempotent; race-safe via #1).

MEDIUM
- #3: mapPdsReason throw escaped the catch. Function-argument evaluation ran mapPdsReason(err.reason) BEFORE writeDeadLetter, so a throw there crashed the whole batch. Now wrapped in its own try/catch with an UNEXPECTED_ERROR fallback so the dead_letters write still happens.
- #4: the isCommitEvent predicate trusted `kind` only. A producer emitting `{kind: "commit"}` without a structurally valid `commit` field would slip through and crash the ingestor at `event.commit.collection`; the cursor wouldn't advance, and Jetstream would replay the malformed event forever. The predicate now verifies commit.collection / commit.rkey / commit.operation are present strings. Test stub updated to match what real producers emit.
- #5: parseReleaseRkey URIError on malformed %-encoding. A delete with rkey "demo:1.0.0%XX" threw URIError → not caught as IngestError → controller.retry() → 5 wasted attempts before the DLQ. Now caught and returned as null so applyDelete silently no-ops.
- #6: the DLQ had no consumer. Configured a second consumer in wrangler.jsonc draining `emdash-aggregator-records-dlq`. A new drainDeadLetterBatch handler logs each job to Workers logs + writes a `dead_letters` forensics row, then acks. Until reconciliation lands, this prevents permanent silent drops of legitimate-but-out-of-order release events that exhausted retries.
- #7: release_duplicate_attempts unbounded under spam. A hostile publisher pumping the same different-content payload could fill the audit table indefinitely. Added a UNIQUE(did, package, version, attempted_record_blob) constraint + ON CONFLICT DO NOTHING on the insert, so true duplicates dedupe at the storage layer.

LOW
- #8: the previously-claimed "MissingDependencyError retry" test actually only exercised the PDS_HTTP_ERROR(5xx) path. Added a proper test using the new ConsumerDeps.verify injection point — it exercises the writer's parent-profile pre-check and asserts the dispatcher routes MissingDependencyError → controller.retry().

WIRING / TESTABILITY
- ConsumerDeps gained an optional `verify` override, defaulting to fetchAndVerifyRecord. Tests can inject a stub that returns a synthetic VerifiedPdsRecord without standing up a real CAR fixture. This is what makes the MissingDependencyError test reachable inside the workers pool.
- index.ts queue() handler now dispatches by batch.queue name to either processBatch (records) or drainDeadLetterBatch (DLQ).
LINT CLEANUP
- jetstream-client.ts: defined a wider `MaybeCommitEvent` parameter type for the predicate so commit-field inspection doesn't need an unsafe cast. The type's `commit?` is structurally typed, so any RawJetstreamSubscription<E> with E extends {kind: string} remains assignable.
- records-consumer.ts: removed the unused refreshPackageLatest wrapper; callers use refreshPackageLatestStmt directly inside batches.

89 tests pass (was 83). Typecheck clean. Zero lint diagnostics in apps/aggregator.

Reviewer's verdicts on the round-1 fixes I claimed: all present and structurally correct. The new bugs were byproducts, not regressions.
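A sketch of the #2 atomicity fix above: insert + refresh submitted as one `db.batch()` call, which D1 wraps in a single transaction. The statement text and bindings are illustrative; only `REFRESH_PACKAGE_LATEST_SQL` is a constant named in this PR.

```ts
declare const db: D1Database;
declare const REFRESH_PACKAGE_LATEST_SQL: string;
declare const did: string, pkg: string, version: string, versionSort: string;
declare const recordBlob: Uint8Array;

const insertRelease = db
  .prepare(
    `INSERT INTO releases (did, package, version, version_sort, record_blob)
     VALUES (?1, ?2, ?3, ?4, ?5)
     ON CONFLICT DO NOTHING`,
  )
  .bind(did, pkg, version, versionSort, recordBlob);

const refresh = db.prepare(REFRESH_PACKAGE_LATEST_SQL).bind(did, pkg);

// Both statements commit or both roll back: a refresh failure can no longer
// strand a committed insert behind DO NOTHING on the retry path.
await db.batch([insertRelease, refresh]);
```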
Round-3 review found 5 bugs in the round-2 fixes themselves. Per-finding rationale:

MEDIUM
- #1: drainDeadLetterBatch silently dropped messages on D1 failure. The try/catch around writeDeadLetter swallowed errors, then the unconditional message.ack() outside the try acked anyway — forensics lost AND message gone. The configured max_retries: 3 in wrangler.jsonc was dead config because the handler never threw. Fix: move ack() inside the try; on error, controller.retry() so workerd redelivers per the DLQ consumer's max_retries (sketched below).
- #2: FTS thrashing on idempotent refresh. Round 2 traded the releaseSetChanged guard for "always run refresh" to fix the insert-success / refresh-failure consistency bug. But the unconditional refresh hit the packages_au AFTER UPDATE trigger, which DELETEs and re-INSERTs into packages_fts on every row touch — even when latest_version + capabilities are unchanged. Every same-content Jetstream replay (the common case) reindexed FTS for no reason. Fix: extend REFRESH_PACKAGE_LATEST_SQL's WHERE clause to short-circuit at the SQL level when both target columns already hold the computed values. Avoids the trigger fire entirely; keeps the round-2 atomicity (insert + refresh still in one batch).

MINOR
- #3: applyDelete on a malformed rkey was a silent no-op. The round-2 fix correctly stopped the URIError-causes-retry path but overcorrected by removing the audit trail. Operators investigating "why didn't this delete take effect?" had nothing to look at. Now throws IngestError("RKEY_MISMATCH") so the dispatcher writes a dead_letters row before acking.
- #4: release_duplicate_attempts.rejected_at was frozen at the first attempt. A hostile publisher pumping the same bytes for a year showed in the audit table as a single row dated a year ago. The conflict clause is now DO UPDATE SET rejected_at = excluded.rejected_at so the audit row tracks the latest attempt; same-bytes deduping at the storage layer still works.
- #5: idx_release_duplicates was redundant with the new UNIQUE constraint's implicit index (the UNIQUE on (did, package, version, attempted_record_blob) covers all (did, package, version) prefix lookups). Dropped the explicit index — fewer indexes to maintain on each write.

ASKED-ABOUT, NO BUG FOUND
The reviewer verified all 12 prompt concerns either led to fixes above or are clean: D1 batch atomicity, correlated-subquery semantics under SQLite snapshot isolation, json_each(NULL) behavior, MaybeCommitEvent parameter assignability, ConsumerDeps.verify injection correctness, duplicate dead_letters writes, jetstream-client test stub fidelity, concurrent dead_letters writes. No round-1 or round-2 finding regressed.

92 tests pass (was 89). Typecheck clean. Zero lint diagnostics in apps/aggregator. The slice has now had three rounds of adversarial review with all findings addressed in-tree. Outstanding deferred items (M7 cache race under a hypothetical parallel dispatcher, M8 audit-noise mitigation, B4 production wiring still untested by an integration test) are tracked for follow-up PRs.
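A sketch of the #1 fix: ack only after the forensics write succeeds, retry otherwise so the `max_retries: 3` in wrangler.jsonc actually applies. The job shape and `writeDeadLetter` signature are assumptions.

```ts
type RecordsJob = { did: string; collection: string; rkey: string };
declare function writeDeadLetter(db: D1Database, job: RecordsJob, reason: string): Promise<void>;

async function drainDeadLetterBatch(batch: MessageBatch<RecordsJob>, db: D1Database) {
  for (const message of batch.messages) {
    try {
      console.error("[aggregator] DLQ drain", { did: message.body.did, rkey: message.body.rkey });
      await writeDeadLetter(db, message.body, "RETRY_EXHAUSTED"); // reason code assumed
      message.ack(); // inside the try: reached only if the D1 write committed
    } catch {
      message.retry(); // workerd redelivers, up to the DLQ consumer's max_retries
    }
  }
}
```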
Deploying with

| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs | emdash-perf-coordinator | f3abc62 | May 10 2026, 01:54 PM |

Deploying with

| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs | emdash-i18n | f3abc62 | May 10 2026, 01:53 PM |

Deploying with

| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs | docs | f3abc62 | May 10 2026, 01:55 PM |

Deploying with

| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs | emdash-demo-cache | f3abc62 | May 10 2026, 01:55 PM |

Deploying with

| Status | Name | Latest Commit | Updated (UTC) |
|---|---|---|---|
| ✅ Deployment successful! View logs | emdash-playground | f3abc62 | May 10 2026, 01:55 PM |
Scope check

This PR changes 3,733 lines across 16 files. Large PRs are harder to review and more likely to be closed without review. If this scope is intentional, no action needed. A maintainer will review it. If not, please consider splitting this into smaller PRs. See CONTRIBUTING.md for contribution guidelines.
@emdash-cms/admin
@emdash-cms/auth
@emdash-cms/blocks
@emdash-cms/cloudflare
emdash
create-emdash
@emdash-cms/gutenberg-to-portable-text
@emdash-cms/x402
@emdash-cms/plugin-ai-moderation
@emdash-cms/plugin-atproto
@emdash-cms/plugin-audit-log
@emdash-cms/plugin-color
@emdash-cms/plugin-embeds
@emdash-cms/plugin-forms
@emdash-cms/plugin-webhook-notifier
commit:
PR template validation failed

Please fix the following issues by editing your PR description:

See CONTRIBUTING.md for the full contribution policy.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Implements the full PDS-verified ingest pipeline for aggregator record collections (package.* and publisher.*), replacing the previous no-op queue handler and adding DLQ draining + structured forensics.
Changes:
- Add DID document resolver with D1-backed TTL cache (`known_publishers`) and verification key materialisation.
- Implement records queue consumer: PDS fetch + CAR verification + lexicon/structural validation + D1 writes, with `dead_letters` forensics and a DLQ drain handler.
- Extend D1 schema for publishers/verification records, forensics tables, and release/package relationship semantics (incl. UNIQUE dedupe + ON DELETE CASCADE).
Reviewed changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| pnpm-workspace.yaml | Adds @atcute/identity to workspace catalog for DID doc parsing. |
| apps/aggregator/package.json | Pulls @atcute/identity into aggregator dependencies. |
| apps/aggregator/wrangler.jsonc | Adds a second queue consumer binding to drain the DLQ. |
| apps/aggregator/src/index.ts | Routes queue batches to either main consumer or DLQ drain by batch.queue. |
| apps/aggregator/src/records-consumer.ts | New: core consumer logic, writers, delete handling, forensics, DLQ draining. |
| apps/aggregator/src/pds-verify.ts | New: fetch + CAR verification wrapper with structured error reasons + retry policy helper. |
| apps/aggregator/src/did-resolver.ts | New: DID resolver with TTL cache and D1-backed cache implementation. |
| apps/aggregator/src/constants.ts | Expands subscribed collections to include publisher.*. |
| apps/aggregator/src/jetstream-client.ts | Hardens commit-event narrowing via a structural predicate. |
| apps/aggregator/src/records-do.ts | Updates DO base class typing usage. |
| apps/aggregator/migrations/0001_init.sql | Schema additions for publishers/verifications/dead_letters + constraint/index changes. |
| apps/aggregator/test/records-consumer.test.ts | New: extensive consumer/writer/delete/dispatcher/DLQ drain tests. |
| apps/aggregator/test/pds-verify.test.ts | New: unit tests for HTTP/error-shaping and transient mapping. |
| apps/aggregator/test/did-resolver.test.ts | New: unit + D1 binding contract tests for DID resolver/cache. |
| apps/aggregator/test/jetstream-client.test.ts | Updates test stubs to satisfy stricter commit-event validation. |
Files not reviewed (1)
- pnpm-lock.yaml: Language not supported
```ts
// record between Jetstream emitting and us fetching, in which case the
// caller should ack without forensics. Other 4xx (auth, bad request)
// are programming errors and warrant forensics.
```
```ts
/**
 * Parse a release rkey of the form `<package>:<encoded-version>` back into its
 * components. Returns null on malformed input — callers should treat that as
 * "this isn't a release we recognise" and no-op.
 *
 * Splits on the FIRST `:`. Both `package` (slug regex) and `version` (semver
 * subset) reject `:` in the lexicon, so a single split is unambiguous.
 *
 * `decodeURIComponent` throws URIError on malformed `%`-escapes (e.g.
 * `1.0.0%XX`). Callers must not let URIError propagate — the dispatcher's
 * delete branch maps non-IngestError throws to `controller.retry()`, which
 * would burn 5 attempts on a permanently malformed rkey. Catch + null-out
 * here.
 */
```
```sql
-- `payload` holds the unverified record bytes from the Jetstream event so an
-- operator can inspect what was attempted without going back to the source PDS.
CREATE TABLE dead_letters (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  did TEXT NOT NULL,
  collection TEXT NOT NULL,
  rkey TEXT NOT NULL,
  reason TEXT NOT NULL,  -- 'BAD_SIGNATURE', 'MST_PROOF_FAIL', 'LEXICON_FAIL', 'AT_URI_MISMATCH', 'RKEY_MISMATCH', 'CONTENT_MISMATCH'
  detail TEXT,           -- free-form context (which field, expected vs got)
  payload BLOB NOT NULL  -- unverified record bytes for inspection
```
```sql
capabilities = (
  SELECT json_group_array(key) FROM json_each(
    (SELECT json_extract(emdash_extension, '$.declaredAccess')
     FROM releases
     WHERE did = packages.did AND package = packages.slug AND tombstoned_at IS NULL
     ORDER BY version_sort DESC LIMIT 1)
  )
)
```

```sql
OR capabilities IS NOT (
  SELECT json_group_array(key) FROM json_each(
    (SELECT json_extract(emdash_extension, '$.declaredAccess')
     FROM releases
     WHERE did = packages.did AND package = packages.slug AND tombstoned_at IS NULL
     ORDER BY version_sort DESC LIMIT 1)
  )
)
```
```ts
it("retries the message when MissingDependencyError surfaces in the dispatcher", async () => {
  // The dispatcher should map MissingDependencyError → controller.retry().
  // Cache-seed but don't seed a profile; verifyAndIngest runs through
  // resolver → fetch (returns garbage so verifyRecord throws) — that's
  // not the right path. Instead, build a verified record path via stub
  // fetch returning... actually simpler: skip the dispatcher and assert
  // directly on the writer (other dispatcher branches are covered).
  // Coverage of dispatcher's retry-on-MissingDependency lives in the
  // dispatcher dedicated suite below.
});
```
```jsonc
// itself has no DLQ — repeated drain failures get acked
// after retries.
```
Seven Copilot comments on PR #975. One real bug + four doc/comment fixes + one test cleanup + one cosmetic.

REAL BUG
- json_group_array(key) over json_each enumerates in unspecified order, so the capabilities JSON string could vary between runs even when the key set is unchanged — defeating the WHERE-AND-IS-NOT short-circuit and re-firing the packages_au trigger on every idempotent refresh. Wrapped both subqueries in `(SELECT key FROM json_each(...) ORDER BY key)` so the resulting array is order-stable (sketched below).

DOC FIXES
- The pds-verify.ts 404 comment claimed "ack without forensics", but the consumer writes a dead_letters row for RECORD_NOT_FOUND (and tests assert that behavior). Updated the comment to match the implemented policy: forensics for both the legitimate-race case and the programming-error case, distinguished by reason code for queryability.
- The parseReleaseRkey docstring claimed "callers should treat null as no-op", but applyDelete now throws IngestError on null to surface the malformed delete in dead_letters. Updated the docstring to describe the actual contract.
- The dead_letters table comment listed reason values that don't match the implemented `DeadLetterReason` union. Replaced with the actual list and noted the union lives in records-consumer.ts. Also clarified that payload is UTF-8 JSON bytes, not raw record bytes.
- The wrangler.jsonc DLQ-consumer comment claimed "repeated failures get acked after retries". Actual behavior: the handler calls retry() on D1 failure, max_retries: 3, then workerd drops (no DLQ-of-DLQ).

TEST CLEANUP
- Removed an empty placeholder test that documented why the dispatcher's retry-on-MissingDependency wasn't covered. The proper coverage now exists in the dispatcher suite via ConsumerDeps.verify injection; the placeholder was dead.
- Stripped review-finding shorthand codes (Mi6, B3, round-3 #1, etc.) from describe-block names. The codes were leaking review-process scaffolding into permanent test descriptions.

91 tests pass (was 92 — dropped the empty placeholder). Typecheck clean. Zero lint diagnostics in apps/aggregator.
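A sketch of the order-stable rewrite: sort the keys before aggregating so the serialized array is deterministic. Illustrative SQL only, shown as the kind of D1 statement constant the consumer uses.

```ts
// Sorting inside the derived table pins the element order json_group_array
// sees, so unchanged key sets always serialize to the same string and the
// WHERE short-circuit holds.
const CAPABILITIES_SUBQUERY = `
  SELECT json_group_array(key) FROM (
    SELECT key FROM json_each(
      (SELECT json_extract(emdash_extension, '$.declaredAccess')
       FROM releases
       WHERE did = packages.did AND package = packages.slug AND tombstoned_at IS NULL
       ORDER BY version_sort DESC LIMIT 1)
    ) ORDER BY key
  )
`;
```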
```ts
// Hard-delete the profile. Releases hang off the profile via FK; we
// don't cascade because doing so silently throws away publication
// history. Operators inspecting an "orphaned" release row can tell
// the publisher deleted the profile.
```
```ts
/** Force a re-resolution next time. Used by the verification path on
 * signature failure (the cached signing key may be stale after a
 * publisher key rotation). */
async invalidate(did: string): Promise<void> {
  // The cache doesn't expose a delete because invalidation is rare and
  // we want the row to stay around as a "known publisher" record.
  // Setting `pds_resolved_at` to a long-past time forces re-resolve on
  // the next `resolve()` call without losing the discovery membership.
  const long_ago = new Date(0);
  const cached = await this.cache.read(asDid(did));
  if (!cached) return;
  await this.cache.upsert(
    asDid(did),
    { pds: cached.pds, signingKey: cached.signingKey, signingKeyId: cached.signingKeyId },
    long_ago,
  );
```
```sql
-- same version, same payload) so a hostile publisher pumping the same bytes
-- doesn't fill the audit table — each unique (did, package, version,
-- attempted_record_blob) tuple writes at most one row. The consumer's
-- INSERT carries `ON CONFLICT … DO NOTHING` to honour this.
```
```ts
function isCommitEvent(event: MaybeCommitEvent): event is JetstreamCommitEvent {
  return (
    event.kind === "commit" &&
    event.commit !== undefined &&
    typeof event.commit.collection === "string" &&
```
Four more Copilot comments. Two real bugs + two doc fixes.

REAL BUGS
- DidResolver.invalidate() bumped known_publishers.last_seen_at backwards to 1970-01-01. The implementation called cache.upsert() with `new Date(0)`, and the D1 binding writes the same timestamp to all three of `pds_resolved_at`, `first_seen_at`, and `last_seen_at`. Net effect: invalidating a publisher's cached signing key (after a key rotation) corrupted the membership/observation timestamps as a side effect. Fix: added a dedicated `expire(did)` method to the DidDocCache interface. The D1 binding's expire only touches `pds_resolved_at` (sets it to epoch); the Map-backed test cache does the same in-memory. The resolver's `invalidate()` delegates to `cache.expire()` instead of forcing the issue through `upsert()` (sketched below).
- isCommitEvent didn't validate `commit.cid`. The ingestor reads `event.commit.cid` for non-delete operations, so a malformed commit event with no cid would slip through and produce a RecordsJob with `cid: undefined` — which the consumer would then try to verify against, with no useful error path. Fix: extended the predicate to require `cid` to be a string when `operation !== "delete"`. Delete events legitimately have no cid, so the check is conditional. Test stub updated to match.

DOC FIXES
- The applyDelete comment for package.profile claimed "we don't cascade because doing so silently throws away publication history". The schema FK has been ON DELETE CASCADE since the round-1 fixes (so out-of-order Jetstream delivery doesn't fail with an FK violation when a profile-delete arrives before its release-deletes). Updated the comment to describe the actual cascade behavior plus the audit history that DOES survive (release_duplicate_attempts, dead_letters).
- The release_duplicate_attempts schema comment said the consumer used `ON CONFLICT … DO NOTHING`, but the round-3 fix changed it to `DO UPDATE SET rejected_at = excluded.rejected_at` so operators can read freshness from the audit row. Updated the comment.

95 tests pass (was 91; added 2 jetstream-client cid-validation tests + 2 did-resolver expire tests). Typecheck clean. Zero lint diagnostics in apps/aggregator.
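A sketch of the expire() fix: only `pds_resolved_at` moves, so `first_seen_at` / `last_seen_at` survive invalidation. The statement text is illustrative; method and column names come from this thread.

```ts
interface DidDocCache {
  // read() / upsert() as before, plus:
  expire(did: string): Promise<void>;
}

// D1 binding: an epoch pds_resolved_at forces re-resolve on the next
// resolve() call without touching the observation timestamps.
async function expire(db: D1Database, did: string): Promise<void> {
  await db
    .prepare("UPDATE known_publishers SET pds_resolved_at = ?1 WHERE did = ?2")
    .bind(new Date(0).toISOString(), did)
    .run();
}
```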
/review |
Adversarial review pass. The PR has had three rounds of in-tree adversarial review plus two prior Copilot passes; the substantive findings I would have raised on a cold read are already addressed in-tree (parent-profile presence check, mapPdsReason guarded throw, DLQ-drain D1-failure path, idempotent-refresh short-circuit at the SQL level, capabilities array order-stability, parseReleaseRkey URIError catch, isCommitEvent cid validation for create/update, applyDelete malformed-rkey audit trail, release_duplicate_attempts UNIQUE + DO UPDATE rejected_at).
One low-severity batch-isolation observation. Nothing blocking.
```ts
    );
    controller.ack();
  }
}
```
LOW: a writeDeadLetter throw aborts the rest of the batch loop.
If the D1 INSERT inside `writeDeadLetter` fails (transient D1 hiccup, schema lock, lock timeout), the throw escapes `processMessage` and propagates up to `processBatch`, halting the `for (const message of batch.messages)` loop. Subsequent messages in the same batch never get `ack()` or `retry()` called and will be redelivered by the platform.
With `max_batch_size: 25`, one D1 hiccup mid-batch can cause up to 24 messages to redo their full PDS verification round-trips on redelivery. The downstream D1 writes are idempotent (upserts), so this isn't a correctness issue — just wasted PDS bandwidth and verification CPU.
The analogous robustness exists in `drainDeadLetterBatch` (which catches the D1 failure and calls `message.retry()`); the main consumer's batch loop in `processBatch` doesn't have that isolation.
Fix: wrap the per-message call in `processBatch` with a try/catch:

```ts
for (const message of batch.messages) {
  try {
    await processMessage(message.body, message, deps);
  } catch (err) {
    console.error("[aggregator] processMessage threw unexpectedly", {
      did: message.body.did,
      rkey: message.body.rkey,
      error: err instanceof Error ? err.message : String(err),
    });
    message.retry();
  }
}
```

Not a blocker — the redelivery semantics make it self-healing — but worth the couple of lines for batch isolation, and it matches the defensive shape `drainDeadLetterBatch` already uses.
Review posted successfully: #975 (review)

Posted a single-comment review on PR #975. The PR has had three rounds of adversarial review plus two Copilot passes — every cold-read concern I'd raise was already addressed in-tree (the prior Copilot findings checked: 404-with-forensics comment, …).

One LOW finding posted (records-consumer.ts:287):
```ts
    return `${pad(major)}.${pad(minor)}.${pad(patch)}.${padded.join(".")}`;
  }
  return `${pad(major)}.${pad(minor)}.${pad(patch)}.zzz`;
}
```

```ts
  version = decodeURIComponent(encodedVersion);
} catch {
  return null;
}
```
```ts
function isCommitEvent(event: MaybeCommitEvent): event is JetstreamCommitEvent {
  if (event.kind !== "commit" || event.commit === undefined) return false;
  const c = event.commit;
  if (
    typeof c.collection !== "string" ||
    typeof c.rkey !== "string" ||
    typeof c.operation !== "string"
  ) {
    return false;
  }
  // `cid` is required for create/update (the ingestor reads it into the
  // RecordsJob); delete events legitimately have no cid. Validate
  // conditionally so a malformed create/update with missing cid doesn't
  // slip through and produce a job with `cid: undefined`.
  if (c.operation !== "delete" && typeof c.cid !== "string") return false;
  return true;
```
```ts
 * Cover the HTTP / error-shaping logic with a stub `fetch`. The actual
 * verification handoff to `@atcute/repo`'s `verifyRecord` is exercised
 * end-to-end at the consumer level (where the full MockPds + FakePublisher
 * fixture is wired in), because building a valid signed CAR by hand here
 * would re-implement what `@atcute/repo` already tests internally.
```
```ts
const existing = await db
  .prepare(
    `SELECT record_blob, tombstoned_at
     FROM releases WHERE did = ? AND package = ? AND version = ?`,
  )
  .bind(job.did, record.package, record.version)
  .first<{ record_blob: ArrayBuffer | Uint8Array; tombstoned_at: string | null }>();
if (existing) {
  const sameContent = bytesEqual(toUint8(existing.record_blob), verified.carBytes);
```
ask-bonk + Copilot caught 5 real bugs and 1 stale comment.
REAL BUGS
- Duplicate-version detection compared raw CAR bytes. CARs include
the publisher's commit + MST proof, which churns whenever the
publisher writes any other record in the same repo — so a benign
re-fetch of an unchanged record produced different bytes and got
misclassified as an immutability violation.
Fix: compare the verified record CID instead. CIDs are
content-addressed and stable for unchanged records. Schema
updated: `release_duplicate_attempts` gained `attempted_cid TEXT
NOT NULL` and the UNIQUE constraint now uses it instead of
`attempted_record_blob`. The blob is still kept for forensics so
operators can inspect what was actually attempted; the conflict
clause refreshes both `rejected_at` and `attempted_record_blob`
so the latest envelope wins. Comparison logic in the writer reads
the existing CID out of `signature_metadata` JSON via a small
`parseCid` helper (sketched after this commit message).
- `processBatch` for-loop didn't isolate per-message failures. A
`writeDeadLetter` throw inside `processMessage` (e.g. transient D1
hiccup mid-batch) escaped to the loop and halted it, leaving
subsequent messages without ack/retry. With max_batch_size 25,
one D1 hiccup could waste up to 24 PDS verification round-trips
on redelivery. Now each message is wrapped in try/catch with a
retry() on uncaught throw, matching the shape `drainDeadLetterBatch`
already uses.
- `computeVersionSort`'s `.zzz` final-release sentinel didn't
actually beat all valid prereleases — `1.0.0-zzzz` sorts AFTER
`1.0.0` because `"zzzz"` is lexically greater than `"zzz"`. Switched
the sentinel to `~` (ASCII 126), above the prerelease
alphabet's max char (`z` at 122). Final releases now sort after
any prerelease at the same major.minor.patch.
- `parseReleaseRkey` validated nothing about its components — a
malformed rkey like `demo:1.0.0:extra` parsed as
`pkg=demo`, `version=1.0.0:extra`, then UPDATE matched no row and
silently acked. Now validates pkg with PACKAGE_SLUG_RE and version
with SEMVER_RE; returns null on mismatch so applyDelete throws
IngestError → forensics row.
- `isCommitEvent` accepted any string for `commit.operation`. An
unknown operation slipping through would produce a RecordsJob the
consumer can't dispatch on — landing as UNEXPECTED_ERROR in
dead_letters. Now restricts to {create, update, delete}.
DOC FIX
- pds-verify.test.ts header claimed end-to-end coverage of the
`verifyRecord` handoff exists in records-consumer.test.ts via
MockPds + FakePublisher. Actually it's stubbed via
ConsumerDeps.verify — the fixture can't load in the workers test
pool. Updated comment to explain the actual coverage shape.
102 tests pass (was 95). Typecheck clean. Zero lint diagnostics.
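A sketch of the `parseCid` helper named above; the `signature_metadata` JSON shape (a top-level `cid` key) is an assumption.

```ts
// Pull the verified record CID out of the stored signature_metadata JSON.
// Returns null on anything malformed so the caller falls back to the
// plain insert path instead of a bogus duplicate classification.
function parseCid(signatureMetadata: string | null): string | null {
  if (!signatureMetadata) return null;
  try {
    const parsed: unknown = JSON.parse(signatureMetadata);
    if (typeof parsed !== "object" || parsed === null) return null;
    const cid = (parsed as Record<string, unknown>).cid;
    return typeof cid === "string" ? cid : null;
  } catch {
    return null;
  }
}
```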
What does this PR do?
Replaces the no-op `queue()` handler in `apps/aggregator/src/index.ts` with the full PDS-verified ingest pipeline for the four record collections in scope (`package.profile`, `package.release`, `publisher.profile`, `publisher.verification`). Third slice on the same workstream as #971 (scaffold) and #972 (Jetstream DO).

The end-to-end shape: Jetstream → DO → Records Queue → consumer → D1, with structured forensics in `dead_letters` and a DLQ-drain handler for messages that exhaust retries.

Five commits — the foundational PR plus a DID resolver prerequisite plus three rounds of adversarial-review fixes:

- feat(aggregator): DID resolver with known_publishers cache — `DidResolver` class with constructor injection (cache + resolver + ttl + now), 24h TTL'd cache backed by `known_publishers` (extended with `signing_key` / `signing_key_id` columns so the table doubles as the DID-doc resolution cache). Materialises cached multibase keys to `PublicKey` instances via `@atcute/crypto`, exhaustive on the p256/secp256k1 union. `invalidate(did)` for the post-key-rotation re-resolve path. Production binding `createD1DidDocCache(db)` ships in the same file.
- feat(aggregator): records consumer with PDS-verified ingest — the substantive piece. Per-job pipeline: resolve PDS+key → fetch CAR via `com.atproto.sync.getRecord` → `@atcute/repo`'s `verifyRecord` (MST + signature in one call) → lexicon-validate via `@atcute/lexicons` `safeParse` against the schema from `@emdash-cms/registry-lexicons` → per-collection structural checks → D1 write. Error policy: PDS network/timeout/5xx retry; 404, oversized response, signature/MST/lexicon failure, structural failure → `dead_letters` row + `message.ack()`; unexpected programming errors log loud + `dead_letters` + ack so the queue isn't blocked. Schema additions (still pre-deploy): `publishers`, `publisher_verifications`, `dead_letters`, signing-key cache columns on `known_publishers`, `ON DELETE CASCADE` on the releases→packages FK.
- fix(aggregator): adversarial review fixes for records consumer — first review pass found 4 blockers + 7 majors. Fixes covered B1 (`packages.latest_version` / `capabilities` were never written), B2 (re-publish of a tombstoned release silently no-op'd), B3 (the release `extensions` field wasn't validated), M1 (`package.profile.security[]` contact rule unenforced), M3 (semver versions over the 10-digit ceiling silently produced a wrong sort), M5/M6/N1 (`mapPdsReason` had no compile-time totality), M9 (the JSON.stringify discrepancy comparison was a noise generator), Mi5 (`applyDelete` for unknown collections silently dropped), Mi6 (`record.package` charset unchecked → ambiguous rkey hole), Mi7 (a release event arriving before its profile was treated as `UNEXPECTED_ERROR` with no recovery path — new `MissingDependencyError` + dispatcher retry), B4 (`processBatch` accepts an optional `depsOverride` for testability).
- fix(aggregator): round 2 adversarial review fixes — second pass found 8 byproducts of the round-1 fixes themselves. Fixes covered the TOCTOU race in `refreshPackageLatest` (single UPDATE with correlated subqueries), the atomicity gap on insert+refresh (`db.batch` for atomic commit), the `mapPdsReason` throw escaping the catch (wrapped in its own try/catch), the `isCommitEvent` predicate being too permissive (now structurally validates the `commit` field), the `parseReleaseRkey` URIError on malformed %-encoding (caught and nulled), the DLQ having no consumer (added a drain handler that logs + writes a forensics row), `release_duplicate_attempts` unbounded under spam (UNIQUE constraint + ON CONFLICT DO NOTHING), and a MissingDependencyError test that didn't actually exercise the path (now uses `ConsumerDeps.verify` injection).
- fix(aggregator): round 3 adversarial review fixes — third pass found 5 bugs in the round-2 fixes. Fixes covered the DLQ drain silently dropping messages on D1 failure (move `ack()` inside the try, `retry()` on error), FTS thrashing on idempotent refresh (extended the `REFRESH_PACKAGE_LATEST_SQL` WHERE clause to short-circuit at SQL level when values are unchanged so the `packages_au` trigger doesn't fire), `applyDelete` on a malformed rkey losing the audit trail (now throws `IngestError` → `dead_letters` row), `release_duplicate_attempts.rejected_at` frozen at the first attempt (DO UPDATE SET tracks the latest), and a redundant index that the new UNIQUE constraint already covered.

A round-4 review hasn't run; the round-3 fixes are minimal and the prior two rounds were each substantial. Happy to spin up another round if you want belt-and-braces, but this slice has had three rounds of adversarial review with all findings addressed in-tree.
Closes #
Type of change
Checklist
- `pnpm typecheck` passes
- `pnpm lint` passes (no new diagnostics in `apps/aggregator`; pre-existing diagnostics elsewhere unchanged)
- `pnpm test` passes (92 aggregator tests, 17 atproto-test-utils tests, rest of workspace unchanged)
- `pnpm format` has been run
- … (`apps/aggregator` and `packages/atproto-test-utils` are private)

AI-generated code disclosure
Test output
Notes for review
Schema additions pre-deploy. The aggregator's D1 hasn't been deployed yet, so `0001_init.sql` was extended in-place across each round (cache columns on `known_publishers`, new `dead_letters`, `publishers`, `publisher_verifications`, `ON DELETE CASCADE` on the releases FK, UNIQUE on `release_duplicate_attempts`). No follow-up migration needed.

Two known coverage gaps documented in code:
- `FakePublisher` + `MockPds` requires a node-pool vitest project; the fixture's `@atproto/repo` transitive `lex-data` dep doesn't load inside `@cloudflare/vitest-pool-workers@0.16.x`'s CJS shim layer (verified — the pre-release commit cirrus pins predates this regression). Each layer is unit-tested independently; the wiring between resolver → pds-verify → consumer → D1 isn't exercised by an integration test.
- `releases.cts` column mirrors `verified_at` because the release lexicon doesn't expose a creation timestamp and `verifyRecord` doesn't surface the commit `rev`. TODO comment in the writer.

Operational note on the DLQ. The drain handler currently logs + writes a forensics row + acks. Once the reconciliation pass lands (Slice 1 next chunk), it will replace this with retry-from-`listRecords` so legitimate-but-out-of-order release events that exhausted retries are recovered automatically. Until then this prevents permanent silent drops without a trail.

Three rounds of adversarial review is more than the rule asks for. I ran them because each round found real bugs (round 1: schema-vs-code mismatches and missing structural checks; round 2: byproducts of the round-1 fixes; round 3: byproducts of the round-2 fixes). Round 3's findings were notably narrower than round 2's, which were narrower than round 1's — the slice has converged.