Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
RELAY_PORT=4000 # Evolu WebSocket relay
ADMIN_PORT=4001 # Health/metrics HTTP server (binds 127.0.0.1)
SELF_PORT=4003 # Owner-scoped self-service HTTP (HMAC-authed)
SELF_BIND=0.0.0.0 # Self-service bind address (use 127.0.0.1 if proxied)
SELF_BIND=127.0.0.1 # Default localhost-only; set to 0.0.0.0 to expose directly (otherwise reverse-proxy via Caddy/nginx)
SELF_ENABLED=true # Set to false to disable /self/* endpoints

# Storage
Expand Down
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ All settings via environment variables. See [`.env.example`](.env.example) for t
| `RELAY_PORT` | `4000` | Evolu WebSocket relay port |
| `ADMIN_PORT` | `4001` | Health/metrics HTTP port |
| `SELF_PORT` | `4003` | Owner-scoped self-service HTTP port |
| `SELF_BIND` | `0.0.0.0` | Bind address for self-service port |
| `SELF_BIND` | `127.0.0.1` | Bind address for self-service port (set to `0.0.0.0` to expose directly without a reverse proxy) |
| `SELF_ENABLED` | `true` | Set `false` to disable `/self/*` endpoints |
| `QUOTA_PER_OWNER_MB` | `10` | Max stored bytes per identity |
| `QUOTA_GLOBAL_MB` | `1000` | Max total stored bytes |
Expand Down Expand Up @@ -98,7 +98,7 @@ transports: [{ type: "WebSocket", url: "wss://your-relay.example.com" }]
The relay port serves WebSocket only. Use Caddy or nginx for TLS termination:

```
# Caddyfile
# Caddyfile — option A: dedicated subdomain per surface
sync.example.com {
reverse_proxy localhost:4000
}
Expand All @@ -108,7 +108,21 @@ self.example.com {
}
```

The admin port binds to `127.0.0.1` — access it via SSH tunnel or add a proxied route. The self-service port can be exposed publicly: every endpoint is HMAC-authed against per-owner writeKeys, no admin secret involved.
```
# Caddyfile — option B: single hostname, path-routing (what sync.getbased.health uses)
sync.example.com {
handle /self/* {
reverse_proxy 127.0.0.1:4003
}
handle {
reverse_proxy 127.0.0.1:4000
}
}
```

Both patterns work; the client (`get-based`) derives `https://<relay-hostname>/self/...` from the WebSocket URL by default. For self-hosters who want to skip the reverse proxy entirely (expose port 4003 directly to the internet), set `SELF_BIND=0.0.0.0` and have clients hard-code their own URL via the `labcharts-self-url` localStorage override.

The admin port binds to `127.0.0.1` — access it via SSH tunnel or add a proxied route. The self-service port defaults to `127.0.0.1` too; both expect a reverse proxy in front. Every `/self/*` endpoint is HMAC-authed against per-owner writeKeys, so it's safe to expose publicly once the reverse proxy is wired.

## Architecture

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "getbased-relay",
"version": "1.2.1",
"version": "1.2.2",
"description": "Self-hosted Evolu CRDT relay with structured logging, metrics, and quota management",
"type": "module",
"engines": { "node": ">=22.0.0" },
Expand Down
9 changes: 8 additions & 1 deletion src/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,15 @@ export function loadConfig(): RelayConfig {
// since they're harmless without an existing client + writeKey, but
// operators can hard-disable with SELF_ENABLED=0 if they prefer to
// route everything through the admin token.
//
// Bind defaults to 127.0.0.1 — the safe default. Operators who want
// to expose the port directly to the public internet (instead of
// proxying through Caddy/nginx) must explicitly set SELF_BIND=0.0.0.0
// and accept the surface area. The HMAC + rate limit cap the worst
// case but a localhost-only default removes the foot-gun for
// someone who copy-pastes the compose file without reading the README.
selfPort: envInt("SELF_PORT", 4003),
selfBind: envStr("SELF_BIND", "0.0.0.0"),
selfBind: envStr("SELF_BIND", "127.0.0.1"),
selfEnabled: envBool("SELF_ENABLED", true),
relayName: envStr("RELAY_NAME", "evolu-relay"),
dataDir: resolve(envStr("DATA_DIR", "./data")),
Expand Down
35 changes: 35 additions & 0 deletions src/lib/self-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ const RATE_LIMITS: Record<string, { capacity: number; windowMs: number }> = {
// can fill the log with thousands of identical "wrong sig" warnings.
const LOG_COALESCE_WINDOW_MS = 60 * 1000;

// Hard cap on rate-limit + coalesce Maps. The 30s sweep below cleans
// expired entries, but a high-cardinality flood (botnet, scanner
// rotating millions of IPs) could grow either Map between sweeps.
// Cap + LRU eviction keeps memory bounded regardless of input shape.
// 10k entries is generous: at typical relay load it'll never approach
// the cap even during sweep cycles. Map preserves insertion order, so
// the first key is the least-recently-touched.
const MAX_BUCKET_ENTRIES = 10_000;
const MAX_COALESCE_ENTRIES = 10_000;
function evictOldest<K, V>(m: Map<K, V>, cap: number): void {
while (m.size > cap) {
const oldest = m.keys().next().value;
if (oldest === undefined) break;
m.delete(oldest);
}
}

type WriteKeyLookup = (ownerId: Buffer) => Buffer | null;

interface BucketState { count: number; resetAt: number }
Expand Down Expand Up @@ -188,19 +205,29 @@ export function createSelfServer(

// Returns true if the request fits the bucket; false if rate-limited.
// Caller maps false → 429 with Retry-After.
//
// LRU touch: every access (allowed or denied) re-insertion-orders
// the key so that under MAX_BUCKET_ENTRIES eviction, only truly
// idle keys get dropped.
function rateCheck(ip: string, route: keyof typeof RATE_LIMITS): { allowed: boolean; retryAfterSec: number } {
const cfg = RATE_LIMITS[route];
const now = Date.now();
const key = `${ip}:${route}`;
const cur = buckets.get(key);
if (!cur || cur.resetAt <= now) {
buckets.delete(key); // ensure fresh insertion-order position
buckets.set(key, { count: 1, resetAt: now + cfg.windowMs });
evictOldest(buckets, MAX_BUCKET_ENTRIES);
return { allowed: true, retryAfterSec: 0 };
}
// LRU touch: re-insert so this key isn't a candidate for eviction.
buckets.delete(key);
if (cur.count < cfg.capacity) {
cur.count += 1;
buckets.set(key, cur);
return { allowed: true, retryAfterSec: 0 };
}
buckets.set(key, cur);
return { allowed: false, retryAfterSec: Math.max(1, Math.ceil((cur.resetAt - now) / 1000)) };
}

Expand All @@ -209,15 +236,23 @@ export function createSelfServer(
// reason) within the window increment a counter without logging.
// The sweep below emits a "coalesced N within Xs" summary on
// window expiry if N > 1.
//
// LRU touch + cap match the rate-limit pattern — protects against
// a flood that rotates ownerId/ip/reason fast enough that nothing
// expires the natural way.
function logShouldEmit(ownerIdStr: string, ip: string, reason: string): boolean {
const key = `${ownerIdStr}|${ip}|${reason}`;
const now = Date.now();
const cur = coalesce.get(key);
if (!cur || cur.expiresAt <= now) {
coalesce.delete(key);
coalesce.set(key, { count: 1, firstAt: now, expiresAt: now + LOG_COALESCE_WINDOW_MS });
evictOldest(coalesce, MAX_COALESCE_ENTRIES);
return true;
}
coalesce.delete(key);
cur.count += 1;
coalesce.set(key, cur);
return false;
}

Expand Down
210 changes: 210 additions & 0 deletions test/self-server.integration.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// End-to-end integration test for /self/* against an in-process relay.
//
// Validates the full pipeline that the production smoke tests can't:
// - HMAC signed by client + verified by server over real HTTP
// - DB transaction actually drops evolu_message rows + zeroes
// evolu_usage.storedBytes
// - Storage probe returns the live storedBytes value
// - Rate limiter fires at the configured cap
//
// No production touch — synthesizes a test owner + writeKey + fake
// messages in a tmp SQLite DB, then exercises everything against
// `127.0.0.1:<random-port>`.

import { test, before, after } from "node:test";
import assert from "node:assert/strict";
import { createHmac, randomBytes } from "node:crypto";
import { mkdtempSync, mkdirSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import Database from "better-sqlite3";

import { createSelfServer } from "../dist/lib/self-server.js";

let dataDir, dbPath, server, port, writeKey, ownerIdBytes, ownerIdStr;

function sign(context, ownerIdStr, timestampMs) {
return createHmac("sha256", writeKey)
.update(`${context}:${ownerIdStr}:${timestampMs}`)
.digest("hex");
}

before(async () => {
dataDir = mkdtempSync(join(tmpdir(), "relay-self-itest-"));
mkdirSync(dataDir, { recursive: true });
dbPath = join(dataDir, "evolu-relay.db");
// Build the same schema the relay uses.
const db = new Database(dbPath);
db.exec(`
CREATE TABLE evolu_writeKey (
"ownerId" blob not null, "writeKey" blob not null, primary key ("ownerId")
) strict;
CREATE TABLE evolu_usage (
"ownerId" blob primary key, "storedBytes" integer not null,
"firstTimestamp" blob, "lastTimestamp" blob
) strict;
CREATE TABLE evolu_message (
"ownerId" blob not null, "timestamp" blob not null, "change" blob not null,
primary key ("ownerId", "timestamp")
) strict;
CREATE TABLE evolu_timestamp (
"ownerId" blob not null, "t" blob not null, "h1" integer, "h2" integer,
"c" integer, "l" integer not null, primary key ("ownerId", "t")
) strict;
`);
// Synthesize a fake owner.
ownerIdBytes = randomBytes(16);
ownerIdStr = ownerIdBytes.toString("base64url");
writeKey = randomBytes(16); // matches Evolu's OwnerWriteKey size
db.prepare('INSERT INTO evolu_writeKey ("ownerId", "writeKey") VALUES (?, ?)').run(ownerIdBytes, writeKey);
db.prepare('INSERT INTO evolu_usage ("ownerId", "storedBytes") VALUES (?, ?)').run(ownerIdBytes, 0);
// Insert 5 fake messages, totaling 5000 bytes of "change". Compact
// should drop all of them and zero the usage counter.
const insertMsg = db.prepare('INSERT INTO evolu_message ("ownerId", "timestamp", "change") VALUES (?, ?, ?)');
for (let i = 0; i < 5; i++) {
insertMsg.run(ownerIdBytes, Buffer.from(`ts-${i}`, "utf8"), Buffer.alloc(1000));
}
// Manually set storedBytes to match what a real relay would have.
db.prepare('UPDATE evolu_usage SET "storedBytes" = 5000 WHERE "ownerId" = ?').run(ownerIdBytes);
db.close();

// Boot the relay listener on a random port (let OS pick).
const config = {
relayPort: 4000, adminPort: 4001, selfPort: 0, selfBind: "127.0.0.1",
selfEnabled: true, relayName: "evolu-relay", dataDir,
quotaPerOwnerBytes: 10 * 1024 * 1024, quotaGlobalBytes: 100 * 1024 * 1024,
ownerTtlDays: 90, logLevel: "warn", logFormat: "json",
enableEvoluLogging: false, adminToken: null,
};
const logger = {
emit() {}, console: { log() {}, warn() {}, error() {}, debug() {}, enabled: false },
setOwnerCallback() {}, getCurrentConnections() { return 0; },
};
// We need to override selfPort=0 (random port) — adjust createSelfServer
// to use server.address() after listen. Easiest path: call our own
// listener directly by hooking into the underlying http server. For
// now, just pick a high random port and retry on EADDRINUSE.
for (let attempt = 0; attempt < 10; attempt++) {
config.selfPort = 14000 + Math.floor(Math.random() * 1000);
server = createSelfServer(config, logger);
try {
await server.start();
port = config.selfPort;
break;
} catch (e) {
if (e.code !== "EADDRINUSE") throw e;
}
}
if (!port) throw new Error("could not find free port");
});

after(async () => {
if (server) await server.stop();
});

test("storage probe returns live storedBytes from the DB", async () => {
const ts = Date.now();
const sig = sign("storage", ownerIdStr, ts);
const url = `http://127.0.0.1:${port}/self/owner-storage?ownerId=${ownerIdStr}&timestamp=${ts}&signature=${sig}`;
const r = await fetch(url);
assert.equal(r.status, 200);
const body = await r.json();
assert.equal(body.ownerId, ownerIdStr);
assert.equal(body.storedBytes, 5000, "should report the 5000 bytes we wrote");
assert.equal(typeof body.quotaBytes, "number");
});

test("storage probe rejects a swapped-context signature (replay across endpoints)", async () => {
const ts = Date.now();
// Sign for "compact", try to use on storage. Domain separation must catch.
const sig = sign("compact", ownerIdStr, ts);
const url = `http://127.0.0.1:${port}/self/owner-storage?ownerId=${ownerIdStr}&timestamp=${ts}&signature=${sig}`;
const r = await fetch(url);
assert.equal(r.status, 401);
});

test("compact drops every evolu_message row and zeroes storedBytes", async () => {
// Sanity: precondition.
const db = new Database(dbPath, { readonly: true });
const before = db.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerIdBytes);
const beforeUsage = db.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes);
db.close();
assert.equal(before.c, 5, "should have 5 message rows before compact");
assert.equal(beforeUsage.storedBytes, 5000, "should have 5000 storedBytes before");

// Sign + send compact.
const ts = Date.now();
const sig = sign("compact", ownerIdStr, ts);
const r = await fetch(`http://127.0.0.1:${port}/self/compact-owner`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ ownerId: ownerIdStr, timestamp: ts, signature: sig }),
});
assert.equal(r.status, 200);
const body = await r.json();
assert.equal(body.ownerId, ownerIdStr);
assert.equal(body.deletedMessages, 5);
assert.equal(body.beforeStoredBytes, 5000);
assert.equal(body.afterStoredBytes, 0);

// Verify the actual DB state matches the response.
const db2 = new Database(dbPath, { readonly: true });
const after = db2.prepare('SELECT COUNT(*) as c FROM evolu_message WHERE "ownerId" = ?').get(ownerIdBytes);
const afterUsage = db2.prepare('SELECT "storedBytes" FROM evolu_usage WHERE "ownerId" = ?').get(ownerIdBytes);
db2.close();
assert.equal(after.c, 0, "should have 0 message rows after compact");
assert.equal(afterUsage.storedBytes, 0, "should have 0 storedBytes after");
});

test("compact is idempotent (second call returns deletedMessages=0)", async () => {
const ts = Date.now();
const sig = sign("compact", ownerIdStr, ts);
const r = await fetch(`http://127.0.0.1:${port}/self/compact-owner`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ ownerId: ownerIdStr, timestamp: ts, signature: sig }),
});
assert.equal(r.status, 200);
const body = await r.json();
assert.equal(body.deletedMessages, 0, "nothing left to delete");
assert.equal(body.afterStoredBytes, 0);
});

test("storage probe still works after compact (returns 0)", async () => {
const ts = Date.now();
const sig = sign("storage", ownerIdStr, ts);
const url = `http://127.0.0.1:${port}/self/owner-storage?ownerId=${ownerIdStr}&timestamp=${ts}&signature=${sig}`;
const r = await fetch(url);
assert.equal(r.status, 200);
const body = await r.json();
assert.equal(body.storedBytes, 0);
});

test("rate limit fires at request 11 on compact (matches per-IP cap)", async () => {
// Burn the bucket — same setup, fresh signatures so we don't test
// dedup. Each request gets a unique ts.
const codes = [];
for (let i = 0; i < 12; i++) {
const ts = Date.now() + i;
const sig = sign("compact", ownerIdStr, ts);
const r = await fetch(`http://127.0.0.1:${port}/self/compact-owner`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ ownerId: ownerIdStr, timestamp: ts, signature: sig }),
});
codes.push(r.status);
}
// First 10 hit the bucket → 200 (no msgs to delete, but auth+route OK).
// Last 2 → 429.
// NOTE: the previous test in this suite already burned 1 token, so
// we expect first 9 of THIS test to be 200, then 429s. Between the
// earlier idempotent test, the storage probes don't count (different
// bucket), so compact has burned 1+1=2 by here. So 8 of THIS test's
// 12 should be 200, and the rest 429.
// To keep this deterministic, just assert: at least one 429 fired,
// and all non-429s are 200.
const success = codes.filter(c => c === 200).length;
const limited = codes.filter(c => c === 429).length;
assert.ok(limited >= 1, `expected at least one 429, got codes ${codes.join(",")}`);
assert.ok(success + limited === 12, "every request returned 200 or 429");
});
Comment on lines +193 to +210
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Rate-limit test has an implicit order dependency

The test's correctness relies on exactly two compact calls having been made in prior tests. The comment acknowledges this, but the assertion limited >= 1 means the test still passes even if the bucket resets mid-suite (e.g. due to test parallelism or a future reorder). A self-contained approach — spinning up a fresh createSelfServer inside this test and burning a known number of tokens — would make the assertion strong enough to actually catch a regression in the eviction/rate-limit path.

Loading
Loading