Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 6 additions & 41 deletions src/lib/admin-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type { RelayConfig } from "./config.js";
import type { Logger } from "./logger.js";
import type { Metrics } from "./metrics.js";
import type { OwnerTracker } from "./owner-tracker.js";
import { compactOwner } from "./compact-owner.js";

const __dirname = dirname(fileURLToPath(import.meta.url));
const pkg = JSON.parse(
Expand Down Expand Up @@ -105,51 +106,15 @@ export function createAdminServer(
// is too short for a busy relay where the writer holds the lock during
// a large batch ingest.
db.pragma("busy_timeout = 30000");
// Run the SELECTs inside the same transaction so the deletedMessages
// count + before/after storedBytes are consistent with the DELETE/UPDATE
// — without this, a concurrent push between the SELECT and the write
// would yield a slightly stale count in the response.
let before: { storedBytes: number } | undefined;
let after: { storedBytes: number } | undefined;
let deletedMessages = 0;
const tx = db.transaction(() => {
before = db!
.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?')
.get(ownerId) as { storedBytes: number } | undefined;
const cnt = db!
.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?')
.get(ownerId) as { c: number };
deletedMessages = cnt.c;
db!
.prepare('DELETE FROM evolu_message WHERE "ownerId" = ?')
.run(ownerId);
db!
.prepare('UPDATE evolu_usage SET "storedBytes" = 0 WHERE "ownerId" = ?')
.run(ownerId);
after = db!
.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?')
.get(ownerId) as { storedBytes: number } | undefined;
});
tx();
// Shared transaction with /self/compact-owner — single source of
// truth for what "compact this owner" means (see compact-owner.ts).
const result = compactOwner(db, ownerId);
logger.emit("info", "admin.compact_owner", {
ownerId: ownerIdStr,
deletedMessages,
beforeStoredBytes: before?.storedBytes ?? 0,
afterStoredBytes: after?.storedBytes ?? 0,
...result,
});
res.writeHead(200, { "Content-Type": "application/json" });
res.end(
JSON.stringify(
{
ownerId: ownerIdStr,
deletedMessages,
beforeStoredBytes: before?.storedBytes ?? 0,
afterStoredBytes: after?.storedBytes ?? 0,
},
null,
2,
),
);
res.end(JSON.stringify({ ownerId: ownerIdStr, ...result }, null, 2));
} catch (e) {
logger.emit("warn", "admin.compact_owner_failed", {
ownerId: ownerIdStr,
Expand Down
63 changes: 63 additions & 0 deletions src/lib/compact-owner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Shared compact-owner transaction used by both /admin/compact-owner and
// /self/compact-owner. Lives here so the two endpoints can't drift on what
// "compact" actually wipes — every cleanup step (evolu_message rows,
// evolu_timestamp merkle/fingerprint rows, evolu_usage bookkeeping) runs
// inside one transaction so a partial failure can't strand the owner in
// a half-cleaned state where the next client push gets rejected as
// "already-seen" (see fix history at PR #10 for the production repro).

import type Database from "better-sqlite3";

export interface CompactOwnerResult {
deletedMessages: number;
beforeStoredBytes: number;
afterStoredBytes: number;
}

/**
* Atomically drops every relay-side trace of `ownerId`:
* - evolu_message rows (the encrypted CRDT log)
* - evolu_timestamp rows (the merkle/fingerprint table feeding negentropy
* reconciliation — leaving these populated after evolu_message is gone
* makes the relay report fingerprints for timestamps without payloads,
* so peers' subsequent per-row pushes get rejected as "you already have
* it" and silently disappear)
* - evolu_usage.storedBytes / firstTimestamp / lastTimestamp (zeroed so
* quota tracking matches reality and getOwnerUsage doesn't fall back
* to stale bookkeeping pointing at deleted message rows)
*
* Caller passes an open `Database` handle with `busy_timeout` already set
* to whatever they want (admin and self both use 30s). The transaction is
* synchronous via better-sqlite3.
*/
export function compactOwner(
db: Database.Database,
ownerId: Buffer,
): CompactOwnerResult {
let before: { storedBytes: number } | undefined;
let after: { storedBytes: number } | undefined;
let deletedMessages = 0;
const tx = db.transaction(() => {
before = db
.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?')
.get(ownerId) as { storedBytes: number } | undefined;
const cnt = db
.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?')
.get(ownerId) as { c: number };
deletedMessages = cnt.c;
db.prepare('DELETE FROM evolu_message WHERE "ownerId" = ?').run(ownerId);
db.prepare('DELETE FROM evolu_timestamp WHERE "ownerId" = ?').run(ownerId);
db.prepare(
'UPDATE evolu_usage SET "storedBytes" = 0, "firstTimestamp" = NULL, "lastTimestamp" = NULL WHERE "ownerId" = ?',
).run(ownerId);
after = db
.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?')
.get(ownerId) as { storedBytes: number } | undefined;
});
tx();
return {
deletedMessages,
beforeStoredBytes: before?.storedBytes ?? 0,
afterStoredBytes: after?.storedBytes ?? 0,
};
}
38 changes: 6 additions & 32 deletions src/lib/self-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { join } from "path";
import Database from "better-sqlite3";
import type { RelayConfig } from "./config.js";
import type { Logger } from "./logger.js";
import { compactOwner } from "./compact-owner.js";

const TIMESTAMP_WINDOW_MS = 5 * 60 * 1000;
const MAX_BODY_BYTES = 4096;
Expand Down Expand Up @@ -350,48 +351,21 @@ export function createSelfServer(
return;
}

// Run the same DELETE/UPDATE transaction as /admin/compact-owner.
// Shared transaction with /admin/compact-owner — single source of
// truth for what "compact this owner" means (see compact-owner.ts).
// Open a fresh write-handle so we don't hold the lookup-DB hostage
// during the WAL wait.
const dbPath = join(config.dataDir, `${config.relayName}.db`);
let db: Database.Database | null = null;
try {
db = new Database(dbPath, { fileMustExist: true });
db.pragma("busy_timeout = 30000");
let before: { storedBytes: number } | undefined;
let after: { storedBytes: number } | undefined;
let deletedMessages = 0;
const tx = db.transaction(() => {
before = db!
.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?')
.get(ownerId) as { storedBytes: number } | undefined;
const cnt = db!
.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?')
.get(ownerId) as { c: number };
deletedMessages = cnt.c;
db!
.prepare('DELETE FROM evolu_message WHERE "ownerId" = ?')
.run(ownerId);
db!
.prepare('UPDATE evolu_usage SET "storedBytes" = 0 WHERE "ownerId" = ?')
.run(ownerId);
after = db!
.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?')
.get(ownerId) as { storedBytes: number } | undefined;
});
tx();
const result = compactOwner(db, ownerId);
logger.emit("info", "self.compact_owner", {
ownerId: ownerIdStr,
deletedMessages,
beforeStoredBytes: before?.storedBytes ?? 0,
afterStoredBytes: after?.storedBytes ?? 0,
});
jsonResponse(res, 200, {
ownerId: ownerIdStr,
deletedMessages,
beforeStoredBytes: before?.storedBytes ?? 0,
afterStoredBytes: after?.storedBytes ?? 0,
...result,
});
jsonResponse(res, 200, { ownerId: ownerIdStr, ...result });
} catch (e) {
logger.emit("warn", "self.compact_owner_failed", {
ownerId: ownerIdStr,
Expand Down
138 changes: 138 additions & 0 deletions test/compact-owner.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Unit test for the shared compactOwner helper. Both /admin/compact-owner
// and /self/compact-owner call this function — testing it directly means
// either route's coverage is automatic by construction.
//
// The self-server integration test (self-server.integration.test.mjs)
// exercises the same helper end-to-end via the HTTP path. This file
// covers the helper's contract in isolation: schema cleanup happens,
// before/after counts are correct, idempotent on already-empty owners,
// other owners' state is untouched.

import { test } from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { randomBytes } from "node:crypto";
import Database from "better-sqlite3";

import { compactOwner } from "../dist/lib/compact-owner.js";

function setup() {
const dbPath = join(mkdtempSync(join(tmpdir(), "compact-owner-test-")), "relay.db");
const db = new Database(dbPath);
db.exec(`
CREATE TABLE evolu_usage (
"ownerId" blob primary key,
"storedBytes" integer not null,
"firstTimestamp" blob,
"lastTimestamp" blob
) strict;
CREATE TABLE evolu_message (
"ownerId" blob not null,
"timestamp" blob not null,
"change" blob not null,
primary key ("ownerId", "timestamp")
) strict;
CREATE TABLE evolu_timestamp (
"ownerId" blob not null,
"t" blob not null,
"h1" integer, "h2" integer, "c" integer,
"l" integer not null,
primary key ("ownerId", "t")
) strict;
`);
return { db };
}

function seedOwner(db, ownerIdBytes, { messageCount = 5, payloadBytes = 1000 } = {}) {
db.prepare('INSERT INTO evolu_usage ("ownerId", "storedBytes", "firstTimestamp", "lastTimestamp") VALUES (?, ?, ?, ?)')
.run(ownerIdBytes, messageCount * payloadBytes,
Buffer.from("ts-0", "utf8"), Buffer.from(`ts-${messageCount - 1}`, "utf8"));
const insertMsg = db.prepare('INSERT INTO evolu_message ("ownerId", "timestamp", "change") VALUES (?, ?, ?)');
const insertTs = db.prepare('INSERT INTO evolu_timestamp ("ownerId", "t", "h1", "h2", "c", "l") VALUES (?, ?, ?, ?, ?, ?)');
for (let i = 0; i < messageCount; i++) {
const ts = Buffer.from(`ts-${i}`, "utf8");
insertMsg.run(ownerIdBytes, ts, Buffer.alloc(payloadBytes));
insertTs.run(ownerIdBytes, ts, 0, 0, 0, 1);
}
}

test("compactOwner deletes evolu_message rows for the target owner", () => {
const { db } = setup();
const ownerIdBytes = randomBytes(16);
seedOwner(db, ownerIdBytes);

const result = compactOwner(db, ownerIdBytes);

assert.equal(result.deletedMessages, 5);
const after = db.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerIdBytes);
assert.equal(after.c, 0);
});

test("compactOwner deletes evolu_timestamp rows (the merkle/fingerprint table)", () => {
// This is the regression guard for the production wedge: pre-fix,
// these rows survived compact, fed stale fingerprints to the negentropy
// reconciliation, and stranded every subsequent peer push.
const { db } = setup();
const ownerIdBytes = randomBytes(16);
seedOwner(db, ownerIdBytes);

const before = db.prepare('SELECT COUNT(*) as c FROM evolu_timestamp WHERE "ownerId" = ?').get(ownerIdBytes);
assert.equal(before.c, 5, "fixture sanity");

compactOwner(db, ownerIdBytes);

const after = db.prepare('SELECT COUNT(*) as c FROM evolu_timestamp WHERE "ownerId" = ?').get(ownerIdBytes);
assert.equal(after.c, 0, "evolu_timestamp must be empty post-compact (else fresh pushes get stranded)");
});

test("compactOwner zeroes storedBytes + clears first/lastTimestamp on evolu_usage", () => {
const { db } = setup();
const ownerIdBytes = randomBytes(16);
seedOwner(db, ownerIdBytes);

const result = compactOwner(db, ownerIdBytes);

assert.equal(result.beforeStoredBytes, 5000);
assert.equal(result.afterStoredBytes, 0);
const usage = db.prepare('SELECT * FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes);
assert.equal(usage.storedBytes, 0);
assert.equal(usage.firstTimestamp, null, "firstTimestamp pointed at a deleted message");
assert.equal(usage.lastTimestamp, null, "lastTimestamp pointed at a deleted message");
});

test("compactOwner is idempotent (deletedMessages=0 on already-empty owner)", () => {
const { db } = setup();
const ownerIdBytes = randomBytes(16);
seedOwner(db, ownerIdBytes);

compactOwner(db, ownerIdBytes);
const second = compactOwner(db, ownerIdBytes);

assert.equal(second.deletedMessages, 0);
assert.equal(second.beforeStoredBytes, 0);
assert.equal(second.afterStoredBytes, 0);
});

test("compactOwner does not touch other owners' state", () => {
// Defence-in-depth: a buggy WHERE clause refactor would silently wipe
// the wrong owner. This test confirms the helper is properly scoped.
const { db } = setup();
const ownerA = randomBytes(16);
const ownerB = randomBytes(16);
seedOwner(db, ownerA, { messageCount: 5 });
seedOwner(db, ownerB, { messageCount: 3 });

compactOwner(db, ownerA);

const aMsgs = db.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerA);
const bMsgs = db.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerB);
const bTs = db.prepare('SELECT COUNT(*) as c FROM evolu_timestamp WHERE "ownerId" = ?').get(ownerB);
const bUsage = db.prepare('SELECT * FROM evolu_usage WHERE "ownerId" = ?').get(ownerB);
assert.equal(aMsgs.c, 0, "compact target wiped");
assert.equal(bMsgs.c, 3, "other owner's messages untouched");
assert.equal(bTs.c, 3, "other owner's timestamps untouched");
assert.equal(bUsage.storedBytes, 3000, "other owner's storedBytes untouched");
assert.notEqual(bUsage.firstTimestamp, null, "other owner's first/lastTimestamp untouched");
});
Loading
Loading