diff --git a/.changeset/firehose-prevdata.md b/.changeset/firehose-prevdata.md new file mode 100644 index 00000000..d1f423b3 --- /dev/null +++ b/.changeset/firehose-prevdata.md @@ -0,0 +1,7 @@ +--- +"@getcirrus/pds": patch +--- + +Include `prevData` in firehose `#commit` events. + +`prevData` is the MST root CID of the previous commit (the `data` field at the `since` rev) and is effectively required for the inductive version of the firehose. Without it, relays running strict commit validation (e.g. indigo with `LenientSyncValidation` off) fail verification with "missing prevData field" and reject every commit after the first, freezing the repo on that relay. It is now populated from the pre-write repo's `commit.data` at each write path. diff --git a/packages/pds/scripts/verify-firehose.ts b/packages/pds/scripts/verify-firehose.ts index 9c8ca985..71854568 100644 --- a/packages/pds/scripts/verify-firehose.ts +++ b/packages/pds/scripts/verify-firehose.ts @@ -32,6 +32,9 @@ interface CommitEvent { commit: Cid; rev: string; since: string | null; + // Optional: absent on legacy (pre-prevData) firehose data and on any + // initial commit where `since` is null. + prevData?: Cid; blocks: Uint8Array; ops: CommitOp[]; blobs: Cid[]; diff --git a/packages/pds/src/account-do.ts b/packages/pds/src/account-do.ts index 3bfb5f92..9cb550eb 100644 --- a/packages/pds/src/account-do.ts +++ b/packages/pds/src/account-do.ts @@ -415,6 +415,7 @@ export class AccountDurableObject extends DurableObject { commit: updatedRepo.cid, rev: updatedRepo.commit.rev, since: prevRev, + prevData: repo.commit.data, newBlocks, ops: [opWithCid], }; @@ -484,6 +485,7 @@ export class AccountDurableObject extends DurableObject { commit: updatedRepo.cid, rev: updatedRepo.commit.rev, since: prevRev, + prevData: repo.commit.data, newBlocks, ops: [deleteOp], }; @@ -589,6 +591,7 @@ export class AccountDurableObject extends DurableObject { commit: updatedRepo.cid, rev: updatedRepo.commit.rev, since: prevRev, + prevData: repo.commit.data, newBlocks, ops: [opWithCid], }; @@ -791,6 +794,7 @@ export class AccountDurableObject extends DurableObject { commit: updatedRepo.cid, rev: updatedRepo.commit.rev, since: prevRev, + prevData: repo.commit.data, newBlocks, ops: opsWithCids, }; diff --git a/packages/pds/src/sequencer.ts b/packages/pds/src/sequencer.ts index e6a96a63..6fb1e3b1 100644 --- a/packages/pds/src/sequencer.ts +++ b/packages/pds/src/sequencer.ts @@ -14,6 +14,11 @@ export interface CommitEvent { commit: CID; rev: string; since: string | null; + // Root CID of the MST for the previous commit (the `data` field of the + // commit at the `since` rev). Required for relays doing inductive firehose + // verification (com.atproto.sync.subscribeRepos#commit `prevData`). Nullable + // to mirror `since`; in practice every write has a prior commit so it is set. + prevData: CID | null; blocks: Uint8Array; ops: RepoOp[]; blobs: CID[]; @@ -72,6 +77,7 @@ export interface CommitData { commit: CID; rev: string; since: string | null; + prevData: CID | null; newBlocks: BlockMap; ops: Array; } @@ -102,6 +108,7 @@ export class Sequencer { commit: data.commit, rev: data.rev, since: data.since, + prevData: data.prevData, blocks: carBytes, ops: data.ops.map( (op): RepoOp => ({ diff --git a/packages/pds/test/firehose.test.ts b/packages/pds/test/firehose.test.ts index 7487ecb0..a07e137a 100644 --- a/packages/pds/test/firehose.test.ts +++ b/packages/pds/test/firehose.test.ts @@ -343,6 +343,45 @@ describe("Firehose (subscribeRepos)", () => { }); }); + it("should include prevData matching the previous commit's MST root", async () => { + const id = env.ACCOUNT.idFromName("account"); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + const sequencer = (instance as any).sequencer; + const encodeEventFrame = (instance as any).encodeEventFrame.bind( + instance, + ); + + // Load the repo and capture the current (soon-to-be-previous) state. + await instance.rpcGetRepoStatus(); + const prevRepo = (instance as any).repo; + const expectedPrevData = prevRepo.commit.data.toString(); + const expectedSince = prevRepo.commit.rev; + + const seqBefore = sequencer.getLatestSeq(); + await instance.rpcCreateRecord("app.bsky.feed.post", "prevdata-test", { + text: "prevData test", + createdAt: new Date().toISOString(), + }); + + const events = await sequencer.getEventsSince(seqBefore, 1); + const frame = encodeEventFrame(events[0] as SeqCommitEvent); + const { body } = decodeFrame(frame); + const commitBody = body as { + prevData?: { toString(): string }; + since?: string; + }; + + // prevData must be present (relays require it for verification) + // and equal the data CID of the commit at the `since` rev. + expect(commitBody.prevData).toBeDefined(); + expect(commitBody.prevData!.toString()).toBe(expectedPrevData); + expect(commitBody.since).toBe(expectedSince); + }); + }); + it("should encode identity events with #identity frame type", async () => { const id = env.ACCOUNT.idFromName("account"); const stub = env.ACCOUNT.get(id);