Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/metal-rooms-say.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@getcirrus/pds": patch
---

fix(pds): Remove empty collections from cache on record delete.

When all records of a collection are deleted, it is now ensured that the collection is deleted from the user repository so collections don't linger around forever
32 changes: 32 additions & 0 deletions packages/pds/src/account-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,17 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {

this.repo = updatedRepo;

// If the collection has no records left, remove it from the cache
let collectionStillHasRecords = false;
for await (const remaining of updatedRepo.walkRecords(`${collection}/`)) {
if (remaining.collection !== collection) break;
collectionStillHasRecords = true;
break;
}
if (!collectionStillHasRecords) {
this.storage!.removeCollection(collection);
}

return {
commit: {
cid: updatedRepo.cid.toString(),
Expand Down Expand Up @@ -790,6 +801,27 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {

this.repo = updatedRepo;

// For any collection touched by a delete, drop the cache entry if it's
// now empty. Runs after addCollection so a batch that creates + deletes
// in the same collection still walks the MST as the source of truth.
const deletedCollections = new Set<string>();
for (const op of ops) {
if (op.action === WriteOpAction.Delete) {
deletedCollections.add(op.collection);
}
}
for (const collection of deletedCollections) {
let collectionStillHasRecords = false;
for await (const remaining of updatedRepo.walkRecords(`${collection}/`)) {
if (remaining.collection !== collection) break;
collectionStillHasRecords = true;
break;
}
if (!collectionStillHasRecords) {
this.storage!.removeCollection(collection);
}
}

return {
commit: {
cid: updatedRepo.cid.toString(),
Expand Down
7 changes: 7 additions & 0 deletions packages/pds/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,13 @@ export class SqliteRepoStorage
);
}

/**
* Remove a collection name from the cache (no-op if not present).
*/
removeCollection(collection: string): void {
this.sql.exec("DELETE FROM collections WHERE collection = ?", collection);
}

/**
* Check if the collections cache has been populated.
*/
Expand Down
189 changes: 189 additions & 0 deletions packages/pds/test/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,195 @@ describe("SqliteRepoStorage", () => {
});
});

describe("Collections cache", () => {
it("addCollection / removeCollection / getCollections", async () => {
const id = env.ACCOUNT.newUniqueId();
const stub = env.ACCOUNT.get(id);

await runInDurableObject(stub, async (instance: AccountDurableObject) => {
const storage = await instance.getStorage();

expect(storage.getCollections()).toEqual([]);
expect(storage.hasCollections()).toBe(false);

storage.addCollection("app.bsky.feed.post");
storage.addCollection("app.bsky.feed.like");

expect(storage.getCollections()).toEqual([
"app.bsky.feed.like",
"app.bsky.feed.post",
]);
expect(storage.hasCollections()).toBe(true);

storage.removeCollection("app.bsky.feed.post");
expect(storage.getCollections()).toEqual(["app.bsky.feed.like"]);

// removing a missing collection is a no-op
storage.removeCollection("app.bsky.feed.post");
expect(storage.getCollections()).toEqual(["app.bsky.feed.like"]);
});
});

it("removes a collection from the cache when its last record is deleted", async () => {
const id = env.ACCOUNT.newUniqueId();
const stub = env.ACCOUNT.get(id);

await runInDurableObject(stub, async (instance: AccountDurableObject) => {
await instance.getStorage();

await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", {
text: "first",
createdAt: new Date().toISOString(),
});
await instance.rpcCreateRecord("app.bsky.feed.like", "rkey-2", {
subject: {
uri: `at://${env.DID}/app.bsky.feed.post/rkey-1`,
cid: "bafyreigh2akiscaildc7ypw7e6tqocp3vy3uwgyq37e6kz3sm6f5l3hbjm",
},
createdAt: new Date().toISOString(),
});

const storage = await instance.getStorage();
expect(storage.getCollections().sort()).toEqual([
"app.bsky.feed.like",
"app.bsky.feed.post",
]);

await instance.rpcDeleteRecord("app.bsky.feed.like", "rkey-2");

expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]);
});
});

it("keeps the collection in the cache when records remain", async () => {
const id = env.ACCOUNT.newUniqueId();
const stub = env.ACCOUNT.get(id);

await runInDurableObject(stub, async (instance: AccountDurableObject) => {
await instance.getStorage();

await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", {
text: "first",
createdAt: new Date().toISOString(),
});
await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-2", {
text: "second",
createdAt: new Date().toISOString(),
});

await instance.rpcDeleteRecord("app.bsky.feed.post", "rkey-1");

const storage = await instance.getStorage();
expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]);

await instance.rpcDeleteRecord("app.bsky.feed.post", "rkey-2");

expect(storage.getCollections()).toEqual([]);
});
});

it("rpcApplyWrites: removes collection when batch deletes its last record", async () => {
const id = env.ACCOUNT.newUniqueId();
const stub = env.ACCOUNT.get(id);

await runInDurableObject(stub, async (instance: AccountDurableObject) => {
await instance.getStorage();

await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", {
text: "first",
createdAt: new Date().toISOString(),
});

const storage = await instance.getStorage();
expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]);

await instance.rpcApplyWrites([
{
$type: "com.atproto.repo.applyWrites#delete",
collection: "app.bsky.feed.post",
rkey: "rkey-1",
},
]);

expect(storage.getCollections()).toEqual([]);
});
});

it("rpcApplyWrites: keeps collection when batch creates and deletes leave records", async () => {
const id = env.ACCOUNT.newUniqueId();
const stub = env.ACCOUNT.get(id);

await runInDurableObject(stub, async (instance: AccountDurableObject) => {
await instance.getStorage();

await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", {
text: "first",
createdAt: new Date().toISOString(),
});

const storage = await instance.getStorage();
expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]);

await instance.rpcApplyWrites([
{
$type: "com.atproto.repo.applyWrites#delete",
collection: "app.bsky.feed.post",
rkey: "rkey-1",
},
{
$type: "com.atproto.repo.applyWrites#create",
collection: "app.bsky.feed.post",
rkey: "rkey-2",
value: {
$type: "app.bsky.feed.post",
text: "second",
createdAt: new Date().toISOString(),
},
},
]);

expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]);
});
});

it("rpcApplyWrites: empties only the collection whose records were all deleted", async () => {
const id = env.ACCOUNT.newUniqueId();
const stub = env.ACCOUNT.get(id);

await runInDurableObject(stub, async (instance: AccountDurableObject) => {
await instance.getStorage();

await instance.rpcCreateRecord("app.bsky.feed.post", "rkey-1", {
text: "first",
createdAt: new Date().toISOString(),
});
await instance.rpcCreateRecord("app.bsky.feed.like", "rkey-2", {
subject: {
uri: `at://${env.DID}/app.bsky.feed.post/rkey-1`,
cid: "bafyreigh2akiscaildc7ypw7e6tqocp3vy3uwgyq37e6kz3sm6f5l3hbjm",
},
createdAt: new Date().toISOString(),
});

const storage = await instance.getStorage();
expect(storage.getCollections().sort()).toEqual([
"app.bsky.feed.like",
"app.bsky.feed.post",
]);

await instance.rpcApplyWrites([
{
$type: "com.atproto.repo.applyWrites#delete",
collection: "app.bsky.feed.like",
rkey: "rkey-2",
},
]);

expect(storage.getCollections()).toEqual(["app.bsky.feed.post"]);
});
});
});

describe("AccountDurableObject", () => {
it("initializes storage on first access", async () => {
const id = env.ACCOUNT.newUniqueId();
Expand Down
Loading