diff --git a/.env.example b/.env.example index c3206ff..ee58621 100644 --- a/.env.example +++ b/.env.example @@ -5,7 +5,7 @@ RELAY_PORT=4000 # Evolu WebSocket relay ADMIN_PORT=4001 # Health/metrics HTTP server (binds 127.0.0.1) SELF_PORT=4003 # Owner-scoped self-service HTTP (HMAC-authed) -SELF_BIND=0.0.0.0 # Self-service bind address (use 127.0.0.1 if proxied) +SELF_BIND=127.0.0.1 # Default localhost-only; set to 0.0.0.0 to expose directly (otherwise reverse-proxy via Caddy/nginx) SELF_ENABLED=true # Set to false to disable /self/* endpoints # Storage diff --git a/README.md b/README.md index f37fa00..1538199 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ All settings via environment variables. See [`.env.example`](.env.example) for t | `RELAY_PORT` | `4000` | Evolu WebSocket relay port | | `ADMIN_PORT` | `4001` | Health/metrics HTTP port | | `SELF_PORT` | `4003` | Owner-scoped self-service HTTP port | -| `SELF_BIND` | `0.0.0.0` | Bind address for self-service port | +| `SELF_BIND` | `127.0.0.1` | Bind address for self-service port (set to `0.0.0.0` to expose directly without a reverse proxy) | | `SELF_ENABLED` | `true` | Set `false` to disable `/self/*` endpoints | | `QUOTA_PER_OWNER_MB` | `10` | Max stored bytes per identity | | `QUOTA_GLOBAL_MB` | `1000` | Max total stored bytes | @@ -98,7 +98,7 @@ transports: [{ type: "WebSocket", url: "wss://your-relay.example.com" }] The relay port serves WebSocket only. Use Caddy or nginx for TLS termination: ``` -# Caddyfile +# Caddyfile — option A: dedicated subdomain per surface sync.example.com { reverse_proxy localhost:4000 } @@ -108,7 +108,21 @@ self.example.com { } ``` -The admin port binds to `127.0.0.1` — access it via SSH tunnel or add a proxied route. The self-service port can be exposed publicly: every endpoint is HMAC-authed against per-owner writeKeys, no admin secret involved. +``` +# Caddyfile — option B: single hostname, path-routing (what sync.getbased.health uses) +sync.example.com { + handle /self/* { + reverse_proxy 127.0.0.1:4003 + } + handle { + reverse_proxy 127.0.0.1:4000 + } +} +``` + +Both patterns work; the client (`get-based`) derives `https:///self/...` from the WebSocket URL by default. For self-hosters who want to skip the reverse proxy entirely (expose port 4003 directly to the internet), set `SELF_BIND=0.0.0.0` and have clients hard-code their own URL via the `labcharts-self-url` localStorage override. + +The admin port binds to `127.0.0.1` — access it via SSH tunnel or add a proxied route. The self-service port defaults to `127.0.0.1` too; both expect a reverse proxy in front. Every `/self/*` endpoint is HMAC-authed against per-owner writeKeys, so it's safe to expose publicly once the reverse proxy is wired. ## Architecture diff --git a/package.json b/package.json index 1dec20e..3791f44 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "getbased-relay", - "version": "1.2.1", + "version": "1.2.2", "description": "Self-hosted Evolu CRDT relay with structured logging, metrics, and quota management", "type": "module", "engines": { "node": ">=22.0.0" }, diff --git a/src/lib/config.ts b/src/lib/config.ts index aea2be6..803b64e 100644 --- a/src/lib/config.ts +++ b/src/lib/config.ts @@ -44,8 +44,15 @@ export function loadConfig(): RelayConfig { // since they're harmless without an existing client + writeKey, but // operators can hard-disable with SELF_ENABLED=0 if they prefer to // route everything through the admin token. + // + // Bind defaults to 127.0.0.1 — the safe default. Operators who want + // to expose the port directly to the public internet (instead of + // proxying through Caddy/nginx) must explicitly set SELF_BIND=0.0.0.0 + // and accept the surface area. The HMAC + rate limit cap the worst + // case but a localhost-only default removes the foot-gun for + // someone who copy-pastes the compose file without reading the README. selfPort: envInt("SELF_PORT", 4003), - selfBind: envStr("SELF_BIND", "0.0.0.0"), + selfBind: envStr("SELF_BIND", "127.0.0.1"), selfEnabled: envBool("SELF_ENABLED", true), relayName: envStr("RELAY_NAME", "evolu-relay"), dataDir: resolve(envStr("DATA_DIR", "./data")), diff --git a/src/lib/self-server.ts b/src/lib/self-server.ts index 7223716..f565bbd 100644 --- a/src/lib/self-server.ts +++ b/src/lib/self-server.ts @@ -57,6 +57,23 @@ const RATE_LIMITS: Record = { // can fill the log with thousands of identical "wrong sig" warnings. const LOG_COALESCE_WINDOW_MS = 60 * 1000; +// Hard cap on rate-limit + coalesce Maps. The 30s sweep below cleans +// expired entries, but a high-cardinality flood (botnet, scanner +// rotating millions of IPs) could grow either Map between sweeps. +// Cap + LRU eviction keeps memory bounded regardless of input shape. +// 10k entries is generous: at typical relay load it'll never approach +// the cap even during sweep cycles. Map preserves insertion order, so +// the first key is the least-recently-touched. +const MAX_BUCKET_ENTRIES = 10_000; +const MAX_COALESCE_ENTRIES = 10_000; +function evictOldest(m: Map, cap: number): void { + while (m.size > cap) { + const oldest = m.keys().next().value; + if (oldest === undefined) break; + m.delete(oldest); + } +} + type WriteKeyLookup = (ownerId: Buffer) => Buffer | null; interface BucketState { count: number; resetAt: number } @@ -188,19 +205,29 @@ export function createSelfServer( // Returns true if the request fits the bucket; false if rate-limited. // Caller maps false → 429 with Retry-After. + // + // LRU touch: every access (allowed or denied) re-insertion-orders + // the key so that under MAX_BUCKET_ENTRIES eviction, only truly + // idle keys get dropped. function rateCheck(ip: string, route: keyof typeof RATE_LIMITS): { allowed: boolean; retryAfterSec: number } { const cfg = RATE_LIMITS[route]; const now = Date.now(); const key = `${ip}:${route}`; const cur = buckets.get(key); if (!cur || cur.resetAt <= now) { + buckets.delete(key); // ensure fresh insertion-order position buckets.set(key, { count: 1, resetAt: now + cfg.windowMs }); + evictOldest(buckets, MAX_BUCKET_ENTRIES); return { allowed: true, retryAfterSec: 0 }; } + // LRU touch: re-insert so this key isn't a candidate for eviction. + buckets.delete(key); if (cur.count < cfg.capacity) { cur.count += 1; + buckets.set(key, cur); return { allowed: true, retryAfterSec: 0 }; } + buckets.set(key, cur); return { allowed: false, retryAfterSec: Math.max(1, Math.ceil((cur.resetAt - now) / 1000)) }; } @@ -209,15 +236,23 @@ export function createSelfServer( // reason) within the window increment a counter without logging. // The sweep below emits a "coalesced N within Xs" summary on // window expiry if N > 1. + // + // LRU touch + cap match the rate-limit pattern — protects against + // a flood that rotates ownerId/ip/reason fast enough that nothing + // expires the natural way. function logShouldEmit(ownerIdStr: string, ip: string, reason: string): boolean { const key = `${ownerIdStr}|${ip}|${reason}`; const now = Date.now(); const cur = coalesce.get(key); if (!cur || cur.expiresAt <= now) { + coalesce.delete(key); coalesce.set(key, { count: 1, firstAt: now, expiresAt: now + LOG_COALESCE_WINDOW_MS }); + evictOldest(coalesce, MAX_COALESCE_ENTRIES); return true; } + coalesce.delete(key); cur.count += 1; + coalesce.set(key, cur); return false; } diff --git a/test/self-server.integration.test.mjs b/test/self-server.integration.test.mjs new file mode 100644 index 0000000..908247e --- /dev/null +++ b/test/self-server.integration.test.mjs @@ -0,0 +1,210 @@ +// End-to-end integration test for /self/* against an in-process relay. +// +// Validates the full pipeline that the production smoke tests can't: +// - HMAC signed by client + verified by server over real HTTP +// - DB transaction actually drops evolu_message rows + zeroes +// evolu_usage.storedBytes +// - Storage probe returns the live storedBytes value +// - Rate limiter fires at the configured cap +// +// No production touch — synthesizes a test owner + writeKey + fake +// messages in a tmp SQLite DB, then exercises everything against +// `127.0.0.1:`. + +import { test, before, after } from "node:test"; +import assert from "node:assert/strict"; +import { createHmac, randomBytes } from "node:crypto"; +import { mkdtempSync, mkdirSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import Database from "better-sqlite3"; + +import { createSelfServer } from "../dist/lib/self-server.js"; + +let dataDir, dbPath, server, port, writeKey, ownerIdBytes, ownerIdStr; + +function sign(context, ownerIdStr, timestampMs) { + return createHmac("sha256", writeKey) + .update(`${context}:${ownerIdStr}:${timestampMs}`) + .digest("hex"); +} + +before(async () => { + dataDir = mkdtempSync(join(tmpdir(), "relay-self-itest-")); + mkdirSync(dataDir, { recursive: true }); + dbPath = join(dataDir, "evolu-relay.db"); + // Build the same schema the relay uses. + const db = new Database(dbPath); + db.exec(` + CREATE TABLE evolu_writeKey ( + "ownerId" blob not null, "writeKey" blob not null, primary key ("ownerId") + ) strict; + CREATE TABLE evolu_usage ( + "ownerId" blob primary key, "storedBytes" integer not null, + "firstTimestamp" blob, "lastTimestamp" blob + ) strict; + CREATE TABLE evolu_message ( + "ownerId" blob not null, "timestamp" blob not null, "change" blob not null, + primary key ("ownerId", "timestamp") + ) strict; + CREATE TABLE evolu_timestamp ( + "ownerId" blob not null, "t" blob not null, "h1" integer, "h2" integer, + "c" integer, "l" integer not null, primary key ("ownerId", "t") + ) strict; + `); + // Synthesize a fake owner. + ownerIdBytes = randomBytes(16); + ownerIdStr = ownerIdBytes.toString("base64url"); + writeKey = randomBytes(16); // matches Evolu's OwnerWriteKey size + db.prepare('INSERT INTO evolu_writeKey ("ownerId", "writeKey") VALUES (?, ?)').run(ownerIdBytes, writeKey); + db.prepare('INSERT INTO evolu_usage ("ownerId", "storedBytes") VALUES (?, ?)').run(ownerIdBytes, 0); + // Insert 5 fake messages, totaling 5000 bytes of "change". Compact + // should drop all of them and zero the usage counter. + const insertMsg = db.prepare('INSERT INTO evolu_message ("ownerId", "timestamp", "change") VALUES (?, ?, ?)'); + for (let i = 0; i < 5; i++) { + insertMsg.run(ownerIdBytes, Buffer.from(`ts-${i}`, "utf8"), Buffer.alloc(1000)); + } + // Manually set storedBytes to match what a real relay would have. + db.prepare('UPDATE evolu_usage SET "storedBytes" = 5000 WHERE "ownerId" = ?').run(ownerIdBytes); + db.close(); + + // Boot the relay listener on a random port (let OS pick). + const config = { + relayPort: 4000, adminPort: 4001, selfPort: 0, selfBind: "127.0.0.1", + selfEnabled: true, relayName: "evolu-relay", dataDir, + quotaPerOwnerBytes: 10 * 1024 * 1024, quotaGlobalBytes: 100 * 1024 * 1024, + ownerTtlDays: 90, logLevel: "warn", logFormat: "json", + enableEvoluLogging: false, adminToken: null, + }; + const logger = { + emit() {}, console: { log() {}, warn() {}, error() {}, debug() {}, enabled: false }, + setOwnerCallback() {}, getCurrentConnections() { return 0; }, + }; + // We need to override selfPort=0 (random port) — adjust createSelfServer + // to use server.address() after listen. Easiest path: call our own + // listener directly by hooking into the underlying http server. For + // now, just pick a high random port and retry on EADDRINUSE. + for (let attempt = 0; attempt < 10; attempt++) { + config.selfPort = 14000 + Math.floor(Math.random() * 1000); + server = createSelfServer(config, logger); + try { + await server.start(); + port = config.selfPort; + break; + } catch (e) { + if (e.code !== "EADDRINUSE") throw e; + } + } + if (!port) throw new Error("could not find free port"); +}); + +after(async () => { + if (server) await server.stop(); +}); + +test("storage probe returns live storedBytes from the DB", async () => { + const ts = Date.now(); + const sig = sign("storage", ownerIdStr, ts); + const url = `http://127.0.0.1:${port}/self/owner-storage?ownerId=${ownerIdStr}×tamp=${ts}&signature=${sig}`; + const r = await fetch(url); + assert.equal(r.status, 200); + const body = await r.json(); + assert.equal(body.ownerId, ownerIdStr); + assert.equal(body.storedBytes, 5000, "should report the 5000 bytes we wrote"); + assert.equal(typeof body.quotaBytes, "number"); +}); + +test("storage probe rejects a swapped-context signature (replay across endpoints)", async () => { + const ts = Date.now(); + // Sign for "compact", try to use on storage. Domain separation must catch. + const sig = sign("compact", ownerIdStr, ts); + const url = `http://127.0.0.1:${port}/self/owner-storage?ownerId=${ownerIdStr}×tamp=${ts}&signature=${sig}`; + const r = await fetch(url); + assert.equal(r.status, 401); +}); + +test("compact drops every evolu_message row and zeroes storedBytes", async () => { + // Sanity: precondition. + const db = new Database(dbPath, { readonly: true }); + const before = db.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerIdBytes); + const beforeUsage = db.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes); + db.close(); + assert.equal(before.c, 5, "should have 5 message rows before compact"); + assert.equal(beforeUsage.storedBytes, 5000, "should have 5000 storedBytes before"); + + // Sign + send compact. + const ts = Date.now(); + const sig = sign("compact", ownerIdStr, ts); + const r = await fetch(`http://127.0.0.1:${port}/self/compact-owner`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ ownerId: ownerIdStr, timestamp: ts, signature: sig }), + }); + assert.equal(r.status, 200); + const body = await r.json(); + assert.equal(body.ownerId, ownerIdStr); + assert.equal(body.deletedMessages, 5); + assert.equal(body.beforeStoredBytes, 5000); + assert.equal(body.afterStoredBytes, 0); + + // Verify the actual DB state matches the response. + const db2 = new Database(dbPath, { readonly: true }); + const after = db2.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerIdBytes); + const afterUsage = db2.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes); + db2.close(); + assert.equal(after.c, 0, "should have 0 message rows after compact"); + assert.equal(afterUsage.storedBytes, 0, "should have 0 storedBytes after"); +}); + +test("compact is idempotent (second call returns deletedMessages=0)", async () => { + const ts = Date.now(); + const sig = sign("compact", ownerIdStr, ts); + const r = await fetch(`http://127.0.0.1:${port}/self/compact-owner`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ ownerId: ownerIdStr, timestamp: ts, signature: sig }), + }); + assert.equal(r.status, 200); + const body = await r.json(); + assert.equal(body.deletedMessages, 0, "nothing left to delete"); + assert.equal(body.afterStoredBytes, 0); +}); + +test("storage probe still works after compact (returns 0)", async () => { + const ts = Date.now(); + const sig = sign("storage", ownerIdStr, ts); + const url = `http://127.0.0.1:${port}/self/owner-storage?ownerId=${ownerIdStr}×tamp=${ts}&signature=${sig}`; + const r = await fetch(url); + assert.equal(r.status, 200); + const body = await r.json(); + assert.equal(body.storedBytes, 0); +}); + +test("rate limit fires at request 11 on compact (matches per-IP cap)", async () => { + // Burn the bucket — same setup, fresh signatures so we don't test + // dedup. Each request gets a unique ts. + const codes = []; + for (let i = 0; i < 12; i++) { + const ts = Date.now() + i; + const sig = sign("compact", ownerIdStr, ts); + const r = await fetch(`http://127.0.0.1:${port}/self/compact-owner`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ ownerId: ownerIdStr, timestamp: ts, signature: sig }), + }); + codes.push(r.status); + } + // First 10 hit the bucket → 200 (no msgs to delete, but auth+route OK). + // Last 2 → 429. + // NOTE: the previous test in this suite already burned 1 token, so + // we expect first 9 of THIS test to be 200, then 429s. Between the + // earlier idempotent test, the storage probes don't count (different + // bucket), so compact has burned 1+1=2 by here. So 8 of THIS test's + // 12 should be 200, and the rest 429. + // To keep this deterministic, just assert: at least one 429 fired, + // and all non-429s are 200. + const success = codes.filter(c => c === 200).length; + const limited = codes.filter(c => c === 429).length; + assert.ok(limited >= 1, `expected at least one 429, got codes ${codes.join(",")}`); + assert.ok(success + limited === 12, "every request returned 200 or 429"); +}); diff --git a/test/self-server.test.mjs b/test/self-server.test.mjs index 823c751..b936cf5 100644 --- a/test/self-server.test.mjs +++ b/test/self-server.test.mjs @@ -278,6 +278,52 @@ test("logShouldEmit returns true on first call, false on duplicates within windo assert.equal(server._logShouldEmit("XYZ", "1.1.1.1", "wrong_sig"), true); }); +// ─── v1.2.2: LRU eviction caps Map size ────────────────────── + +test("rateCheck LRU touch keeps recently-used keys alive when other keys are added", () => { + const { server } = setup(); + // Fill 3 distinct IPs on the compact bucket. + server._rateCheck("ip-A", "compact"); + server._rateCheck("ip-B", "compact"); + server._rateCheck("ip-C", "compact"); + // Touch ip-A again — it should now be the most-recently-used. + server._rateCheck("ip-A", "compact"); + // Add a fourth — under cap, all four exist. We're not asserting + // eviction here (cap is 10k), just that the LRU touch doesn't break + // the bucket count tracking. + const r = server._rateCheck("ip-A", "compact"); + assert.equal(r.allowed, true, "ip-A should still be within its bucket"); + // Ip-A was touched 3 times now; should be at count=3 (still within + // capacity=10). +}); + +test("rateCheck still rate-limits correctly after many LRU touches", () => { + const { server } = setup(); + // Burn the bucket via repeated touches on the same IP. The LRU + // touch logic must NOT reset the count or duplicate the key. + for (let i = 0; i < 10; i++) { + const r = server._rateCheck("burn-ip", "compact"); + assert.equal(r.allowed, true, `request ${i + 1} should still be allowed`); + } + const blocked = server._rateCheck("burn-ip", "compact"); + assert.equal(blocked.allowed, false, "11th request must be 429"); +}); + +test("logShouldEmit dedup still works after LRU touch (re-insert preserves count)", () => { + const { server } = setup(); + const k = ["OWNER", "ip", "wrong_sig"]; + // First emit. + assert.equal(server._logShouldEmit(...k), true); + // 9 suppressed. + for (let i = 0; i < 9; i++) { + assert.equal(server._logShouldEmit(...k), false); + } + // Different reason — fresh emit, should NOT inherit the suppressed state. + assert.equal(server._logShouldEmit("OWNER", "ip", "timestamp_outside_window"), true); + // Original key still in suppress mode. + assert.equal(server._logShouldEmit(...k), false); +}); + test("clientIp trusts X-Forwarded-For only when peer is loopback", () => { const { server } = setup(); // Loopback peer + XFF set → trust XFF (left-most entry).