From c278a17d1918139548b39c7ba943d2324e747263 Mon Sep 17 00:00:00 2001 From: Ben Dixon Date: Fri, 1 May 2026 12:41:58 +0100 Subject: [PATCH 1/4] fix(pds): Remove empty collections from cache on deletion of all records --- packages/pds/src/account-do.ts | 11 +++++++++++ packages/pds/src/storage.ts | 7 +++++++ 2 files changed, 18 insertions(+) diff --git a/packages/pds/src/account-do.ts b/packages/pds/src/account-do.ts index d072839..3d22300 100644 --- a/packages/pds/src/account-do.ts +++ b/packages/pds/src/account-do.ts @@ -421,6 +421,17 @@ export class AccountDurableObject extends DurableObject { const updatedRepo = await repo.applyWrites([deleteOp], keypair); this.repo = updatedRepo; + // If the collection has no records left, remove it from the cache + let collectionStillHasRecords = false; + for await (const remaining of this.repo.walkRecords(`${collection}/`)) { + if (remaining.collection !== collection) break; + collectionStillHasRecords = true; + break; + } + if (!collectionStillHasRecords) { + this.storage!.removeCollection(collection); + } + // Sequence the commit for firehose if (this.sequencer) { // Get blocks that changed diff --git a/packages/pds/src/storage.ts b/packages/pds/src/storage.ts index 5e934ae..cff3645 100644 --- a/packages/pds/src/storage.ts +++ b/packages/pds/src/storage.ts @@ -382,6 +382,13 @@ export class SqliteRepoStorage ); } + /** + * Remove a collection name from the cache (no-op if not present). + */ + removeCollection(collection: string): void { + this.sql.exec("DELETE FROM collections WHERE collection = ?", collection); + } + /** * Check if the collections cache has been populated. */ From f932ac510a3ce98eda0f10656339921d5c45f076 Mon Sep 17 00:00:00 2001 From: Ben Dixon Date: Fri, 1 May 2026 12:42:17 +0100 Subject: [PATCH 2/4] test(pds): Cover collection cache cleanup on deletion of all records --- packages/pds/test/storage.test.ts | 88 +++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/packages/pds/test/storage.test.ts b/packages/pds/test/storage.test.ts index 1400f1a..70ac3a2 100644 --- a/packages/pds/test/storage.test.ts +++ b/packages/pds/test/storage.test.ts @@ -288,6 +288,94 @@ describe("SqliteRepoStorage", () => { }); }); +describe("Collections cache", () => { + it("addCollection / removeCollection / getCollections", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + const storage = await instance.getStorage(); + + expect(storage.getCollections()).toEqual([]); + expect(storage.hasCollections()).toBe(false); + + storage.addCollection("app.bsky.feed.post"); + storage.addCollection("app.bsky.feed.like"); + + expect(storage.getCollections()).toEqual([ + "app.bsky.feed.like", + "app.bsky.feed.post", + ]); + expect(storage.hasCollections()).toBe(true); + + storage.removeCollection("app.bsky.feed.post"); + expect(storage.getCollections()).toEqual(["app.bsky.feed.like"]); + + // removing a missing collection is a no-op + storage.removeCollection("app.bsky.feed.post"); + expect(storage.getCollections()).toEqual(["app.bsky.feed.like"]); + }); + }); + + it("removes a collection from the cache when its last record is deleted", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + await instance.rpcCreateRecord("app.bsky.feed.like", "rkey-2", { + subject: { + uri: `at://${env.DID}/app.bsky.feed.post/rkey-1`, + cid: "bafyreigh2akiscaildc7ypw7e6tqocp3vy3uwgyq37e6kz3sm6f5l3hbjm", + }, + createdAt: new Date().toISOString(), + }); + + const storage = await instance.getStorage(); + expect(storage.getCollections().sort()).toEqual([ + "app.bsky.feed.like", + "app.bsky.feed.post", + ]); + + await instance.rpcDeleteRecord("app.bsky.feed.like", "rkey-2"); + + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + }); + }); + + it("keeps the collection in the cache when records remain", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-2", { + text: "second", + createdAt: new Date().toISOString(), + }); + + await instance.rpcDeleteRecord("app.bsky.feed.post", "rkey-1"); + + const storage = await instance.getStorage(); + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + + await instance.rpcDeleteRecord("app.bsky.feed.post", "rkey-2"); + + expect(storage.getCollections()).toEqual([]); + }); + }); +}); + describe("AccountDurableObject", () => { it("initializes storage on first access", async () => { const id = env.ACCOUNT.newUniqueId(); From aaddc582ff1b9da3f8960fe4232369b6234d37cb Mon Sep 17 00:00:00 2001 From: Ben Dixon Date: Tue, 5 May 2026 00:01:11 +0100 Subject: [PATCH 3/4] fix(pds): Remove empty collections from cache in rpcApplyWrites Mirror the rpcDeleteRecord fix for the more common deletion path: applyWrites batches that empty a collection now drop the cache entry. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/pds/src/account-do.ts | 21 +++++++ packages/pds/test/storage.test.ts | 101 ++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+) diff --git a/packages/pds/src/account-do.ts b/packages/pds/src/account-do.ts index 3d22300..57c44c8 100644 --- a/packages/pds/src/account-do.ts +++ b/packages/pds/src/account-do.ts @@ -663,6 +663,27 @@ export class AccountDurableObject extends DurableObject { } } + // For any collection touched by a delete, drop the cache entry if it's + // now empty. Runs after addCollection so a batch that creates + deletes + // in the same collection still walks the MST as the source of truth. + const deletedCollections = new Set(); + for (const op of ops) { + if (op.action === WriteOpAction.Delete) { + deletedCollections.add(op.collection); + } + } + for (const collection of deletedCollections) { + let collectionStillHasRecords = false; + for await (const remaining of this.repo.walkRecords(`${collection}/`)) { + if (remaining.collection !== collection) break; + collectionStillHasRecords = true; + break; + } + if (!collectionStillHasRecords) { + this.storage!.removeCollection(collection); + } + } + // Build final results with CIDs and prepare ops with CIDs for firehose const finalResults: Array<{ $type: string; diff --git a/packages/pds/test/storage.test.ts b/packages/pds/test/storage.test.ts index 70ac3a2..065c2f2 100644 --- a/packages/pds/test/storage.test.ts +++ b/packages/pds/test/storage.test.ts @@ -374,6 +374,107 @@ describe("Collections cache", () => { expect(storage.getCollections()).toEqual([]); }); }); + + it("rpcApplyWrites: removes collection when batch deletes its last record", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + + const storage = await instance.getStorage(); + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + + await instance.rpcApplyWrites([ + { + $type: "com.atproto.repo.applyWrites#delete", + collection: "app.bsky.feed.post", + rkey: "rkey-1", + }, + ]); + + expect(storage.getCollections()).toEqual([]); + }); + }); + + it("rpcApplyWrites: keeps collection when batch creates and deletes leave records", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + + const storage = await instance.getStorage(); + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + + await instance.rpcApplyWrites([ + { + $type: "com.atproto.repo.applyWrites#delete", + collection: "app.bsky.feed.post", + rkey: "rkey-1", + }, + { + $type: "com.atproto.repo.applyWrites#create", + collection: "app.bsky.feed.post", + rkey: "rkey-2", + value: { + $type: "app.bsky.feed.post", + text: "second", + createdAt: new Date().toISOString(), + }, + }, + ]); + + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + }); + }); + + it("rpcApplyWrites: empties only the collection whose records were all deleted", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + await instance.rpcCreateRecord("app.bsky.feed.like", "rkey-2", { + subject: { + uri: `at://${env.DID}/app.bsky.feed.post/rkey-1`, + cid: "bafyreigh2akiscaildc7ypw7e6tqocp3vy3uwgyq37e6kz3sm6f5l3hbjm", + }, + createdAt: new Date().toISOString(), + }); + + const storage = await instance.getStorage(); + expect(storage.getCollections().sort()).toEqual([ + "app.bsky.feed.like", + "app.bsky.feed.post", + ]); + + await instance.rpcApplyWrites([ + { + $type: "com.atproto.repo.applyWrites#delete", + collection: "app.bsky.feed.like", + rkey: "rkey-2", + }, + ]); + + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + }); + }); }); describe("AccountDurableObject", () => { From eb8529aa732f67ee6240aa207f114cf610f8ac42 Mon Sep 17 00:00:00 2001 From: Ben Dixon Date: Tue, 5 May 2026 22:27:26 +0100 Subject: [PATCH 4/4] Added changeset --- .changeset/metal-rooms-say.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/metal-rooms-say.md diff --git a/.changeset/metal-rooms-say.md b/.changeset/metal-rooms-say.md new file mode 100644 index 0000000..d85e3e3 --- /dev/null +++ b/.changeset/metal-rooms-say.md @@ -0,0 +1,7 @@ +--- +"@getcirrus/pds": patch +--- + +fix(pds): Remove empty collections from cache on record delete. + +When all records of a collection are deleted, it is now ensured that the collection is deleted from the user repository so collections don't linger around forever