diff --git a/.changeset/metal-rooms-say.md b/.changeset/metal-rooms-say.md new file mode 100644 index 00000000..d85e3e36 --- /dev/null +++ b/.changeset/metal-rooms-say.md @@ -0,0 +1,7 @@ +--- +"@getcirrus/pds": patch +--- + +fix(pds): Remove empty collections from cache on record delete. + +When the last record of a collection is deleted, the collection is now also removed from the repository's collections cache, so empty collections no longer linger around forever. diff --git a/packages/pds/src/account-do.ts b/packages/pds/src/account-do.ts index acd6938f..c15d8e73 100644 --- a/packages/pds/src/account-do.ts +++ b/packages/pds/src/account-do.ts @@ -494,6 +494,17 @@ export class AccountDurableObject extends DurableObject { this.repo = updatedRepo; + // If the collection has no records left, remove it from the cache + let collectionStillHasRecords = false; + for await (const remaining of updatedRepo.walkRecords(`${collection}/`)) { + if (remaining.collection !== collection) break; + collectionStillHasRecords = true; + break; + } + if (!collectionStillHasRecords) { + this.storage!.removeCollection(collection); + } + return { commit: { cid: updatedRepo.cid.toString(), @@ -790,6 +801,27 @@ export class AccountDurableObject extends DurableObject { this.repo = updatedRepo; + // For any collection touched by a delete, drop the cache entry if it's + // now empty. Runs after addCollection so a batch that creates + deletes + // in the same collection still walks the MST as the source of truth. 
+ const deletedCollections = new Set(); + for (const op of ops) { + if (op.action === WriteOpAction.Delete) { + deletedCollections.add(op.collection); + } + } + for (const collection of deletedCollections) { + let collectionStillHasRecords = false; + for await (const remaining of updatedRepo.walkRecords(`${collection}/`)) { + if (remaining.collection !== collection) break; + collectionStillHasRecords = true; + break; + } + if (!collectionStillHasRecords) { + this.storage!.removeCollection(collection); + } + } + return { commit: { cid: updatedRepo.cid.toString(), diff --git a/packages/pds/src/storage.ts b/packages/pds/src/storage.ts index 5e934ae7..cff36454 100644 --- a/packages/pds/src/storage.ts +++ b/packages/pds/src/storage.ts @@ -382,6 +382,13 @@ export class SqliteRepoStorage ); } + /** + * Remove a collection name from the cache (no-op if not present). + */ + removeCollection(collection: string): void { + this.sql.exec("DELETE FROM collections WHERE collection = ?", collection); + } + /** * Check if the collections cache has been populated. 
*/ diff --git a/packages/pds/test/storage.test.ts b/packages/pds/test/storage.test.ts index 8e1c4ea1..aaa389ad 100644 --- a/packages/pds/test/storage.test.ts +++ b/packages/pds/test/storage.test.ts @@ -290,6 +290,195 @@ describe("SqliteRepoStorage", () => { }); }); +describe("Collections cache", () => { + it("addCollection / removeCollection / getCollections", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + const storage = await instance.getStorage(); + + expect(storage.getCollections()).toEqual([]); + expect(storage.hasCollections()).toBe(false); + + storage.addCollection("app.bsky.feed.post"); + storage.addCollection("app.bsky.feed.like"); + + expect(storage.getCollections()).toEqual([ + "app.bsky.feed.like", + "app.bsky.feed.post", + ]); + expect(storage.hasCollections()).toBe(true); + + storage.removeCollection("app.bsky.feed.post"); + expect(storage.getCollections()).toEqual(["app.bsky.feed.like"]); + + // removing a missing collection is a no-op + storage.removeCollection("app.bsky.feed.post"); + expect(storage.getCollections()).toEqual(["app.bsky.feed.like"]); + }); + }); + + it("removes a collection from the cache when its last record is deleted", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + await instance.rpcCreateRecord("app.bsky.feed.like", "rkey-2", { + subject: { + uri: `at://${env.DID}/app.bsky.feed.post/rkey-1`, + cid: "bafyreigh2akiscaildc7ypw7e6tqocp3vy3uwgyq37e6kz3sm6f5l3hbjm", + }, + createdAt: new Date().toISOString(), + }); + + const storage = await instance.getStorage(); + expect(storage.getCollections().sort()).toEqual([ + 
"app.bsky.feed.like", + "app.bsky.feed.post", + ]); + + await instance.rpcDeleteRecord("app.bsky.feed.like", "rkey-2"); + + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + }); + }); + + it("keeps the collection in the cache when records remain", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-2", { + text: "second", + createdAt: new Date().toISOString(), + }); + + await instance.rpcDeleteRecord("app.bsky.feed.post", "rkey-1"); + + const storage = await instance.getStorage(); + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + + await instance.rpcDeleteRecord("app.bsky.feed.post", "rkey-2"); + + expect(storage.getCollections()).toEqual([]); + }); + }); + + it("rpcApplyWrites: removes collection when batch deletes its last record", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + + const storage = await instance.getStorage(); + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + + await instance.rpcApplyWrites([ + { + $type: "com.atproto.repo.applyWrites#delete", + collection: "app.bsky.feed.post", + rkey: "rkey-1", + }, + ]); + + expect(storage.getCollections()).toEqual([]); + }); + }); + + it("rpcApplyWrites: keeps collection when batch creates and deletes leave records", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, 
async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + + const storage = await instance.getStorage(); + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + + await instance.rpcApplyWrites([ + { + $type: "com.atproto.repo.applyWrites#delete", + collection: "app.bsky.feed.post", + rkey: "rkey-1", + }, + { + $type: "com.atproto.repo.applyWrites#create", + collection: "app.bsky.feed.post", + rkey: "rkey-2", + value: { + $type: "app.bsky.feed.post", + text: "second", + createdAt: new Date().toISOString(), + }, + }, + ]); + + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + }); + }); + + it("rpcApplyWrites: empties only the collection whose records were all deleted", async () => { + const id = env.ACCOUNT.newUniqueId(); + const stub = env.ACCOUNT.get(id); + + await runInDurableObject(stub, async (instance: AccountDurableObject) => { + await instance.getStorage(); + + await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", { + text: "first", + createdAt: new Date().toISOString(), + }); + await instance.rpcCreateRecord("app.bsky.feed.like", "rkey-2", { + subject: { + uri: `at://${env.DID}/app.bsky.feed.post/rkey-1`, + cid: "bafyreigh2akiscaildc7ypw7e6tqocp3vy3uwgyq37e6kz3sm6f5l3hbjm", + }, + createdAt: new Date().toISOString(), + }); + + const storage = await instance.getStorage(); + expect(storage.getCollections().sort()).toEqual([ + "app.bsky.feed.like", + "app.bsky.feed.post", + ]); + + await instance.rpcApplyWrites([ + { + $type: "com.atproto.repo.applyWrites#delete", + collection: "app.bsky.feed.like", + rkey: "rkey-2", + }, + ]); + + expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]); + }); + }); +}); + describe("AccountDurableObject", () => { it("initializes storage on first access", async () => { const id = env.ACCOUNT.newUniqueId();