From 159948e9ab87ba8561a2f2d7d27ffc1c07644151 Mon Sep 17 00:00:00 2001 From: Matt Kane Date: Sun, 10 May 2026 15:50:39 +0100 Subject: [PATCH] fix(pds): prevent relay desync after failed write MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `applyWrites` was assigning the new `Repo` to in-memory state before sequencing the firehose event. If anything threw between then and `sequenceCommit` succeeding (e.g. mid-flight failure on an image post), Cloudflare rolled back the SQLite writes but the in-memory `Repo` stayed advanced. The next successful write then emitted a firehose commit whose `since` rev the relay had never seen, and the relay marked the repo desynced — requiring a manual `requestCrawl` to recover. `this.repo` is now only assigned after the sequence + broadcast succeed in all four write paths (`rpcCreateRecord`, `rpcDeleteRecord`, `rpcPutRecord`, `rpcApplyWrites`), and any failure in that window invalidates the in-memory cache so the next access reloads from storage. --- .changeset/firehose-relay-desync.md | 9 + packages/pds/src/account-do.ts | 415 +++++++++++++++------------- 2 files changed, 228 insertions(+), 196 deletions(-) create mode 100644 .changeset/firehose-relay-desync.md diff --git a/.changeset/firehose-relay-desync.md b/.changeset/firehose-relay-desync.md new file mode 100644 index 0000000..4a6b16e --- /dev/null +++ b/.changeset/firehose-relay-desync.md @@ -0,0 +1,9 @@ +--- +"@getcirrus/pds": patch +--- + +Fix relay desync after a failed write (e.g. an image post that errors mid-flight). + +`applyWrites` was assigning the new `Repo` to in-memory state before sequencing the firehose event. If anything threw between then and `sequenceCommit` succeeding, Cloudflare rolled back the SQLite writes but the in-memory `Repo` stayed advanced. The next successful write then emitted a firehose commit whose `since` rev the relay had never seen, and the relay marked the repo desynced — requiring a manual `requestCrawl` to recover. + +`this.repo` is now only assigned after the sequence + broadcast succeed, and any failure in that window invalidates the in-memory cache so the next access reloads from storage. diff --git a/packages/pds/src/account-do.ts b/packages/pds/src/account-do.ts index 136bfcb..acd6938 100644 --- a/packages/pds/src/account-do.ts +++ b/packages/pds/src/account-do.ts @@ -210,6 +210,18 @@ export class AccountDurableObject extends DurableObject { this.repo = repo; } + /** + * Drop the in-memory repo so the next access reloads from storage. + * Used after a write fails post-applyWrites: Cloudflare rolls back the + * SQLite writes, but JS state isn't rolled back, so the cached Repo can + * end up ahead of storage. That mismatch produces firehose events whose + * `since` rev the relay never saw, causing it to mark us desynced. + */ + private invalidateRepoCache(): void { + this.repo = null; + this.repoInitialized = false; + } + /** * RPC method: Get repo metadata for describeRepo */ @@ -368,61 +380,64 @@ export class AccountDurableObject extends DurableObject { const prevRev = repo.commit.rev; const updatedRepo = await repo.applyWrites([createOp], keypair); - this.repo = updatedRepo; - // Get the CID for the created record from the MST - const dataKey = `${collection}/${actualRkey}`; - const recordCid = await this.repo.data.get(dataKey); + try { + const dataKey = `${collection}/${actualRkey}`; + const recordCid = await updatedRepo.data.get(dataKey); - if (!recordCid) { - throw new Error(`Failed to create record: ${collection}/${actualRkey}`); - } + if (!recordCid) { + throw new Error( + `Failed to create record: ${collection}/${actualRkey}`, + ); + } + + this.storage!.addCollection(collection); + + if (this.sequencer) { + const newBlocks = new BlockMap(); + const rows = this.ctx.storage.sql + .exec( + "SELECT cid, bytes FROM blocks WHERE rev = ?", + updatedRepo.commit.rev, + ) + .toArray(); + + for (const row of rows) { + const cid = CID.parse(row.cid as string); + const bytes = new Uint8Array(row.bytes as ArrayBuffer); + newBlocks.set(cid, bytes); + } + + const opWithCid = { ...createOp, cid: recordCid }; - // Update collections cache - this.storage!.addCollection(collection); - - // Sequence the commit for firehose - if (this.sequencer) { - // Get blocks that changed - const newBlocks = new BlockMap(); - const rows = this.ctx.storage.sql - .exec( - "SELECT cid, bytes FROM blocks WHERE rev = ?", - this.repo.commit.rev, - ) - .toArray(); - - for (const row of rows) { - const cid = CID.parse(row.cid as string); - const bytes = new Uint8Array(row.bytes as ArrayBuffer); - newBlocks.set(cid, bytes); + const commitData: CommitData = { + did: updatedRepo.did, + commit: updatedRepo.cid, + rev: updatedRepo.commit.rev, + since: prevRev, + newBlocks, + ops: [opWithCid], + }; + + const event = await this.sequencer.sequenceCommit(commitData); + await this.broadcastCommit(event); } - // Include the record CID in the op for the firehose - const opWithCid = { ...createOp, cid: recordCid }; + this.repo = updatedRepo; - const commitData: CommitData = { - did: this.repo.did, - commit: this.repo.cid, - rev: this.repo.commit.rev, - since: prevRev, - newBlocks, - ops: [opWithCid], + return { + uri: `at://${updatedRepo.did}/${collection}/${actualRkey}`, + cid: recordCid.toString(), + commit: { + cid: updatedRepo.cid.toString(), + rev: updatedRepo.commit.rev, + }, + ...(validationStatus !== undefined ? { validationStatus } : {}), }; - - const event = await this.sequencer.sequenceCommit(commitData); - await this.broadcastCommit(event); + } catch (err) { + this.invalidateRepoCache(); + throw err; } - - return { - uri: `at://${this.repo.did}/${collection}/${actualRkey}`, - cid: recordCid.toString(), - commit: { - cid: this.repo.cid.toString(), - rev: this.repo.commit.rev, - }, - ...(validationStatus !== undefined ? { validationStatus } : {}), - }; } /** @@ -447,44 +462,48 @@ export class AccountDurableObject extends DurableObject { const prevRev = repo.commit.rev; const updatedRepo = await repo.applyWrites([deleteOp], keypair); - this.repo = updatedRepo; - - // Sequence the commit for firehose - if (this.sequencer) { - // Get blocks that changed - const newBlocks = new BlockMap(); - const rows = this.ctx.storage.sql - .exec( - "SELECT cid, bytes FROM blocks WHERE rev = ?", - this.repo.commit.rev, - ) - .toArray(); - - for (const row of rows) { - const cid = CID.parse(row.cid as string); - const bytes = new Uint8Array(row.bytes as ArrayBuffer); - newBlocks.set(cid, bytes); + + try { + if (this.sequencer) { + const newBlocks = new BlockMap(); + const rows = this.ctx.storage.sql + .exec( + "SELECT cid, bytes FROM blocks WHERE rev = ?", + updatedRepo.commit.rev, + ) + .toArray(); + + for (const row of rows) { + const cid = CID.parse(row.cid as string); + const bytes = new Uint8Array(row.bytes as ArrayBuffer); + newBlocks.set(cid, bytes); + } + + const commitData: CommitData = { + did: updatedRepo.did, + commit: updatedRepo.cid, + rev: updatedRepo.commit.rev, + since: prevRev, + newBlocks, + ops: [deleteOp], + }; + + const event = await this.sequencer.sequenceCommit(commitData); + await this.broadcastCommit(event); } - const commitData: CommitData = { - did: this.repo.did, - commit: this.repo.cid, - rev: this.repo.commit.rev, - since: prevRev, - newBlocks, - ops: [deleteOp], - }; + this.repo = updatedRepo; - const event = await this.sequencer.sequenceCommit(commitData); - await this.broadcastCommit(event); + return { + commit: { + cid: updatedRepo.cid.toString(), + rev: updatedRepo.commit.rev, + }, + }; + } catch (err) { + this.invalidateRepoCache(); + throw err; } - - return { - commit: { - cid: updatedRepo.cid.toString(), - rev: updatedRepo.commit.rev, - }, - }; } /** @@ -526,59 +545,62 @@ export class AccountDurableObject extends DurableObject { const prevRev = repo.commit.rev; const updatedRepo = await repo.applyWrites([op], keypair); - this.repo = updatedRepo; - // Get the CID for the record from the MST - const dataKey = `${collection}/${rkey}`; - const recordCid = await this.repo.data.get(dataKey); + try { + const dataKey = `${collection}/${rkey}`; + const recordCid = await updatedRepo.data.get(dataKey); - if (!recordCid) { - throw new Error(`Failed to put record: ${collection}/${rkey}`); - } + if (!recordCid) { + throw new Error(`Failed to put record: ${collection}/${rkey}`); + } + + this.storage!.addCollection(collection); + + if (this.sequencer) { + const newBlocks = new BlockMap(); + const rows = this.ctx.storage.sql + .exec( + "SELECT cid, bytes FROM blocks WHERE rev = ?", + updatedRepo.commit.rev, + ) + .toArray(); + + for (const row of rows) { + const cid = CID.parse(row.cid as string); + const bytes = new Uint8Array(row.bytes as ArrayBuffer); + newBlocks.set(cid, bytes); + } - // Update collections cache - this.storage!.addCollection(collection); - - // Sequence the commit for firehose - if (this.sequencer) { - const newBlocks = new BlockMap(); - const rows = this.ctx.storage.sql - .exec( - "SELECT cid, bytes FROM blocks WHERE rev = ?", - this.repo.commit.rev, - ) - .toArray(); - - for (const row of rows) { - const cid = CID.parse(row.cid as string); - const bytes = new Uint8Array(row.bytes as ArrayBuffer); - newBlocks.set(cid, bytes); + const opWithCid = { ...op, cid: recordCid }; + + const commitData: CommitData = { + did: updatedRepo.did, + commit: updatedRepo.cid, + rev: updatedRepo.commit.rev, + since: prevRev, + newBlocks, + ops: [opWithCid], + }; + + const event = await this.sequencer.sequenceCommit(commitData); + await this.broadcastCommit(event); } - const opWithCid = { ...op, cid: recordCid }; + this.repo = updatedRepo; - const commitData: CommitData = { - did: this.repo.did, - commit: this.repo.cid, - rev: this.repo.commit.rev, - since: prevRev, - newBlocks, - ops: [opWithCid], + return { + uri: `at://${updatedRepo.did}/${collection}/${rkey}`, + cid: recordCid.toString(), + commit: { + cid: updatedRepo.cid.toString(), + rev: updatedRepo.commit.rev, + }, + ...(validationStatus !== undefined ? { validationStatus } : {}), }; - - const event = await this.sequencer.sequenceCommit(commitData); - await this.broadcastCommit(event); + } catch (err) { + this.invalidateRepoCache(); + throw err; } - - return { - uri: `at://${this.repo.did}/${collection}/${rkey}`, - cid: recordCid.toString(), - commit: { - cid: this.repo.cid.toString(), - rev: this.repo.commit.rev, - }, - ...(validationStatus !== undefined ? { validationStatus } : {}), - }; } /** @@ -698,86 +720,87 @@ export class AccountDurableObject extends DurableObject { const prevRev = repo.commit.rev; const updatedRepo = await repo.applyWrites(ops, keypair); - this.repo = updatedRepo; - // Update collections cache for create/update ops - for (const op of ops) { - if (op.action !== WriteOpAction.Delete) { - this.storage!.addCollection(op.collection); + try { + for (const op of ops) { + if (op.action !== WriteOpAction.Delete) { + this.storage!.addCollection(op.collection); + } } - } - // Build final results with CIDs and prepare ops with CIDs for firehose - const finalResults: Array<{ - $type: string; - uri?: string; - cid?: string; - validationStatus?: ValidationStatus; - }> = []; - const opsWithCids: Array = []; + const finalResults: Array<{ + $type: string; + uri?: string; + cid?: string; + validationStatus?: ValidationStatus; + }> = []; + const opsWithCids: Array = []; + + for (let i = 0; i < results.length; i++) { + const result = results[i]!; + const op = ops[i]!; + + if (result.action === WriteOpAction.Delete) { + finalResults.push({ + $type: result.$type, + }); + opsWithCids.push(op); + } else { + const dataKey = `${result.collection}/${result.rkey}`; + const recordCid = await updatedRepo.data.get(dataKey); + finalResults.push({ + $type: result.$type, + uri: `at://${updatedRepo.did}/${result.collection}/${result.rkey}`, + cid: recordCid?.toString(), + ...(result.validationStatus !== undefined + ? { validationStatus: result.validationStatus } + : {}), + }); + opsWithCids.push({ ...op, cid: recordCid }); + } + } - for (let i = 0; i < results.length; i++) { - const result = results[i]!; - const op = ops[i]!; + if (this.sequencer) { + const newBlocks = new BlockMap(); + const rows = this.ctx.storage.sql + .exec( + "SELECT cid, bytes FROM blocks WHERE rev = ?", + updatedRepo.commit.rev, + ) + .toArray(); + + for (const row of rows) { + const cid = CID.parse(row.cid as string); + const bytes = new Uint8Array(row.bytes as ArrayBuffer); + newBlocks.set(cid, bytes); + } - if (result.action === WriteOpAction.Delete) { - finalResults.push({ - $type: result.$type, - }); - opsWithCids.push(op); - } else { - // Get the CID for create/update - const dataKey = `${result.collection}/${result.rkey}`; - const recordCid = await this.repo.data.get(dataKey); - finalResults.push({ - $type: result.$type, - uri: `at://${this.repo.did}/${result.collection}/${result.rkey}`, - cid: recordCid?.toString(), - ...(result.validationStatus !== undefined - ? { validationStatus: result.validationStatus } - : {}), - }); - // Include the record CID in the op for the firehose - opsWithCids.push({ ...op, cid: recordCid }); - } - } + const commitData: CommitData = { + did: updatedRepo.did, + commit: updatedRepo.cid, + rev: updatedRepo.commit.rev, + since: prevRev, + newBlocks, + ops: opsWithCids, + }; - // Sequence the commit for firehose - if (this.sequencer) { - const newBlocks = new BlockMap(); - const rows = this.ctx.storage.sql - .exec( - "SELECT cid, bytes FROM blocks WHERE rev = ?", - this.repo.commit.rev, - ) - .toArray(); - - for (const row of rows) { - const cid = CID.parse(row.cid as string); - const bytes = new Uint8Array(row.bytes as ArrayBuffer); - newBlocks.set(cid, bytes); + const event = await this.sequencer.sequenceCommit(commitData); + await this.broadcastCommit(event); } - const commitData: CommitData = { - did: this.repo.did, - commit: this.repo.cid, - rev: this.repo.commit.rev, - since: prevRev, - newBlocks, - ops: opsWithCids, - }; + this.repo = updatedRepo; - const event = await this.sequencer.sequenceCommit(commitData); - await this.broadcastCommit(event); + return { + commit: { + cid: updatedRepo.cid.toString(), + rev: updatedRepo.commit.rev, + }, + results: finalResults, + }; + } catch (err) { + this.invalidateRepoCache(); + throw err; } - - return { - commit: { - cid: this.repo.cid.toString(), - rev: this.repo.commit.rev, - }, - results: finalResults, - }; } /**