Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/firehose-prevdata.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@getcirrus/pds": patch
---

Include `prevData` in firehose `#commit` events.

`prevData` is the MST root CID of the previous commit (the `data` field at the `since` rev) and is effectively required for the inductive version of the firehose. Without it, relays running strict commit validation (e.g. indigo with `LenientSyncValidation` off) fail verification with "missing prevData field" and reject every commit after the first, freezing the repo on that relay. It is now populated from the pre-write repo's `commit.data` at each write path.
3 changes: 3 additions & 0 deletions packages/pds/scripts/verify-firehose.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ interface CommitEvent {
commit: Cid;
rev: string;
since: string | null;
// Optional: absent on legacy (pre-prevData) firehose data and on any
// initial commit where `since` is null.
prevData?: Cid;
blocks: Uint8Array;
ops: CommitOp[];
blobs: Cid[];
Expand Down
4 changes: 4 additions & 0 deletions packages/pds/src/account-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
commit: updatedRepo.cid,
rev: updatedRepo.commit.rev,
since: prevRev,
prevData: repo.commit.data,
newBlocks,
ops: [opWithCid],
};
Expand Down Expand Up @@ -484,6 +485,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
commit: updatedRepo.cid,
rev: updatedRepo.commit.rev,
since: prevRev,
prevData: repo.commit.data,
newBlocks,
ops: [deleteOp],
};
Expand Down Expand Up @@ -589,6 +591,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
commit: updatedRepo.cid,
rev: updatedRepo.commit.rev,
since: prevRev,
prevData: repo.commit.data,
newBlocks,
ops: [opWithCid],
};
Expand Down Expand Up @@ -791,6 +794,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
commit: updatedRepo.cid,
rev: updatedRepo.commit.rev,
since: prevRev,
prevData: repo.commit.data,
newBlocks,
ops: opsWithCids,
};
Expand Down
7 changes: 7 additions & 0 deletions packages/pds/src/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ export interface CommitEvent {
commit: CID;
rev: string;
since: string | null;
// Root CID of the MST for the previous commit (the `data` field of the
// commit at the `since` rev). Required for relays doing inductive firehose
// verification (com.atproto.sync.subscribeRepos#commit `prevData`). Nullable
// to mirror `since`; in practice every write has a prior commit so it is set.
prevData: CID | null;
blocks: Uint8Array;
ops: RepoOp[];
blobs: CID[];
Expand Down Expand Up @@ -72,6 +77,7 @@ export interface CommitData {
commit: CID;
rev: string;
since: string | null;
prevData: CID | null;
newBlocks: BlockMap;
ops: Array<RecordWriteOp & { cid?: CID | null }>;
}
Expand Down Expand Up @@ -102,6 +108,7 @@ export class Sequencer {
commit: data.commit,
rev: data.rev,
since: data.since,
prevData: data.prevData,
blocks: carBytes,
ops: data.ops.map(
(op): RepoOp => ({
Expand Down
39 changes: 39 additions & 0 deletions packages/pds/test/firehose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,45 @@ describe("Firehose (subscribeRepos)", () => {
});
});

it("should include prevData matching the previous commit's MST root", async () => {
const id = env.ACCOUNT.idFromName("account");
const stub = env.ACCOUNT.get(id);

await runInDurableObject(stub, async (instance: AccountDurableObject) => {
await instance.getStorage();
const sequencer = (instance as any).sequencer;
const encodeEventFrame = (instance as any).encodeEventFrame.bind(
instance,
);

// Load the repo and capture the current (soon-to-be-previous) state.
await instance.rpcGetRepoStatus();
const prevRepo = (instance as any).repo;
Comment thread
simnaut marked this conversation as resolved.
const expectedPrevData = prevRepo.commit.data.toString();
const expectedSince = prevRepo.commit.rev;

const seqBefore = sequencer.getLatestSeq();
await instance.rpcCreateRecord("app.bsky.feed.post", "prevdata-test", {
text: "prevData test",
createdAt: new Date().toISOString(),
});

const events = await sequencer.getEventsSince(seqBefore, 1);
const frame = encodeEventFrame(events[0] as SeqCommitEvent);
const { body } = decodeFrame(frame);
const commitBody = body as {
prevData?: { toString(): string };
since?: string;
};

// prevData must be present (relays require it for verification)
// and equal the data CID of the commit at the `since` rev.
expect(commitBody.prevData).toBeDefined();
expect(commitBody.prevData!.toString()).toBe(expectedPrevData);
expect(commitBody.since).toBe(expectedSince);
});
});

it("should encode identity events with #identity frame type", async () => {
const id = env.ACCOUNT.idFromName("account");
const stub = env.ACCOUNT.get(id);
Expand Down