diff --git a/package-lock.json b/package-lock.json index 42f7677..c80ad25 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "getbased-relay", - "version": "1.1.0", + "version": "1.2.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "getbased-relay", - "version": "1.1.0", + "version": "1.2.2", "license": "AGPL-3.0-or-later", "dependencies": { "@evolu/common": "^7.4.0", diff --git a/src/lib/admin-server.ts b/src/lib/admin-server.ts index 25b2533..71fdb63 100644 --- a/src/lib/admin-server.ts +++ b/src/lib/admin-server.ts @@ -17,6 +17,7 @@ import type { RelayConfig } from "./config.js"; import type { Logger } from "./logger.js"; import type { Metrics } from "./metrics.js"; import type { OwnerTracker } from "./owner-tracker.js"; +import { compactOwner } from "./compact-owner.js"; const __dirname = dirname(fileURLToPath(import.meta.url)); const pkg = JSON.parse( @@ -105,51 +106,15 @@ export function createAdminServer( // is too short for a busy relay where the writer holds the lock during // a large batch ingest. db.pragma("busy_timeout = 30000"); - // Run the SELECTs inside the same transaction so the deletedMessages - // count + before/after storedBytes are consistent with the DELETE/UPDATE - // — without this, a concurrent push between the SELECT and the write - // would yield a slightly stale count in the response. - let before: { storedBytes: number } | undefined; - let after: { storedBytes: number } | undefined; - let deletedMessages = 0; - const tx = db.transaction(() => { - before = db! - .prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?') - .get(ownerId) as { storedBytes: number } | undefined; - const cnt = db! - .prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?') - .get(ownerId) as { c: number }; - deletedMessages = cnt.c; - db! - .prepare('DELETE FROM evolu_message WHERE "ownerId" = ?') - .run(ownerId); - db! - .prepare('UPDATE evolu_usage SET "storedBytes" = 0 WHERE "ownerId" = ?') - .run(ownerId); - after = db! - .prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?') - .get(ownerId) as { storedBytes: number } | undefined; - }); - tx(); + // Shared transaction with /self/compact-owner — single source of + // truth for what "compact this owner" means (see compact-owner.ts). + const result = compactOwner(db, ownerId); logger.emit("info", "admin.compact_owner", { ownerId: ownerIdStr, - deletedMessages, - beforeStoredBytes: before?.storedBytes ?? 0, - afterStoredBytes: after?.storedBytes ?? 0, + ...result, }); res.writeHead(200, { "Content-Type": "application/json" }); - res.end( - JSON.stringify( - { - ownerId: ownerIdStr, - deletedMessages, - beforeStoredBytes: before?.storedBytes ?? 0, - afterStoredBytes: after?.storedBytes ?? 0, - }, - null, - 2, - ), - ); + res.end(JSON.stringify({ ownerId: ownerIdStr, ...result }, null, 2)); } catch (e) { logger.emit("warn", "admin.compact_owner_failed", { ownerId: ownerIdStr, diff --git a/src/lib/compact-owner.ts b/src/lib/compact-owner.ts new file mode 100644 index 0000000..84c5b50 --- /dev/null +++ b/src/lib/compact-owner.ts @@ -0,0 +1,63 @@ +// Shared compact-owner transaction used by both /admin/compact-owner and +// /self/compact-owner. Lives here so the two endpoints can't drift on what +// "compact" actually wipes — every cleanup step (evolu_message rows, +// evolu_timestamp merkle/fingerprint rows, evolu_usage bookkeeping) runs +// inside one transaction so a partial failure can't strand the owner in +// a half-cleaned state where the next client push gets rejected as +// "already-seen" (see fix history at PR #10 for the production repro). + +import type Database from "better-sqlite3"; + +export interface CompactOwnerResult { + deletedMessages: number; + beforeStoredBytes: number; + afterStoredBytes: number; +} + +/** + * Atomically drops every relay-side trace of `ownerId`: + * - evolu_message rows (the encrypted CRDT log) + * - evolu_timestamp rows (the merkle/fingerprint table feeding negentropy + * reconciliation — leaving these populated after evolu_message is gone + * makes the relay report fingerprints for timestamps without payloads, + * so peers' subsequent per-row pushes get rejected as "you already have + * it" and silently disappear) + * - evolu_usage.storedBytes / firstTimestamp / lastTimestamp (zeroed so + * quota tracking matches reality and getOwnerUsage doesn't fall back + * to stale bookkeeping pointing at deleted message rows) + * + * Caller passes an open `Database` handle with `busy_timeout` already set + * to whatever they want (admin and self both use 30s). The transaction is + * synchronous via better-sqlite3. + */ +export function compactOwner( + db: Database.Database, + ownerId: Buffer, +): CompactOwnerResult { + let before: { storedBytes: number } | undefined; + let after: { storedBytes: number } | undefined; + let deletedMessages = 0; + const tx = db.transaction(() => { + before = db + .prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?') + .get(ownerId) as { storedBytes: number } | undefined; + const cnt = db + .prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?') + .get(ownerId) as { c: number }; + deletedMessages = cnt.c; + db.prepare('DELETE FROM evolu_message WHERE "ownerId" = ?').run(ownerId); + db.prepare('DELETE FROM evolu_timestamp WHERE "ownerId" = ?').run(ownerId); + db.prepare( + 'UPDATE evolu_usage SET "storedBytes" = 0, "firstTimestamp" = NULL, "lastTimestamp" = NULL WHERE "ownerId" = ?', + ).run(ownerId); + after = db + .prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?') + .get(ownerId) as { storedBytes: number } | undefined; + }); + tx(); + return { + deletedMessages, + beforeStoredBytes: before?.storedBytes ?? 0, + afterStoredBytes: after?.storedBytes ?? 0, + }; +} diff --git a/src/lib/self-server.ts b/src/lib/self-server.ts index f565bbd..9e9fa38 100644 --- a/src/lib/self-server.ts +++ b/src/lib/self-server.ts @@ -38,6 +38,7 @@ import { join } from "path"; import Database from "better-sqlite3"; import type { RelayConfig } from "./config.js"; import type { Logger } from "./logger.js"; +import { compactOwner } from "./compact-owner.js"; const TIMESTAMP_WINDOW_MS = 5 * 60 * 1000; const MAX_BODY_BYTES = 4096; @@ -350,7 +351,8 @@ export function createSelfServer( return; } - // Run the same DELETE/UPDATE transaction as /admin/compact-owner. + // Shared transaction with /admin/compact-owner — single source of + // truth for what "compact this owner" means (see compact-owner.ts). // Open a fresh write-handle so we don't hold the lookup-DB hostage // during the WAL wait. const dbPath = join(config.dataDir, `${config.relayName}.db`); @@ -358,40 +360,12 @@ export function createSelfServer( try { db = new Database(dbPath, { fileMustExist: true }); db.pragma("busy_timeout = 30000"); - let before: { storedBytes: number } | undefined; - let after: { storedBytes: number } | undefined; - let deletedMessages = 0; - const tx = db.transaction(() => { - before = db! - .prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?') - .get(ownerId) as { storedBytes: number } | undefined; - const cnt = db! - .prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?') - .get(ownerId) as { c: number }; - deletedMessages = cnt.c; - db! - .prepare('DELETE FROM evolu_message WHERE "ownerId" = ?') - .run(ownerId); - db! - .prepare('UPDATE evolu_usage SET "storedBytes" = 0 WHERE "ownerId" = ?') - .run(ownerId); - after = db! - .prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?') - .get(ownerId) as { storedBytes: number } | undefined; - }); - tx(); + const result = compactOwner(db, ownerId); logger.emit("info", "self.compact_owner", { ownerId: ownerIdStr, - deletedMessages, - beforeStoredBytes: before?.storedBytes ?? 0, - afterStoredBytes: after?.storedBytes ?? 0, - }); - jsonResponse(res, 200, { - ownerId: ownerIdStr, - deletedMessages, - beforeStoredBytes: before?.storedBytes ?? 0, - afterStoredBytes: after?.storedBytes ?? 0, + ...result, }); + jsonResponse(res, 200, { ownerId: ownerIdStr, ...result }); } catch (e) { logger.emit("warn", "self.compact_owner_failed", { ownerId: ownerIdStr, diff --git a/test/compact-owner.test.mjs b/test/compact-owner.test.mjs new file mode 100644 index 0000000..ae43a9f --- /dev/null +++ b/test/compact-owner.test.mjs @@ -0,0 +1,138 @@ +// Unit test for the shared compactOwner helper. Both /admin/compact-owner +// and /self/compact-owner call this function — testing it directly means +// either route's coverage is automatic by construction. +// +// The self-server integration test (self-server.integration.test.mjs) +// exercises the same helper end-to-end via the HTTP path. This file +// covers the helper's contract in isolation: schema cleanup happens, +// before/after counts are correct, idempotent on already-empty owners, +// other owners' state is untouched. + +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { mkdtempSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { randomBytes } from "node:crypto"; +import Database from "better-sqlite3"; + +import { compactOwner } from "../dist/lib/compact-owner.js"; + +function setup() { + const dbPath = join(mkdtempSync(join(tmpdir(), "compact-owner-test-")), "relay.db"); + const db = new Database(dbPath); + db.exec(` + CREATE TABLE evolu_usage ( + "ownerId" blob primary key, + "storedBytes" integer not null, + "firstTimestamp" blob, + "lastTimestamp" blob + ) strict; + CREATE TABLE evolu_message ( + "ownerId" blob not null, + "timestamp" blob not null, + "change" blob not null, + primary key ("ownerId", "timestamp") + ) strict; + CREATE TABLE evolu_timestamp ( + "ownerId" blob not null, + "t" blob not null, + "h1" integer, "h2" integer, "c" integer, + "l" integer not null, + primary key ("ownerId", "t") + ) strict; + `); + return { db }; +} + +function seedOwner(db, ownerIdBytes, { messageCount = 5, payloadBytes = 1000 } = {}) { + db.prepare('INSERT INTO evolu_usage ("ownerId", "storedBytes", "firstTimestamp", "lastTimestamp") VALUES (?, ?, ?, ?)') + .run(ownerIdBytes, messageCount * payloadBytes, + Buffer.from("ts-0", "utf8"), Buffer.from(`ts-${messageCount - 1}`, "utf8")); + const insertMsg = db.prepare('INSERT INTO evolu_message ("ownerId", "timestamp", "change") VALUES (?, ?, ?)'); + const insertTs = db.prepare('INSERT INTO evolu_timestamp ("ownerId", "t", "h1", "h2", "c", "l") VALUES (?, ?, ?, ?, ?, ?)'); + for (let i = 0; i < messageCount; i++) { + const ts = Buffer.from(`ts-${i}`, "utf8"); + insertMsg.run(ownerIdBytes, ts, Buffer.alloc(payloadBytes)); + insertTs.run(ownerIdBytes, ts, 0, 0, 0, 1); + } +} + +test("compactOwner deletes evolu_message rows for the target owner", () => { + const { db } = setup(); + const ownerIdBytes = randomBytes(16); + seedOwner(db, ownerIdBytes); + + const result = compactOwner(db, ownerIdBytes); + + assert.equal(result.deletedMessages, 5); + const after = db.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerIdBytes); + assert.equal(after.c, 0); +}); + +test("compactOwner deletes evolu_timestamp rows (the merkle/fingerprint table)", () => { + // This is the regression guard for the production wedge: pre-fix, + // these rows survived compact, fed stale fingerprints to the negentropy + // reconciliation, and stranded every subsequent peer push. + const { db } = setup(); + const ownerIdBytes = randomBytes(16); + seedOwner(db, ownerIdBytes); + + const before = db.prepare('SELECT COUNT(*) as c FROM evolu_timestamp WHERE "ownerId" = ?').get(ownerIdBytes); + assert.equal(before.c, 5, "fixture sanity"); + + compactOwner(db, ownerIdBytes); + + const after = db.prepare('SELECT COUNT(*) as c FROM evolu_timestamp WHERE "ownerId" = ?').get(ownerIdBytes); + assert.equal(after.c, 0, "evolu_timestamp must be empty post-compact (else fresh pushes get stranded)"); +}); + +test("compactOwner zeroes storedBytes + clears first/lastTimestamp on evolu_usage", () => { + const { db } = setup(); + const ownerIdBytes = randomBytes(16); + seedOwner(db, ownerIdBytes); + + const result = compactOwner(db, ownerIdBytes); + + assert.equal(result.beforeStoredBytes, 5000); + assert.equal(result.afterStoredBytes, 0); + const usage = db.prepare('SELECT * FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes); + assert.equal(usage.storedBytes, 0); + assert.equal(usage.firstTimestamp, null, "firstTimestamp pointed at a deleted message"); + assert.equal(usage.lastTimestamp, null, "lastTimestamp pointed at a deleted message"); +}); + +test("compactOwner is idempotent (deletedMessages=0 on already-empty owner)", () => { + const { db } = setup(); + const ownerIdBytes = randomBytes(16); + seedOwner(db, ownerIdBytes); + + compactOwner(db, ownerIdBytes); + const second = compactOwner(db, ownerIdBytes); + + assert.equal(second.deletedMessages, 0); + assert.equal(second.beforeStoredBytes, 0); + assert.equal(second.afterStoredBytes, 0); +}); + +test("compactOwner does not touch other owners' state", () => { + // Defence-in-depth: a buggy WHERE clause refactor would silently wipe + // the wrong owner. This test confirms the helper is properly scoped. + const { db } = setup(); + const ownerA = randomBytes(16); + const ownerB = randomBytes(16); + seedOwner(db, ownerA, { messageCount: 5 }); + seedOwner(db, ownerB, { messageCount: 3 }); + + compactOwner(db, ownerA); + + const aMsgs = db.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerA); + const bMsgs = db.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerB); + const bTs = db.prepare('SELECT COUNT(*) as c FROM evolu_timestamp WHERE "ownerId" = ?').get(ownerB); + const bUsage = db.prepare('SELECT * FROM evolu_usage WHERE "ownerId" = ?').get(ownerB); + assert.equal(aMsgs.c, 0, "compact target wiped"); + assert.equal(bMsgs.c, 3, "other owner's messages untouched"); + assert.equal(bTs.c, 3, "other owner's timestamps untouched"); + assert.equal(bUsage.storedBytes, 3000, "other owner's storedBytes untouched"); + assert.notEqual(bUsage.firstTimestamp, null, "other owner's first/lastTimestamp untouched"); +}); diff --git a/test/self-server.integration.test.mjs b/test/self-server.integration.test.mjs index 908247e..122b371 100644 --- a/test/self-server.integration.test.mjs +++ b/test/self-server.integration.test.mjs @@ -64,8 +64,18 @@ before(async () => { for (let i = 0; i < 5; i++) { insertMsg.run(ownerIdBytes, Buffer.from(`ts-${i}`, "utf8"), Buffer.alloc(1000)); } - // Manually set storedBytes to match what a real relay would have. - db.prepare('UPDATE evolu_usage SET "storedBytes" = 5000 WHERE "ownerId" = ?').run(ownerIdBytes); + // Seed the merkle/fingerprint table the way a real relay populates it + // alongside evolu_message. Compact must wipe these too — leaving them + // strands the owner because negentropy reports stale fingerprints for + // timestamps whose underlying messages we just deleted. + const insertTs = db.prepare('INSERT INTO evolu_timestamp ("ownerId", "t", "h1", "h2", "c", "l") VALUES (?, ?, ?, ?, ?, ?)'); + for (let i = 0; i < 5; i++) { + insertTs.run(ownerIdBytes, Buffer.from(`ts-${i}`, "utf8"), 0, 0, 0, 1); + } + // Manually set storedBytes + first/lastTimestamp to match what a real + // relay would have written alongside the messages. + db.prepare('UPDATE evolu_usage SET "storedBytes" = 5000, "firstTimestamp" = ?, "lastTimestamp" = ? WHERE "ownerId" = ?') + .run(Buffer.from("ts-0", "utf8"), Buffer.from("ts-4", "utf8"), ownerIdBytes); db.close(); // Boot the relay listener on a random port (let OS pick). @@ -127,10 +137,14 @@ test("compact drops every evolu_message row and zeroes storedBytes", async () => // Sanity: precondition. const db = new Database(dbPath, { readonly: true }); const before = db.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerIdBytes); - const beforeUsage = db.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes); + const beforeTs = db.prepare('SELECT COUNT(*) as c FROM evolu_timestamp WHERE "ownerId" = ?').get(ownerIdBytes); + const beforeUsage = db.prepare('SELECT "storedBytes", "firstTimestamp", "lastTimestamp" FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes); db.close(); assert.equal(before.c, 5, "should have 5 message rows before compact"); + assert.equal(beforeTs.c, 5, "should have 5 timestamp/fingerprint rows before compact"); assert.equal(beforeUsage.storedBytes, 5000, "should have 5000 storedBytes before"); + assert.notEqual(beforeUsage.firstTimestamp, null, "firstTimestamp should be seeded"); + assert.notEqual(beforeUsage.lastTimestamp, null, "lastTimestamp should be seeded"); // Sign + send compact. const ts = Date.now(); @@ -150,10 +164,19 @@ test("compact drops every evolu_message row and zeroes storedBytes", async () => // Verify the actual DB state matches the response. const db2 = new Database(dbPath, { readonly: true }); const after = db2.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerIdBytes); - const afterUsage = db2.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes); + const afterTs = db2.prepare('SELECT COUNT(*) as c FROM evolu_timestamp WHERE "ownerId" = ?').get(ownerIdBytes); + const afterUsage = db2.prepare('SELECT "storedBytes", "firstTimestamp", "lastTimestamp" FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes); db2.close(); assert.equal(after.c, 0, "should have 0 message rows after compact"); + // Critical regression guard: leaving evolu_timestamp populated after + // evolu_message is empty makes the negentropy reconciliation report + // fingerprints for timestamps that no longer have a payload, so + // peers' fresh per-row pushes get rejected as "already have it" and + // disappear. Verified in production 2026-05-06. + assert.equal(afterTs.c, 0, "should have 0 timestamp/fingerprint rows after compact (else fresh pushes get stranded)"); assert.equal(afterUsage.storedBytes, 0, "should have 0 storedBytes after"); + assert.equal(afterUsage.firstTimestamp, null, "firstTimestamp should be cleared (pointed at deleted messages)"); + assert.equal(afterUsage.lastTimestamp, null, "lastTimestamp should be cleared (pointed at deleted messages)"); }); test("compact is idempotent (second call returns deletedMessages=0)", async () => {