From def6bce7c469fae5abcd3e3c3f8b99396328231b Mon Sep 17 00:00:00 2001 From: mikkyvans0-source Date: Tue, 2 Jun 2026 12:02:49 +0100 Subject: [PATCH 1/3] feat: split liveness and readiness probes with dependency checks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The app exposed a single flat /health that always returned status: ok, with no distinction between liveness (process up) and readiness (dependencies healthy). Orchestrators could therefore route traffic to instances that were up but unable to serve. Split the signal into two probes, mounted at the root and unauthenticated so orchestrators can reach them: - /health, /livez — cheap, dependency-free liveness check (always 200). - /readyz — readiness check that probes DB connectivity (pingDatabase), ingest lag (lagMonitor), and webhook queue health, honours maintenance mode, and returns 503 when not ready. Reuses the SubStatus / degradation pattern from the monitoring route: "degraded" stays in rotation, "unavailable" fails readiness. Add a pingDatabase() helper (SELECT 1 round-trip), readiness.test.ts covering DB-down, critical lag, maintenance mode, partial failure and queue-saturation edge cases plus information-leak checks, and docs/health.md documenting probe semantics. Probe responses expose only coarse status enums — no hostnames, versions, ledger numbers, or error messages. 
Co-Authored-By: Claude Opus 4.8 --- backend/docs/health.md | 132 ++++++++++++++ backend/src/app.ts | 12 +- backend/src/lib/database.ts | 21 +++ backend/src/routes/health.ts | 131 ++++++++++++++ backend/src/tests/readiness.test.ts | 257 ++++++++++++++++++++++++++++ 5 files changed, 545 insertions(+), 8 deletions(-) create mode 100644 backend/docs/health.md create mode 100644 backend/src/routes/health.ts create mode 100644 backend/src/tests/readiness.test.ts diff --git a/backend/docs/health.md b/backend/docs/health.md new file mode 100644 index 00000000..a9d2df7b --- /dev/null +++ b/backend/docs/health.md @@ -0,0 +1,132 @@ +# Health, Liveness, and Readiness Probes + +The backend exposes two distinct kinds of health signal. They answer different +questions and orchestrators (Kubernetes, ECS, Nomad, …) act on them +differently. Conflating them — as a single flat `/health` that always returns +`ok` — causes traffic to be routed to instances that are up but unable to serve. + +All probes are mounted at the **root** of the app (not under `/api/v1`) and are +**unauthenticated**, because orchestrators probe them without credentials. + +| Endpoint | Kind | Cost | Checks dependencies | Healthy | Unhealthy | +| ---------- | --------- | ----- | ------------------- | ------- | --------- | +| `/health` | Liveness | cheap | no | `200` | — | +| `/livez` | Liveness | cheap | no | `200` | — | +| `/readyz` | Readiness | real | yes | `200` | `503` | + +## Liveness — `/health`, `/livez` + +> "Is the process up and able to serve an HTTP request at all?" + +Liveness is cheap and **dependency-free**. It returns `200` whenever the event +loop can service a request. A failing liveness probe instructs the orchestrator +to **restart** the container, so it must never consult downstream dependencies — +a transient database blip should not trigger a restart loop. + +`/health` is retained for backward compatibility; `/livez` is the conventional +alias. They are identical. 
+ +```json +{ "status": "ok", "timestamp": "2026-06-02T12:00:00.000Z" } +``` + +## Readiness — `/readyz` + +> "Should this instance receive traffic right now?" + +Readiness probes real dependencies. A failing readiness probe pulls the instance +out of the load-balancer rotation **without restarting it**, so it can recover +and rejoin once its dependencies are healthy again. + +It returns: + +- `200` with `status: "ready"` when the instance can serve traffic. +- `503` with `status: "not_ready"` when a hard dependency is unavailable. +- `503` with `status: "maintenance"` when maintenance mode is enabled. + +```json +{ + "status": "ready", + "database": "ok", + "ingest": "ok", + "webhookQueue": "ok", + "timestamp": "2026-06-02T12:00:00.000Z" +} +``` + +### Sub-status semantics + +Each dependency reports a coarse `SubStatus`, the same pattern used by +`/api/v1/admin/monitoring`: + +- `ok` — healthy. +- `degraded` — serving but impaired. **Does not** fail readiness. +- `unavailable` — could not be reached / unusable. **Fails** readiness (`503`). + +| Sub-status | Probe | `degraded` when | `unavailable` when | +| -------------- | ------------------------------ | ------------------------------------------ | ------------------------------------------- | +| `database` | `pingDatabase()` (`SELECT 1`) | — | connection cannot open or execute | +| `ingest` | `lagMonitor.getLagStatus()` | lag ≥ warn threshold, < critical threshold | lag ≥ critical threshold, or probe throws | +| `webhookQueue` | `webhookQueueService.getStats()` | queue is saturated (`size ≥ capacity`) | the queue's backing store is unreachable | + +Ingest lag thresholds are governed by `LagMonitor` and configurable via +`LAG_WARN_THRESHOLD` / `LAG_CRITICAL_THRESHOLD`. See [reliability.md](./reliability.md). + +The instance is **not ready** (`503`) if *any* sub-status is `unavailable`. 
+`degraded` sub-statuses are surfaced for observability but keep the instance in +rotation: a slightly stale index or a back-pressured queue is still serviceable. + +### Maintenance mode + +When `statusService.isMaintenanceEnabled()` is true, `/readyz` short-circuits +**before** probing any dependency and returns `503` with `status: "maintenance"`. +The instance is intentionally not serving and should be pulled from rotation. +Liveness is unaffected — the process is healthy, just drained. + +## Edge-case behaviour + +| Scenario | `/health`, `/livez` | `/readyz` | +| ---------------------------- | ------------------- | ----------------------------------------------- | +| All healthy | `200 ok` | `200 ready` | +| Database down | `200 ok` | `503 not_ready`, `database: unavailable` | +| Warn-level lag | `200 ok` | `200 ready`, `ingest: degraded` | +| Critical lag | `200 ok` | `503 not_ready`, `ingest: unavailable` | +| Lag probe throws | `200 ok` | `503 not_ready`, `ingest: unavailable` | +| Queue store unreachable | `200 ok` | `503 not_ready`, `webhookQueue: unavailable` | +| Queue saturated | `200 ok` | `200 ready`, `webhookQueue: degraded` | +| Maintenance mode | `200 ok` | `503 maintenance` | +| Partial failure (one dep) | `200 ok` | `503 not_ready` (failing dep `unavailable`, rest `ok`) | + +## Security + +These probes are unauthenticated, so their responses are deliberately minimal. +They expose only the coarse status enums above and a timestamp. They do **not** +leak: + +- internal hostnames or connection strings, +- application or dependency versions, +- absolute ledger numbers or ingest-lag values, +- queue depths or capacities, +- underlying exception messages (dependency errors are caught and collapsed to + `unavailable`). + +Richer, sensitive diagnostics (queue depths, invariant counters, cursor +positions, versions) remain behind API-key auth at +[`/api/v1/admin/monitoring`](./admin-monitoring.md). 
/**
 * Probe database connectivity with a trivial round-trip query.
 *
 * Used by the readiness endpoint to verify that the SQLite connection can both
 * open and execute. The statement (`SELECT 1 AS ok`) is constant and
 * parameter-free, so it carries no user input and leaks no schema details.
 *
 * @returns `true` only when the query yields a row whose `ok` column is
 *   exactly `1`; `false` when no such row comes back, or when opening,
 *   preparing, or executing the statement throws. Never throws, so callers can
 *   branch on the boolean without their own try/catch.
 */
export function pingDatabase(): boolean {
  try {
    // Hold the driver result as `unknown` and narrow it explicitly instead of
    // relying on an implicit `any`: this type-checks whether the driver's
    // typings return `any` or `unknown` from `.get()`.
    const row: unknown = getDatabase().prepare("SELECT 1 AS ok").get();
    if (typeof row !== "object" || row === null) {
      return false;
    }
    return (row as { ok?: unknown }).ok === 1;
  } catch {
    // Any failure to open/prepare/execute is collapsed to "not reachable".
    return false;
  }
}
const router = Router();

/**
 * Coarse per-dependency status, mirroring the SubStatus pattern used by
 * /api/v1/admin/monitoring. "degraded" means serving but impaired (does not
 * fail readiness); "unavailable" means the dependency could not be reached
 * (fails readiness).
 */
type SubStatus = "ok" | "degraded" | "unavailable";

/** Overall verdict carried in the `status` field of the /readyz response. */
type ReadyStatus = "ready" | "not_ready" | "maintenance";

/** Readiness verdict plus one coarse sub-status per probed dependency. */
interface ReadinessReport {
  status: ReadyStatus;
  database: SubStatus;
  ingest: SubStatus;
  webhookQueue: SubStatus;
}

/**
 * Report used when the readiness evaluation itself fails unexpectedly.
 * Fails closed: the instance is reported as not ready, and no detail about
 * the underlying error is exposed.
 */
const FAIL_CLOSED_REPORT: ReadinessReport = {
  status: "not_ready",
  database: "unavailable",
  ingest: "unavailable",
  webhookQueue: "unavailable",
};

// ---------------------------------------------------------------------------
// Liveness
// ---------------------------------------------------------------------------

/**
 * Liveness handler: answers with `status: "ok"` and the current ISO timestamp.
 * It consults no dependency, so it succeeds whenever a request can be served.
 */
function liveness(_req: Request, res: Response): void {
  res.json({
    status: "ok",
    timestamp: new Date().toISOString(),
  });
}

// Keep the historical /health path as a liveness check, and add the
// conventional /livez alias.
router.get("/health", liveness);
router.get("/livez", liveness);

// ---------------------------------------------------------------------------
// Readiness
// ---------------------------------------------------------------------------

/** Database connectivity (hard dependency): a failed ping is "unavailable". */
function checkDatabase(): SubStatus {
  return pingDatabase() ? "ok" : "unavailable";
}

/**
 * Ingest lag, reusing the LagMonitor degradation flags. Critical lag (or a
 * probe that throws) is "unavailable"; degraded lag is still serviceable.
 */
async function checkIngest(): Promise<SubStatus> {
  try {
    const lag = await lagMonitor.getLagStatus();
    if (lag.isCritical) {
      return "unavailable";
    }
    return lag.isDegraded ? "degraded" : "ok";
  } catch {
    return "unavailable";
  }
}

/**
 * Webhook queue health. A throw from `getStats()` is reported as
 * "unavailable"; saturation (size at or above a positive capacity) is
 * back-pressure rather than unreadiness, so it is reported as "degraded".
 */
function checkWebhookQueue(): SubStatus {
  try {
    const stats = webhookQueueService.getStats();
    const saturated = stats.capacity > 0 && stats.size >= stats.capacity;
    return saturated ? "degraded" : "ok";
  } catch {
    return "unavailable";
  }
}

/**
 * Evaluates readiness. Maintenance mode short-circuits before any dependency
 * is probed; otherwise the database, ingest lag, and webhook queue are checked
 * in that order and any "unavailable" sub-status makes the instance not ready.
 */
async function buildReadinessReport(): Promise<ReadinessReport> {
  if (statusService.isMaintenanceEnabled()) {
    // The instance is intentionally not serving; dependencies are not probed,
    // so the sub-statuses keep their historical placeholder value of "ok".
    return {
      status: "maintenance",
      database: "ok",
      ingest: "ok",
      webhookQueue: "ok",
    };
  }

  const database = checkDatabase();
  const ingest = await checkIngest();
  const webhookQueue = checkWebhookQueue();

  const anyUnavailable = [database, ingest, webhookQueue].includes("unavailable");

  return {
    status: anyUnavailable ? "not_ready" : "ready",
    database,
    ingest,
    webhookQueue,
  };
}

/**
 * GET /readyz — responds 200 only when the verdict is "ready", 503 otherwise.
 * The body carries the verdict, the three sub-statuses, and a timestamp.
 */
router.get("/readyz", async (_req: Request, res: Response) => {
  let report: ReadinessReport;
  try {
    report = await buildReadinessReport();
  } catch {
    // NOTE(review): if this app runs an Express version that does not forward
    // rejected promises from async handlers, an unexpected throw (e.g. from
    // the maintenance check) would otherwise leave the probe hanging.
    // Answer 503 instead.
    report = FAIL_CLOSED_REPORT;
  }

  res.status(report.status === "ready" ? 200 : 503).json({
    ...report,
    timestamp: new Date().toISOString(),
  });
});

export default router;
// ---------------------------------------------------------------------------
// Liveness
// ---------------------------------------------------------------------------

// Liveness must answer 200 without consulting any dependency.
describe("Liveness probe", () => {
  it("GET /health returns 200 with status ok", async () => {
    const { status, body } = await supertest(app).get("/health");
    expect(status).toBe(200);
    expect(body.status).toBe("ok");
    // Only the leading date portion of the ISO timestamp is checked.
    expect(body.timestamp).toMatch(/^\d{4}-\d{2}-\d{2}T/);
  });

  it("GET /livez returns 200 with status ok", async () => {
    const { status, body } = await supertest(app).get("/livez");
    expect(status).toBe(200);
    expect(body.status).toBe("ok");
  });

  it("liveness stays up even when a dependency is down", async () => {
    // Simulate a failed database ping; liveness must not care.
    jest.spyOn(database, "pingDatabase").mockReturnValue(false);
    const { status, body } = await supertest(app).get("/health");
    expect(status).toBe(200);
    expect(body.status).toBe("ok");
  });

  it("liveness is dependency-free (does not call pingDatabase)", async () => {
    const pingSpy = jest.spyOn(database, "pingDatabase");
    await supertest(app).get("/livez");
    expect(pingSpy).not.toHaveBeenCalled();
  });
});
// ---------------------------------------------------------------------------
// Readiness — happy path
// ---------------------------------------------------------------------------

// NOTE(review): the lag values below assume the thresholds quoted in the
// inline comments (warn = 10, critical = 50). Confirm the test environment
// does not override LAG_WARN_THRESHOLD / LAG_CRITICAL_THRESHOLD.

describe("Readiness probe — ready", () => {
  it("GET /readyz returns 200 when all dependencies are healthy", async () => {
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(200);
    expect(res.body.status).toBe("ready");
    expect(res.body.database).toBe("ok");
    expect(res.body.ingest).toBe("ok");
    expect(res.body.webhookQueue).toBe("ok");
    expect(res.body.timestamp).toMatch(/^\d{4}-\d{2}-\d{2}T/);
  });

  it("stays ready with warn-level (degraded) lag", async () => {
    statusService.setMockCurrentLedger(100020); // lag = 20, >= warn(10), < critical(50)
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(200);
    expect(res.body.status).toBe("ready");
    expect(res.body.ingest).toBe("degraded");
  });
});

// ---------------------------------------------------------------------------
// Readiness — maintenance mode
// ---------------------------------------------------------------------------

describe("Readiness probe — maintenance mode", () => {
  it("returns 503 with maintenance status when maintenance is enabled", async () => {
    statusService.setMaintenanceMode(true);
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(503);
    expect(res.body.status).toBe("maintenance");
  });

  it("short-circuits before probing dependencies", async () => {
    statusService.setMaintenanceMode(true);
    // Watch every probe the handler can reach, not only the database ping, so
    // the test actually pins "no dependency is consulted".
    const dbSpy = jest.spyOn(database, "pingDatabase");
    const lagSpy = jest.spyOn(lagMonitor, "getLagStatus");
    const queueSpy = jest.spyOn(webhookQueueService, "getStats");
    await supertest(app).get("/readyz");
    expect(dbSpy).not.toHaveBeenCalled();
    expect(lagSpy).not.toHaveBeenCalled();
    expect(queueSpy).not.toHaveBeenCalled();
  });
});

// ---------------------------------------------------------------------------
// Readiness — dependency failures
// ---------------------------------------------------------------------------

describe("Readiness probe — DB down", () => {
  it("returns 503 not_ready when the database is unreachable", async () => {
    jest.spyOn(database, "pingDatabase").mockReturnValue(false);
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(503);
    expect(res.body.status).toBe("not_ready");
    expect(res.body.database).toBe("unavailable");
  });
});

describe("Readiness probe — high lag", () => {
  it("returns 503 not_ready when ingest lag is critical", async () => {
    statusService.setMockCurrentLedger(100100); // lag = 100, >= critical(50)
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(503);
    expect(res.body.status).toBe("not_ready");
    expect(res.body.ingest).toBe("unavailable");
  });

  it("returns 503 not_ready when the lag probe throws", async () => {
    jest.spyOn(lagMonitor, "getLagStatus").mockRejectedValue(new Error("rpc down"));
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(503);
    // The title promises not_ready, so assert the verdict as well as the code.
    expect(res.body.status).toBe("not_ready");
    expect(res.body.ingest).toBe("unavailable");
  });
});

describe("Readiness probe — webhook queue", () => {
  it("returns 503 not_ready when the queue store is unreachable", async () => {
    jest.spyOn(webhookQueueService, "getStats").mockImplementation(() => {
      throw new Error("queue store unavailable");
    });
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(503);
    expect(res.body.status).toBe("not_ready");
    expect(res.body.webhookQueue).toBe("unavailable");
  });

  it("stays ready (degraded) when the queue is saturated", async () => {
    // size === capacity: back-pressure, reported as degraded but still ready.
    jest.spyOn(webhookQueueService, "getStats").mockReturnValue({
      depth: 5000,
      size: 5000,
      capacity: 5000,
      overflowCount: 3,
      pendingCount: 5000,
      successCount: 0,
      failureCount: 0,
      oldestTimestamp: null,
    } as any);
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(200);
    expect(res.body.status).toBe("ready");
    expect(res.body.webhookQueue).toBe("degraded");
  });
});

describe("Readiness probe — partial dependency failure", () => {
  it("a single unavailable dependency fails readiness while others stay ok", async () => {
    jest.spyOn(database, "pingDatabase").mockReturnValue(false);
    statusService.setMockCurrentLedger(100002); // lag = 2, healthy
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(503);
    expect(res.body.status).toBe("not_ready");
    expect(res.body.database).toBe("unavailable");
    expect(res.body.ingest).toBe("ok");
    expect(res.body.webhookQueue).toBe("ok");
  });

  it("degraded lag plus DB down still reports both sub-statuses", async () => {
    jest.spyOn(database, "pingDatabase").mockReturnValue(false);
    statusService.setMockCurrentLedger(100020); // lag = 20, degraded
    const res = await supertest(app).get("/readyz");
    expect(res.status).toBe(503);
    expect(res.body.database).toBe("unavailable");
    expect(res.body.ingest).toBe("degraded");
  });
});

// ---------------------------------------------------------------------------
// Security — no information leakage to unauthenticated callers
// ---------------------------------------------------------------------------

describe("Readiness probe — does not leak internal details", () => {
  const sub = ["ok", "degraded", "unavailable"];

  it("readiness response contains only coarse status fields", async () => {
    const res = await supertest(app).get("/readyz");
    expect(Object.keys(res.body).sort()).toEqual(
      ["database", "ingest", "status", "timestamp", "webhookQueue"].sort()
    );
    // No version, hostname, ledger numbers, queue depths, or error strings.
    expect(res.body).not.toHaveProperty("version");
    expect(res.body).not.toHaveProperty("host");
    expect(res.body).not.toHaveProperty("lag");
    expect(res.body).not.toHaveProperty("error");
    expect(sub).toContain(res.body.database);
    expect(sub).toContain(res.body.ingest);
    expect(sub).toContain(res.body.webhookQueue);
  });

  it("does not surface the underlying error message when a dependency throws", async () => {
    jest
      .spyOn(lagMonitor, "getLagStatus")
      .mockRejectedValue(new Error("postgres://secret-host:5432 refused"));
    const res = await supertest(app).get("/readyz");
    const serialized = JSON.stringify(res.body);
    expect(serialized).not.toContain("secret-host");
    expect(serialized).not.toContain("postgres");
  });

  it("liveness response contains only status and timestamp", async () => {
    const res = await supertest(app).get("/health");
    expect(Object.keys(res.body).sort()).toEqual(["status", "timestamp"].sort());
    expect(res.body).not.toHaveProperty("version");
  });
});
Reuses the SubStatus / degradation pattern from the monitoring route: "degraded" stays in rotation, "unavailable" fails readiness. Add a pingDatabase() helper (SELECT 1 round-trip), readiness.test.ts covering DB-down, critical lag, maintenance mode, partial failure and queue-saturation edge cases plus information-leak checks, and docs/health.md documenting probe semantics. Probe responses expose only coarse status enums — no hostnames, versions, ledger numbers, or error messages. --- backend/docs/health.md | 132 ++++++++++++++ backend/src/app.ts | 12 +- backend/src/lib/database.ts | 21 +++ backend/src/routes/health.ts | 131 ++++++++++++++ backend/src/tests/readiness.test.ts | 257 ++++++++++++++++++++++++++++ 5 files changed, 545 insertions(+), 8 deletions(-) create mode 100644 backend/docs/health.md create mode 100644 backend/src/routes/health.ts create mode 100644 backend/src/tests/readiness.test.ts diff --git a/backend/docs/health.md b/backend/docs/health.md new file mode 100644 index 00000000..a9d2df7b --- /dev/null +++ b/backend/docs/health.md @@ -0,0 +1,132 @@ +# Health, Liveness, and Readiness Probes + +The backend exposes two distinct kinds of health signal. They answer different +questions and orchestrators (Kubernetes, ECS, Nomad, …) act on them +differently. Conflating them — as a single flat `/health` that always returns +`ok` — causes traffic to be routed to instances that are up but unable to serve. + +All probes are mounted at the **root** of the app (not under `/api/v1`) and are +**unauthenticated**, because orchestrators probe them without credentials. 
+ +| Endpoint | Kind | Cost | Checks dependencies | Healthy | Unhealthy | +| ---------- | --------- | ----- | ------------------- | ------- | --------- | +| `/health` | Liveness | cheap | no | `200` | — | +| `/livez` | Liveness | cheap | no | `200` | — | +| `/readyz` | Readiness | real | yes | `200` | `503` | + +## Liveness — `/health`, `/livez` + +> "Is the process up and able to serve an HTTP request at all?" + +Liveness is cheap and **dependency-free**. It returns `200` whenever the event +loop can service a request. A failing liveness probe instructs the orchestrator +to **restart** the container, so it must never consult downstream dependencies — +a transient database blip should not trigger a restart loop. + +`/health` is retained for backward compatibility; `/livez` is the conventional +alias. They are identical. + +```json +{ "status": "ok", "timestamp": "2026-06-02T12:00:00.000Z" } +``` + +## Readiness — `/readyz` + +> "Should this instance receive traffic right now?" + +Readiness probes real dependencies. A failing readiness probe pulls the instance +out of the load-balancer rotation **without restarting it**, so it can recover +and rejoin once its dependencies are healthy again. + +It returns: + +- `200` with `status: "ready"` when the instance can serve traffic. +- `503` with `status: "not_ready"` when a hard dependency is unavailable. +- `503` with `status: "maintenance"` when maintenance mode is enabled. + +```json +{ + "status": "ready", + "database": "ok", + "ingest": "ok", + "webhookQueue": "ok", + "timestamp": "2026-06-02T12:00:00.000Z" +} +``` + +### Sub-status semantics + +Each dependency reports a coarse `SubStatus`, the same pattern used by +`/api/v1/admin/monitoring`: + +- `ok` — healthy. +- `degraded` — serving but impaired. **Does not** fail readiness. +- `unavailable` — could not be reached / unusable. **Fails** readiness (`503`). 
+ +| Sub-status | Probe | `degraded` when | `unavailable` when | +| -------------- | ------------------------------ | ------------------------------------------ | ------------------------------------------- | +| `database` | `pingDatabase()` (`SELECT 1`) | — | connection cannot open or execute | +| `ingest` | `lagMonitor.getLagStatus()` | lag ≥ warn threshold, < critical threshold | lag ≥ critical threshold, or probe throws | +| `webhookQueue` | `webhookQueueService.getStats()` | queue is saturated (`size ≥ capacity`) | the queue's backing store is unreachable | + +Ingest lag thresholds are governed by `LagMonitor` and configurable via +`LAG_WARN_THRESHOLD` / `LAG_CRITICAL_THRESHOLD`. See [reliability.md](./reliability.md). + +The instance is **not ready** (`503`) if *any* sub-status is `unavailable`. +`degraded` sub-statuses are surfaced for observability but keep the instance in +rotation: a slightly stale index or a back-pressured queue is still serviceable. + +### Maintenance mode + +When `statusService.isMaintenanceEnabled()` is true, `/readyz` short-circuits +**before** probing any dependency and returns `503` with `status: "maintenance"`. +The instance is intentionally not serving and should be pulled from rotation. +Liveness is unaffected — the process is healthy, just drained. 
+ +## Edge-case behaviour + +| Scenario | `/health`, `/livez` | `/readyz` | +| ---------------------------- | ------------------- | ----------------------------------------------- | +| All healthy | `200 ok` | `200 ready` | +| Database down | `200 ok` | `503 not_ready`, `database: unavailable` | +| Warn-level lag | `200 ok` | `200 ready`, `ingest: degraded` | +| Critical lag | `200 ok` | `503 not_ready`, `ingest: unavailable` | +| Lag probe throws | `200 ok` | `503 not_ready`, `ingest: unavailable` | +| Queue store unreachable | `200 ok` | `503 not_ready`, `webhookQueue: unavailable` | +| Queue saturated | `200 ok` | `200 ready`, `webhookQueue: degraded` | +| Maintenance mode | `200 ok` | `503 maintenance` | +| Partial failure (one dep) | `200 ok` | `503 not_ready` (failing dep `unavailable`, rest `ok`) | + +## Security + +These probes are unauthenticated, so their responses are deliberately minimal. +They expose only the coarse status enums above and a timestamp. They do **not** +leak: + +- internal hostnames or connection strings, +- application or dependency versions, +- absolute ledger numbers or ingest-lag values, +- queue depths or capacities, +- underlying exception messages (dependency errors are caught and collapsed to + `unavailable`). + +Richer, sensitive diagnostics (queue depths, invariant counters, cursor +positions, versions) remain behind API-key auth at +[`/api/v1/admin/monitoring`](./admin-monitoring.md). 
/**
 * Probe database connectivity with a trivial round-trip query.
 *
 * Used by the readiness endpoint to verify that the SQLite connection can both
 * open and execute. The statement (`SELECT 1 AS ok`) is constant and
 * parameter-free, so it carries no user input and leaks no schema details.
 *
 * @returns `true` only when the query yields a row whose `ok` column is
 *   exactly `1`; `false` when no such row comes back, or when opening,
 *   preparing, or executing the statement throws. Never throws, so callers can
 *   branch on the boolean without their own try/catch.
 */
export function pingDatabase(): boolean {
  try {
    // Hold the driver result as `unknown` and narrow it explicitly instead of
    // relying on an implicit `any`: this type-checks whether the driver's
    // typings return `any` or `unknown` from `.get()`.
    const row: unknown = getDatabase().prepare("SELECT 1 AS ok").get();
    if (typeof row !== "object" || row === null) {
      return false;
    }
    return (row as { ok?: unknown }).ok === 1;
  } catch {
    // Any failure to open/prepare/execute is collapsed to "not reachable".
    return false;
  }
}
+ */ + +import { Router, Request, Response } from "express"; +import { pingDatabase } from "../lib/database"; +import { statusService } from "../services/statusService"; +import { lagMonitor } from "../services/lagMonitor"; +import { webhookQueueService } from "../services/webhookQueueService"; + +const router = Router(); + +/** + * Coarse per-dependency status, mirroring the SubStatus pattern used by + * /api/v1/admin/monitoring. "degraded" means serving but impaired (does not + * fail readiness); "unavailable" means the dependency could not be reached + * (fails readiness). + */ +type SubStatus = "ok" | "degraded" | "unavailable"; + +type ReadyStatus = "ready" | "not_ready" | "maintenance"; + +// --------------------------------------------------------------------------- +// Liveness +// --------------------------------------------------------------------------- + +function liveness(_req: Request, res: Response): void { + res.json({ + status: "ok", + timestamp: new Date().toISOString(), + }); +} + +// Keep the historical /health path as a liveness check, and add the +// conventional /livez alias. +router.get("/health", liveness); +router.get("/livez", liveness); + +// --------------------------------------------------------------------------- +// Readiness +// --------------------------------------------------------------------------- + +router.get("/readyz", async (_req: Request, res: Response) => { + // Maintenance mode short-circuits readiness: the instance is intentionally + // not serving, so it should be pulled from rotation regardless of deps. 
+ if (statusService.isMaintenanceEnabled()) { + res.status(503).json({ + status: "maintenance" as ReadyStatus, + database: "ok" as SubStatus, + ingest: "ok" as SubStatus, + webhookQueue: "ok" as SubStatus, + timestamp: new Date().toISOString(), + }); + return; + } + + // --- Database connectivity (hard dependency) --------------------------- + let database: SubStatus = "ok"; + if (!pingDatabase()) { + database = "unavailable"; + } + + // --- Ingest lag -------------------------------------------------------- + // Reuse the LagMonitor degradation logic. "warn" lag is degraded but still + // serviceable; "critical" lag means the indexed view is too stale to trust, + // so we treat it as unavailable for readiness. + let ingest: SubStatus = "ok"; + try { + const lag = await lagMonitor.getLagStatus(); + if (lag.isCritical) { + ingest = "unavailable"; + } else if (lag.isDegraded) { + ingest = "degraded"; + } + } catch { + ingest = "unavailable"; + } + + // --- Webhook queue health (hard dependency on its backing store) ------- + // A throw here means the queue's store is unreachable. Saturation (queue at + // capacity) is back-pressure, not unreadiness, so it is reported as degraded. + let webhookQueue: SubStatus = "ok"; + try { + const stats = webhookQueueService.getStats(); + if (stats.capacity > 0 && stats.size >= stats.capacity) { + webhookQueue = "degraded"; + } + } catch { + webhookQueue = "unavailable"; + } + + const unavailable = + database === "unavailable" || + ingest === "unavailable" || + webhookQueue === "unavailable"; + + const status: ReadyStatus = unavailable ? "not_ready" : "ready"; + + res.status(unavailable ? 
503 : 200).json({ + status, + database, + ingest, + webhookQueue, + timestamp: new Date().toISOString(), + }); +}); + +export default router; diff --git a/backend/src/tests/readiness.test.ts b/backend/src/tests/readiness.test.ts new file mode 100644 index 00000000..68dec63f --- /dev/null +++ b/backend/src/tests/readiness.test.ts @@ -0,0 +1,257 @@ +/** + * Liveness and readiness probe tests. + * + * Covers: + * - Liveness (/health, /livez) is cheap, always 200, dependency-free. + * - Readiness (/readyz) probes DB connectivity, ingest lag, and the webhook + * queue, and honours maintenance mode. + * - Edge cases: DB down, high (critical) lag, maintenance mode, partial + * dependency failure, queue saturation. + * - Security: probes do not leak internal hostnames, versions, or error + * details to unauthenticated callers. + */ + +import express from "express"; +import supertest from "supertest"; +import healthRoutes from "../routes/health"; +import { statusService } from "../services/statusService"; +import { lagMonitor } from "../services/lagMonitor"; +import { webhookQueueService } from "../services/webhookQueueService"; +import * as database from "../lib/database"; + +// Mount the health router the same way app.ts does: at the root, with no auth. +// Probes are unauthenticated, so no X-API-Key header is sent anywhere here. +// (We mount the router in isolation rather than importing the full app so the +// probe behaviour is exercised independently of the rest of the route graph.) +const app = express(); +app.use(express.json()); +app.use(healthRoutes); + +const HEALTHY_QUEUE_STATS = { + depth: 0, + size: 0, + capacity: 5000, + overflowCount: 0, + pendingCount: 0, + successCount: 0, + failureCount: 0, + oldestTimestamp: null, +}; + +beforeEach(() => { + // Healthy baseline: maintenance off, lag well under the warn threshold. 
+ statusService.setMaintenanceMode(false); + statusService.updateLastIndexedLedger(100000); + statusService.setMockCurrentLedger(100002); // lag = 2 + + // The test database has no webhook_queue schema, so stub the queue stats to + // a healthy value by default. Individual tests override this to exercise the + // saturated / unavailable paths. This keeps the suite focused on probe logic + // rather than queue persistence (covered by webhookQueue.persist.test.ts). + jest + .spyOn(webhookQueueService, "getStats") + .mockReturnValue(HEALTHY_QUEUE_STATS as any); +}); + +afterEach(() => { + statusService.setMaintenanceMode(false); + statusService.setMockCurrentLedger(null); + jest.restoreAllMocks(); +}); + +// --------------------------------------------------------------------------- +// Liveness +// --------------------------------------------------------------------------- + +describe("Liveness probe", () => { + it("GET /health returns 200 with status ok", async () => { + const res = await supertest(app).get("/health"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ok"); + expect(res.body.timestamp).toMatch(/^\d{4}-\d{2}-\d{2}T/); + }); + + it("GET /livez returns 200 with status ok", async () => { + const res = await supertest(app).get("/livez"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ok"); + }); + + it("liveness stays up even when a dependency is down", async () => { + jest.spyOn(database, "pingDatabase").mockReturnValue(false); + const res = await supertest(app).get("/health"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ok"); + }); + + it("liveness is dependency-free (does not call pingDatabase)", async () => { + const spy = jest.spyOn(database, "pingDatabase"); + await supertest(app).get("/livez"); + expect(spy).not.toHaveBeenCalled(); + }); +}); + +// --------------------------------------------------------------------------- +// Readiness — happy path +// 
--------------------------------------------------------------------------- + +describe("Readiness probe — ready", () => { + it("GET /readyz returns 200 when all dependencies are healthy", async () => { + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ready"); + expect(res.body.database).toBe("ok"); + expect(res.body.ingest).toBe("ok"); + expect(res.body.webhookQueue).toBe("ok"); + expect(res.body.timestamp).toMatch(/^\d{4}-\d{2}-\d{2}T/); + }); + + it("stays ready with warn-level (degraded) lag", async () => { + statusService.setMockCurrentLedger(100020); // lag = 20, >= warn(10), < critical(50) + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ready"); + expect(res.body.ingest).toBe("degraded"); + }); +}); + +// --------------------------------------------------------------------------- +// Readiness — maintenance mode +// --------------------------------------------------------------------------- + +describe("Readiness probe — maintenance mode", () => { + it("returns 503 with maintenance status when maintenance is enabled", async () => { + statusService.setMaintenanceMode(true); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("maintenance"); + }); + + it("short-circuits before probing dependencies", async () => { + statusService.setMaintenanceMode(true); + const spy = jest.spyOn(database, "pingDatabase"); + await supertest(app).get("/readyz"); + expect(spy).not.toHaveBeenCalled(); + }); +}); + +// --------------------------------------------------------------------------- +// Readiness — dependency failures +// --------------------------------------------------------------------------- + +describe("Readiness probe — DB down", () => { + it("returns 503 not_ready when the database is unreachable", async () => { + jest.spyOn(database, 
"pingDatabase").mockReturnValue(false); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("not_ready"); + expect(res.body.database).toBe("unavailable"); + }); +}); + +describe("Readiness probe — high lag", () => { + it("returns 503 not_ready when ingest lag is critical", async () => { + statusService.setMockCurrentLedger(100100); // lag = 100, >= critical(50) + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("not_ready"); + expect(res.body.ingest).toBe("unavailable"); + }); + + it("returns 503 not_ready when the lag probe throws", async () => { + jest.spyOn(lagMonitor, "getLagStatus").mockRejectedValue(new Error("rpc down")); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.ingest).toBe("unavailable"); + }); +}); + +describe("Readiness probe — webhook queue", () => { + it("returns 503 not_ready when the queue store is unreachable", async () => { + jest.spyOn(webhookQueueService, "getStats").mockImplementation(() => { + throw new Error("queue store unavailable"); + }); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("not_ready"); + expect(res.body.webhookQueue).toBe("unavailable"); + }); + + it("stays ready (degraded) when the queue is saturated", async () => { + jest.spyOn(webhookQueueService, "getStats").mockReturnValue({ + depth: 5000, + size: 5000, + capacity: 5000, + overflowCount: 3, + pendingCount: 5000, + successCount: 0, + failureCount: 0, + oldestTimestamp: null, + } as any); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ready"); + expect(res.body.webhookQueue).toBe("degraded"); + }); +}); + +describe("Readiness probe — partial dependency failure", () => { + it("a single unavailable dependency fails readiness while others stay ok", async () => { + 
jest.spyOn(database, "pingDatabase").mockReturnValue(false); + statusService.setMockCurrentLedger(100002); // lag = 2, healthy + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("not_ready"); + expect(res.body.database).toBe("unavailable"); + expect(res.body.ingest).toBe("ok"); + expect(res.body.webhookQueue).toBe("ok"); + }); + + it("degraded lag plus DB down still reports both sub-statuses", async () => { + jest.spyOn(database, "pingDatabase").mockReturnValue(false); + statusService.setMockCurrentLedger(100020); // lag = 20, degraded + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.database).toBe("unavailable"); + expect(res.body.ingest).toBe("degraded"); + }); +}); + +// --------------------------------------------------------------------------- +// Security — no information leakage to unauthenticated callers +// --------------------------------------------------------------------------- + +describe("Readiness probe — does not leak internal details", () => { + const sub = ["ok", "degraded", "unavailable"]; + + it("readiness response contains only coarse status fields", async () => { + const res = await supertest(app).get("/readyz"); + expect(Object.keys(res.body).sort()).toEqual( + ["database", "ingest", "status", "timestamp", "webhookQueue"].sort() + ); + // No version, hostname, ledger numbers, queue depths, or error strings. 
+ expect(res.body).not.toHaveProperty("version"); + expect(res.body).not.toHaveProperty("host"); + expect(res.body).not.toHaveProperty("lag"); + expect(res.body).not.toHaveProperty("error"); + expect(sub).toContain(res.body.database); + expect(sub).toContain(res.body.ingest); + expect(sub).toContain(res.body.webhookQueue); + }); + + it("does not surface the underlying error message when a dependency throws", async () => { + jest + .spyOn(lagMonitor, "getLagStatus") + .mockRejectedValue(new Error("postgres://secret-host:5432 refused")); + const res = await supertest(app).get("/readyz"); + const serialized = JSON.stringify(res.body); + expect(serialized).not.toContain("secret-host"); + expect(serialized).not.toContain("postgres"); + }); + + it("liveness response contains only status and timestamp", async () => { + const res = await supertest(app).get("/health"); + expect(Object.keys(res.body).sort()).toEqual(["status", "timestamp"].sort()); + expect(res.body).not.toHaveProperty("version"); + }); +}); From 88741b3eb4e50b39c449450c4c50925ebfa4495f Mon Sep 17 00:00:00 2001 From: mikkyvans0-source Date: Tue, 2 Jun 2026 12:31:53 +0100 Subject: [PATCH 3/3] feat: implement LagMonitor service with hysteresis-based alert thresholds and add corresponding unit tests and documentation --- backend/docs/observability.md | 177 +++++++ backend/src/services/lagMonitor.ts | 401 ++++++++++++++- backend/src/tests/lagMonitor.alerts.test.ts | 526 ++++++++++++++++++++ 3 files changed, 1095 insertions(+), 9 deletions(-) create mode 100644 backend/docs/observability.md create mode 100644 backend/src/tests/lagMonitor.alerts.test.ts diff --git a/backend/docs/observability.md b/backend/docs/observability.md new file mode 100644 index 00000000..e7d651bf --- /dev/null +++ b/backend/docs/observability.md @@ -0,0 +1,177 @@ +# Observability — Ingest Lag Alerting + +This document describes how the QuickLendX backend turns indexer-lag threshold +breaches into **alerts** and how **degraded mode 
auto-recovery** works. It +complements [reliability.md](./reliability.md) (which covers how degraded mode +gates writes) and [logging.md](./logging.md) (log redaction policy). + +The alerting logic lives in +[`src/services/lagMonitor.ts`](../src/services/lagMonitor.ts). + +--- + +## Overview + +`LagMonitor` computes indexer lag (in ledgers) as +`current_ledger - last_indexed_ledger`. Two thresholds classify the lag into a +**level**: + +| Level | Condition | Effect | +| ---------- | ---------------------------------- | -------------------------------------- | +| `none` | `lag < warnThreshold` | Healthy. All endpoints available. | +| `warn` | `lag >= warnThreshold` | Degraded. Write endpoints gated (503). | +| `critical` | `lag >= criticalThreshold` | Critically degraded. All writes blocked. | + +The level is consumed by: + +- **`GET /api/v1/status`** — surfaces the current level to clients. +- **`degradedGuard`** middleware — gates write/sensitive endpoints. + +Prior to this feature, threshold breaches were silent (no operator signal) and +recovery was implicit (a single good reading immediately re-opened the write +guard, allowing it to flap). This feature adds **alerts on transitions** and +**hysteresis-backed auto-recovery**. + +--- + +## Thresholds & configuration + +All four parameters are configurable via environment variables. Defaults are +chosen for a ~5s ledger cadence. + +| Env var | Default | Meaning | +| ------------------------- | ------- | -------------------------------------------------------------------- | +| `LAG_WARN_THRESHOLD` | `10` | Lag (ledgers) at which the system becomes degraded (`warn`). | +| `LAG_CRITICAL_THRESHOLD` | `50` | Lag (ledgers) at which the system becomes critically degraded. | +| `LAG_HYSTERESIS_MARGIN` | `3` | Ledgers **below** a threshold the lag must fall to before recovering.| +| `LAG_RECOVERY_POLLS` | `3` | Consecutive recovered polls required before a degraded level clears. 
| + +Non-numeric or empty values fall back to the defaults. A `recoveryPolls` below +`1` resets to its default (`3`); a negative `hysteresisMargin` is clamped to `0`. + +Thresholds can also be set at runtime in tests/bootstrap via +`setThresholds(warn, critical)` and `setHysteresis(margin, polls)`. + +--- + +## Hysteresis & auto-recovery + +To stop the monitor flapping when lag hovers around a threshold, the monitor +tracks an **effective level** separately from the **instantaneous level** +computed from the raw lag: + +- **Escalation is immediate.** As soon as the raw lag reaches a higher level + (e.g. `lag >= criticalThreshold`), the effective level jumps there. This is + the fail-safe direction — a breach gates writes without delay. +- **De-escalation is sustained.** To clear a level, the raw lag must fall to + the **recovery threshold** (`threshold - hysteresisMargin`) and stay there + for `recoveryPolls` consecutive polls. A single breach anywhere in that + window resets the streak. Recovery steps down **one level at a time** + (`critical → warn → none`) so the `warn` write-guard window is never skipped. + +Recovery thresholds with the defaults: + +- Recover out of `critical` → `warn` when `lag <= 50 - 3 = 47` for 3 polls. +- Recover out of `warn` → `none` when `lag <= 10 - 3 = 7` for 3 polls. + +`getLagStatus()` (used by `/status` and `degradedGuard`) reports the effective +level. It **escalates immediately** when called but **never auto-clears** — only +the scheduled `poll()` path performs de-escalation. This means the many guard +and status calls per interval can raise the level but can never lower it. + +--- + +## Alert events + +Alerts are emitted **only on transitions** of the effective level — never on +every poll. Each transition: + +1. Logs a single structured JSON line (`type: "LAG_ALERT"`), at `WARN` for + escalations and `INFO` for recoveries. +2. Increments in-process counters (see [Metrics](#metrics)). +3. Notifies any subscribers registered via `onAlert(listener)`.
+ +### Alert payload + +```jsonc +{ + "from": "warn", // level moved away from + "to": "critical", // level moved to + "direction": "escalation", // or "recovery" + "lag": 62, // raw lag at transition (ledgers) + "warnThreshold": 10, + "criticalThreshold": 50, + "at": "2026-06-02T12:00:00.000Z" +} +``` + +> **Security:** Alert payloads carry **only operational fields** — lag, +> thresholds, level, timestamp. They never include request bodies, wallet +> data, auth tokens, or any other secrets. The logged line uses the same +> fixed shape, so no caller-supplied data can leak into log sinks. + +### Subscribing + +```ts +import { lagMonitor } from "../services/lagMonitor"; + +const unsubscribe = lagMonitor.onAlert((event) => { + // forward to PagerDuty / Slack / metrics exporter, etc. +}); +``` + +A throwing subscriber is isolated and never breaks the monitor. + +--- + +## Polling + +`poll()` reads a fresh lag value, advances the hysteresis state machine, and +emits any resulting transition alert. Schedule it on a fixed cadence (mirroring +the invariant scheduler pattern): + +```ts +import { lagMonitor } from "../services/lagMonitor"; + +setInterval(() => { + void lagMonitor.poll(); +}, 5000); +``` + +Do **not** call `poll()` per request — request paths should call the read-only +`getLagStatus()`. + +### Missing / corrupt current-ledger reads + +If the current-ledger read throws, or yields a non-finite or negative lag, the +monitor **fails safe to `critical`** (it returns `lag = criticalThreshold`). +An unknown reading must never silently clear a degraded state or open the write +guard. + +--- + +## Metrics + +`getAlertMetrics()` returns a defensive copy of the in-process counters, +suitable for exposure via the monitoring endpoint or a scraper: + +| Field | Meaning | +| -------------------------- | -------------------------------------------------------- | +| `escalations` | Total escalation transitions observed. 
| +| `recoveries` | Total recovery transitions observed. | +| `transitionsTo` | Transition count by destination level (`none`/`warn`/`critical`). | +| `currentLevel` | Current effective level. | +| `consecutiveRecoveryPolls` | Consecutive polls the lag has been within recovery range. | + +--- + +## Edge cases (covered by tests) + +See [`src/tests/lagMonitor.alerts.test.ts`](../src/tests/lagMonitor.alerts.test.ts): + +- **Flapping** around a threshold produces no spurious transitions; a single + good poll never clears a degraded state. +- **Sustained breach** holds the level with no duplicate alerts. +- **Rapid recovery** still drains one level per recovery window, preserving the + `warn` guard window. +- **Missing current-ledger read** fails safe to `critical`. diff --git a/backend/src/services/lagMonitor.ts b/backend/src/services/lagMonitor.ts index d6744146..3170c67f 100644 --- a/backend/src/services/lagMonitor.ts +++ b/backend/src/services/lagMonitor.ts @@ -13,9 +13,34 @@ * critically degraded. All mutating * endpoints are blocked. * + * Hysteresis & auto-recovery + * -------------------------- + * To stop the monitor flapping between levels when the lag hovers around a + * threshold, an *effective* level is tracked separately from the + * *instantaneous* level computed from the raw lag: + * + * - To ESCALATE (none→warn, warn→critical) the raw lag must reach the + * upper threshold. + * - To DE-ESCALATE the raw lag must fall to or below the threshold minus the + * hysteresis margin (the "recovery threshold"), AND it must stay there + * for `recoveryPolls` consecutive polls before the level is cleared. + * + * This means a single good poll never clears a degraded state — recovery is + * explicit and sustained, while escalation is immediate. + * + * Alert events + * ------------ + * Alerts are emitted only on *transitions* of the effective level (via + * `poll()`), never on every poll.
Each transition is logged as a single + * structured JSON line and increments an in-process counter. Subscribers + * can register via `onAlert()`. Alert payloads contain only operational + * metrics (lag, thresholds, level) — never request data or secrets. + * * The thresholds can be overridden via environment variables: * LAG_WARN_THRESHOLD (integer, ledgers) * LAG_CRITICAL_THRESHOLD (integer, ledgers) + * LAG_HYSTERESIS_MARGIN (integer, ledgers; default 3) + * LAG_RECOVERY_POLLS (integer, polls; default 3) */ import { statusService } from "./statusService"; @@ -26,6 +51,10 @@ import { statusService } from "./statusService"; export const DEFAULT_WARN_THRESHOLD = 10; export const DEFAULT_CRITICAL_THRESHOLD = 50; +/** Ledgers below a threshold the lag must fall to before de-escalating. */ +export const DEFAULT_HYSTERESIS_MARGIN = 3; +/** Consecutive sub-recovery polls required before a degraded level clears. */ +export const DEFAULT_RECOVERY_POLLS = 3; // --------------------------------------------------------------------------- // Types @@ -40,7 +69,7 @@ export interface LagStatus { warnThreshold: number; /** Threshold at which critical-level degradation begins. */ criticalThreshold: number; - /** Degradation level derived from the current lag. */ + /** Degradation level (hysteresis-aware effective level). */ level: DegradedLevel; /** True when level is "warn" or "critical". */ isDegraded: boolean; @@ -50,6 +79,58 @@ export interface LagStatus { checkedAt: string; } +/** Payload emitted on an effective-level transition. Operational data only. */ +export interface LagAlertEvent { + /** The level the monitor moved away from. */ + from: DegradedLevel; + /** The level the monitor moved to. */ + to: DegradedLevel; + /** "escalation" when severity increased, "recovery" when it decreased. */ + direction: "escalation" | "recovery"; + /** Raw lag (ledgers) at the moment of transition. 
*/ + lag: number; + warnThreshold: number; + criticalThreshold: number; + /** ISO-8601 timestamp of the transition. */ + at: string; +} + +export type LagAlertListener = (event: LagAlertEvent) => void; + +/** In-process counters, surfaced to the monitoring endpoint / scrapers. */ +export interface LagAlertMetrics { + /** Total transitions observed, by direction. */ + escalations: number; + recoveries: number; + /** Transitions broken down by destination level. */ + transitionsTo: Record; + /** Current effective level. */ + currentLevel: DegradedLevel; + /** + * Consecutive polls the raw lag has been at/below the recovery threshold + * for the current effective level. Resets on any breach. + */ + consecutiveRecoveryPolls: number; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Severity rank used to decide escalation vs. recovery. */ +const LEVEL_RANK: Record = { + none: 0, + warn: 1, + critical: 2, +}; + +function parseEnvInt(name: string): number | undefined { + const raw = process.env[name]; + if (raw === undefined || raw === "") return undefined; + const parsed = parseInt(raw, 10); + return Number.isNaN(parsed) ? undefined : parsed; +} + // --------------------------------------------------------------------------- // LagMonitor // --------------------------------------------------------------------------- @@ -59,17 +140,55 @@ export class LagMonitor { private _warnThreshold: number; private _criticalThreshold: number; + private _hysteresisMargin: number; + private _recoveryPolls: number; - constructor(warnThreshold?: number, criticalThreshold?: number) { + /** + * Hysteresis-aware effective level. Escalates immediately on breach; + * de-escalates only after sustained recovery. This is the level reported + * by getLagStatus() and therefore enforced by degradedGuard. 
+ */ + private _effectiveLevel: DegradedLevel = "none"; + /** Consecutive polls the raw lag has been within recovery range. */ + private _recoveryStreak = 0; + + private readonly _listeners = new Set(); + private readonly _metrics: LagAlertMetrics = { + escalations: 0, + recoveries: 0, + transitionsTo: { none: 0, warn: 0, critical: 0 }, + currentLevel: "none", + consecutiveRecoveryPolls: 0, + }; + + constructor( + warnThreshold?: number, + criticalThreshold?: number, + hysteresisMargin?: number, + recoveryPolls?: number + ) { this._warnThreshold = warnThreshold !== undefined ? warnThreshold - : (parseInt(process.env["LAG_WARN_THRESHOLD"] ?? "", 10) || DEFAULT_WARN_THRESHOLD); + : parseEnvInt("LAG_WARN_THRESHOLD") ?? DEFAULT_WARN_THRESHOLD; this._criticalThreshold = criticalThreshold !== undefined ? criticalThreshold - : (parseInt(process.env["LAG_CRITICAL_THRESHOLD"] ?? "", 10) || DEFAULT_CRITICAL_THRESHOLD); + : parseEnvInt("LAG_CRITICAL_THRESHOLD") ?? DEFAULT_CRITICAL_THRESHOLD; + + this._hysteresisMargin = + hysteresisMargin !== undefined + ? hysteresisMargin + : parseEnvInt("LAG_HYSTERESIS_MARGIN") ?? DEFAULT_HYSTERESIS_MARGIN; + + this._recoveryPolls = + recoveryPolls !== undefined + ? recoveryPolls + : parseEnvInt("LAG_RECOVERY_POLLS") ?? DEFAULT_RECOVERY_POLLS; + + if (this._recoveryPolls < 1) this._recoveryPolls = DEFAULT_RECOVERY_POLLS; + if (this._hysteresisMargin < 0) this._hysteresisMargin = 0; } // ------------------------------------------------------------------------- @@ -95,6 +214,19 @@ export class LagMonitor { return this._criticalThreshold; } + get hysteresisMargin(): number { + return this._hysteresisMargin; + } + + get recoveryPolls(): number { + return this._recoveryPolls; + } + + /** Current hysteresis-aware effective level. 
*/ + get effectiveLevel(): DegradedLevel { + return this._effectiveLevel; + } + setThresholds(warn: number, critical: number): void { if (warn <= 0 || critical <= 0) { throw new RangeError("Thresholds must be positive integers"); @@ -108,13 +240,58 @@ export class LagMonitor { this._criticalThreshold = critical; } + /** + * Configure hysteresis behaviour. + * @param margin Ledgers below a threshold the lag must drop to recover. + * @param polls Consecutive recovered polls required before clearing. + */ + setHysteresis(margin: number, polls: number): void { + if (margin < 0) { + throw new RangeError("hysteresisMargin must be >= 0"); + } + if (!Number.isInteger(polls) || polls < 1) { + throw new RangeError("recoveryPolls must be a positive integer"); + } + this._hysteresisMargin = margin; + this._recoveryPolls = polls; + } + + // ------------------------------------------------------------------------- + // Alert subscription / metrics + // ------------------------------------------------------------------------- + + /** Subscribe to transition alerts. Returns an unsubscribe function. */ + onAlert(listener: LagAlertListener): () => void { + this._listeners.add(listener); + return () => this._listeners.delete(listener); + } + + /** Snapshot of in-process alert counters. */ + getAlertMetrics(): LagAlertMetrics { + return { + ...this._metrics, + transitionsTo: { ...this._metrics.transitionsTo }, + }; + } + + /** Reset the state machine and counters. Intended for tests/bootstrap. 
*/ + reset(): void { + this._effectiveLevel = "none"; + this._recoveryStreak = 0; + this._metrics.escalations = 0; + this._metrics.recoveries = 0; + this._metrics.transitionsTo = { none: 0, warn: 0, critical: 0 }; + this._metrics.currentLevel = "none"; + this._metrics.consecutiveRecoveryPolls = 0; + } + // ------------------------------------------------------------------------- // Core computation // ------------------------------------------------------------------------- /** - * Computes the current lag level from a raw lag value. - * Pure function — no I/O. + * Computes the instantaneous lag level from a raw lag value. + * Pure function — no I/O, no hysteresis, no side effects. */ computeLevel(lag: number): DegradedLevel { if (lag >= this._criticalThreshold) return "critical"; @@ -122,14 +299,220 @@ export class LagMonitor { return "none"; } + /** + * The lag value at/below which the system is considered recovered *out of* + * the given level. Recovering from "critical" returns to "warn" territory + * (critical threshold minus margin); recovering from "warn" returns to + * healthy (warn threshold minus margin). Clamped at 0. + */ + private recoveryThresholdFor(level: DegradedLevel): number { + if (level === "critical") { + return Math.max(0, this._criticalThreshold - this._hysteresisMargin); + } + // warn (or none, unused) + return Math.max(0, this._warnThreshold - this._hysteresisMargin); + } + + /** + * Advance the hysteresis state machine by one observation and return the + * resulting effective level. Pure with respect to I/O — it only mutates + * internal state and (on a transition) emits an alert. Safe to call from + * a scheduled poller. + * + * Escalation is immediate: as soon as the raw lag reaches a higher + * instantaneous level, the effective level jumps there and the recovery + * streak resets. + * + * De-escalation requires the raw lag to sit at/below the recovery + * threshold for `recoveryPolls` consecutive observations. 
A single breach + * anywhere in the window resets the streak. + */ + observe(lag: number, at: string = new Date().toISOString()): DegradedLevel { + const instant = this.computeLevel(lag); + const prev = this._effectiveLevel; + + if (LEVEL_RANK[instant] > LEVEL_RANK[prev]) { + // Escalation — immediate, no dwell required. + this._recoveryStreak = 0; + this._setEffectiveLevel(instant, lag, at); + return this._effectiveLevel; + } + + if (this._effectiveLevel === "none") { + // Healthy and staying healthy (instant is none too). + this._recoveryStreak = 0; + this._metrics.consecutiveRecoveryPolls = 0; + return this._effectiveLevel; + } + + // Currently degraded and lag is not escalating. Check for sustained + // recovery toward the next-lower level. + const recoveryThreshold = this.recoveryThresholdFor(this._effectiveLevel); + + if (lag <= recoveryThreshold) { + this._recoveryStreak += 1; + this._metrics.consecutiveRecoveryPolls = this._recoveryStreak; + if (this._recoveryStreak >= this._recoveryPolls) { + // Step down exactly one level so a deep recovery from critical still + // passes through warn rather than skipping the warn guard window. + const next: DegradedLevel = + this._effectiveLevel === "critical" ? "warn" : "none"; + this._recoveryStreak = 0; + this._metrics.consecutiveRecoveryPolls = 0; + this._setEffectiveLevel(next, lag, at); + // If, after stepping down, the lag is already below the next level's + // recovery threshold, the following poll(s) will continue draining it + // down one level at a time — keeping each transition observable. + } + } else { + // Lag bounced back above the recovery threshold; reset the streak. + this._recoveryStreak = 0; + this._metrics.consecutiveRecoveryPolls = 0; + } + + return this._effectiveLevel; + } + + /** + * Apply a new effective level and emit a transition alert. Caller must have + * already determined `next !== current`. 
+ */ + private _setEffectiveLevel( + next: DegradedLevel, + lag: number, + at: string + ): void { + const from = this._effectiveLevel; + if (next === from) return; + + this._effectiveLevel = next; + this._metrics.currentLevel = next; + + const direction: "escalation" | "recovery" = + LEVEL_RANK[next] > LEVEL_RANK[from] ? "escalation" : "recovery"; + + if (direction === "escalation") this._metrics.escalations += 1; + else this._metrics.recoveries += 1; + this._metrics.transitionsTo[next] += 1; + + const event: LagAlertEvent = { + from, + to: next, + direction, + lag, + warnThreshold: this._warnThreshold, + criticalThreshold: this._criticalThreshold, + at, + }; + + this._emitAlert(event); + } + + /** Log the transition (structured) and notify subscribers. */ + private _emitAlert(event: LagAlertEvent): void { + // Single structured line per transition — operational fields only, + // never request bodies, auth material, or other secrets. + if (process.env["NODE_ENV"] !== "test") { + const line = JSON.stringify({ + level: event.direction === "escalation" ? "WARN" : "INFO", + type: "LAG_ALERT", + event: event.direction, + from: event.from, + to: event.to, + lag: event.lag, + warn_threshold: event.warnThreshold, + critical_threshold: event.criticalThreshold, + timestamp: event.at, + }); + if (event.direction === "escalation") { + console.warn(line); + } else { + console.info(line); + } + } + + for (const listener of this._listeners) { + try { + listener(event); + } catch { + // A misbehaving subscriber must never break the monitor. + } + } + } + + // ------------------------------------------------------------------------- + // Status / polling + // ------------------------------------------------------------------------- + /** * Fetches the current system status and returns a full LagStatus snapshot. + * + * The reported `level` is the **instantaneous** level derived directly from + * the current lag (no hysteresis, no side effects). 
This preserves the + * historical contract relied on by `/status`, the readiness probe, and + * `degradedGuard` — the snapshot always reflects the lag *right now*, and + * the call neither mutates the state machine nor emits alerts. Hysteresis, + * alerting, and auto-recovery are driven separately by `poll()`. */ async getLagStatus(): Promise { - const status = await statusService.getStatus(); - const lag = status.index_lag; + const lag = await this.readLag(); const level = this.computeLevel(lag); + return this.snapshot(lag, level); + } + + /** + * Advance the hysteresis state machine using a fresh lag reading and emit + * any resulting transition alert. Returns a snapshot whose `level` is the + * hysteresis-aware **effective** level. Call this on a fixed interval (e.g. + * from a scheduler), NOT per request. + * + * This is where threshold breaches become alerts and where degraded-mode + * auto-recovery (sustained-over-N-polls) happens. `getLagStatus()` remains + * instantaneous so per-request consumers are unaffected. + */ + async poll(): Promise { + const lag = await this.readLag(); + const level = this.observe(lag); + return this.snapshot(lag, level); + } + + /** + * Snapshot using the hysteresis-aware effective level without advancing the + * state machine. Useful for a guard that wants auto-recovery semantics + * (degraded stays closed until `poll()` clears it) rather than instantaneous + * lag. Read-only: no alerts, no state change. + */ + async getEffectiveStatus(): Promise { + const lag = await this.readLag(); + return this.snapshot(lag, this._effectiveLevel); + } + + /** + * Read the current lag from statusService. + * + * If the current-ledger read fails or yields a nonsensical (negative / + * non-finite) lag, we treat the indexer as critically lagging rather than + * healthy: a missing reading must never silently clear a degraded state or + * open the write guard. 
The raw error is swallowed here (this method does not log or + * rethrow it) so the monitor degrades safely. + */ + private async readLag(): Promise { + try { + const status = await statusService.getStatus(); + const lag = status.index_lag; + if (!Number.isFinite(lag) || lag < 0) { + // Unknown / corrupt reading → fail safe to critical. + return this._criticalThreshold; + } + return lag; + } catch { + // Cannot determine lag → fail safe to critical (block writes). + return this._criticalThreshold; + } + } + /** Build a LagStatus from a raw lag and an already-determined level. */ + private snapshot(lag: number, level: DegradedLevel): LagStatus { return { lag, warnThreshold: this._warnThreshold, @@ -189,4 +572,4 @@ export class LagMonitor { } } -export const lagMonitor = LagMonitor.getInstance(); \ No newline at end of file +export const lagMonitor = LagMonitor.getInstance(); diff --git a/backend/src/tests/lagMonitor.alerts.test.ts b/backend/src/tests/lagMonitor.alerts.test.ts new file mode 100644 index 00000000..af5825dc --- /dev/null +++ b/backend/src/tests/lagMonitor.alerts.test.ts @@ -0,0 +1,526 @@ +/** + * Unit tests for LagMonitor alerting, hysteresis, and degraded-mode + * auto-recovery. Complements lagMonitor.test.ts (which covers the pure + * computeLevel / threshold / singleton behaviour).
+ * + * Tests cover: + * - Alerts fire only on effective-level transitions, not on every poll + * - Escalation is immediate; de-escalation requires sustained recovery + * - Hysteresis margin prevents flapping around a threshold + * - Sustained breach holds the degraded level + * - Rapid recovery still drains one level at a time (warn guard window kept) + * - Missing / corrupt current-ledger reads fail safe to critical + * - Alert payloads carry only operational fields (no secrets) + * - getLagStatus() stays instantaneous and side-effect free (no alerts, no state change) + */ + +import { + LagMonitor, + LagAlertEvent, + DEFAULT_HYSTERESIS_MARGIN, + DEFAULT_RECOVERY_POLLS, +} from "../services/lagMonitor"; +import { statusService } from "../services/statusService"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** warn=10, critical=50, margin=3, recoveryPolls=3 unless overridden. */ +function makeMonitor( + warn = 10, + critical = 50, + margin = 3, + recoveryPolls = 3 +): LagMonitor { + return new LagMonitor(warn, critical, margin, recoveryPolls); +} + +/** Set the mocked lag by adjusting the mock current ledger.
*/ +function setLag(lag: number): void { + statusService.updateLastIndexedLedger(100000); + statusService.setMockCurrentLedger(100000 + lag); +} + +function collectAlerts(m: LagMonitor): LagAlertEvent[] { + const events: LagAlertEvent[] = []; + m.onAlert((e) => events.push(e)); + return events; +} + +beforeEach(() => { + statusService.setMaintenanceMode(false); + statusService.updateLastIndexedLedger(100000); +}); + +afterEach(() => { + statusService.setMockCurrentLedger(null); +}); + +// --------------------------------------------------------------------------- +// Config / defaults +// --------------------------------------------------------------------------- + +describe("LagMonitor hysteresis config", () => { + it("exposes default hysteresis margin and recovery polls", () => { + const m = makeMonitor(); + expect(DEFAULT_HYSTERESIS_MARGIN).toBe(3); + expect(DEFAULT_RECOVERY_POLLS).toBe(3); + expect(m.hysteresisMargin).toBe(3); + expect(m.recoveryPolls).toBe(3); + }); + + it("reads hysteresis config from env vars", () => { + process.env["LAG_HYSTERESIS_MARGIN"] = "5"; + process.env["LAG_RECOVERY_POLLS"] = "4"; + const m = new LagMonitor(); + expect(m.hysteresisMargin).toBe(5); + expect(m.recoveryPolls).toBe(4); + delete process.env["LAG_HYSTERESIS_MARGIN"]; + delete process.env["LAG_RECOVERY_POLLS"]; + }); + + it("setHysteresis validates inputs", () => { + const m = makeMonitor(); + expect(() => m.setHysteresis(-1, 3)).toThrow(RangeError); + expect(() => m.setHysteresis(3, 0)).toThrow(RangeError); + expect(() => m.setHysteresis(3, 1.5)).toThrow(RangeError); + m.setHysteresis(4, 2); + expect(m.hysteresisMargin).toBe(4); + expect(m.recoveryPolls).toBe(2); + }); + + it("clamps invalid constructor values to safe defaults", () => { + const m = new LagMonitor(10, 50, -5, 0); + expect(m.hysteresisMargin).toBe(0); + expect(m.recoveryPolls).toBe(DEFAULT_RECOVERY_POLLS); + }); +}); + +// --------------------------------------------------------------------------- +// 
Escalation (immediate) +// --------------------------------------------------------------------------- + +describe("LagMonitor escalation", () => { + it("escalates none→warn immediately on first breaching poll", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(15); + const status = await m.poll(); + + expect(status.level).toBe("warn"); + expect(status.isDegraded).toBe(true); + expect(alerts).toHaveLength(1); + expect(alerts[0]).toMatchObject({ + from: "none", + to: "warn", + direction: "escalation", + lag: 15, + }); + }); + + it("escalates warn→critical immediately", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(15); + await m.poll(); // none -> warn + setLag(60); + const status = await m.poll(); // warn -> critical + + expect(status.level).toBe("critical"); + expect(status.isCritical).toBe(true); + expect(alerts.map((a) => a.to)).toEqual(["warn", "critical"]); + }); + + it("can jump none→critical in a single poll", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(80); + const status = await m.poll(); + + expect(status.level).toBe("critical"); + expect(alerts).toHaveLength(1); + expect(alerts[0]).toMatchObject({ from: "none", to: "critical" }); + }); +}); + +// --------------------------------------------------------------------------- +// Alerts fire only on transitions +// --------------------------------------------------------------------------- + +describe("LagMonitor emits alerts only on transitions", () => { + it("does not emit on repeated polls at the same level", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(20); + await m.poll(); // none -> warn (1 alert) + await m.poll(); // still warn + await m.poll(); // still warn + await m.poll(); // still warn + + expect(alerts).toHaveLength(1); + expect(m.getAlertMetrics().escalations).toBe(1); + }); + + it("does not emit while healthy across many polls", 
async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(2); + await m.poll(); + await m.poll(); + await m.poll(); + + expect(alerts).toHaveLength(0); + expect(m.effectiveLevel).toBe("none"); + }); +}); + +// --------------------------------------------------------------------------- +// De-escalation requires sustained recovery +// --------------------------------------------------------------------------- + +describe("LagMonitor auto-recovery (sustained)", () => { + it("does not clear warn until lag stays below recovery threshold for N polls", async () => { + const m = makeMonitor(10, 50, 3, 3); // recovery threshold for warn = 10-3 = 7 + const alerts = collectAlerts(m); + + setLag(20); + await m.poll(); // -> warn + expect(m.effectiveLevel).toBe("warn"); + + setLag(5); // below recovery threshold (7) + await m.poll(); // streak 1 — still warn + expect(m.effectiveLevel).toBe("warn"); + await m.poll(); // streak 2 — still warn + expect(m.effectiveLevel).toBe("warn"); + + const status = await m.poll(); // streak 3 — clears + expect(status.level).toBe("none"); + expect(m.effectiveLevel).toBe("none"); + + const recovery = alerts.filter((a) => a.direction === "recovery"); + expect(recovery).toHaveLength(1); + expect(recovery[0]).toMatchObject({ from: "warn", to: "none" }); + }); + + it("requires lag below the warn recovery threshold, not merely below warn", async () => { + const m = makeMonitor(10, 50, 3, 2); // recovery threshold = 7 + const alerts = collectAlerts(m); + + setLag(20); + await m.poll(); // -> warn + + // lag = 8 is below warn (10) but above recovery threshold (7): no progress + setLag(8); + await m.poll(); + await m.poll(); + expect(m.effectiveLevel).toBe("warn"); + expect(alerts.filter((a) => a.direction === "recovery")).toHaveLength(0); + + // drop to recovery range and dwell + setLag(7); + await m.poll(); // streak 1 + await m.poll(); // streak 2 -> clears + expect(m.effectiveLevel).toBe("none"); + }); + + it("steps 
critical→warn→none one level per recovery window", async () => { + const m = makeMonitor(10, 50, 3, 2); + const alerts = collectAlerts(m); + + setLag(80); + await m.poll(); // -> critical + + // recovery threshold for critical = 50-3 = 47 + setLag(10); + await m.poll(); // streak 1 (critical) + await m.poll(); // streak 2 -> step down to warn + expect(m.effectiveLevel).toBe("warn"); + + // recovery threshold for warn = 10-3 = 7; lag 10 is NOT in range yet + await m.poll(); + await m.poll(); + expect(m.effectiveLevel).toBe("warn"); + + setLag(5); + await m.poll(); // streak 1 (warn) + await m.poll(); // streak 2 -> none + expect(m.effectiveLevel).toBe("none"); + + expect(alerts.map((a) => `${a.from}->${a.to}`)).toEqual([ + "none->critical", + "critical->warn", + "warn->none", + ]); + }); +}); + +// --------------------------------------------------------------------------- +// Flapping around a threshold +// --------------------------------------------------------------------------- + +describe("LagMonitor flapping resistance", () => { + it("a single good poll does not clear a degraded state (anti-flap)", async () => { + const m = makeMonitor(10, 50, 3, 3); + const alerts = collectAlerts(m); + + setLag(20); + await m.poll(); // -> warn + + // oscillate around the recovery threshold + setLag(5); // streak 1 + await m.poll(); + setLag(20); // breach again -> reset streak, still warn (no new alert) + await m.poll(); + setLag(5); // streak 1 again + await m.poll(); + setLag(11); // above recovery -> reset + await m.poll(); + + expect(m.effectiveLevel).toBe("warn"); + // Only the original escalation alert; flapping produced no transitions. 
+ expect(alerts).toHaveLength(1); + expect(alerts[0].direction).toBe("escalation"); + }); + + it("resets recovery streak the moment lag bounces above recovery threshold", async () => { + const m = makeMonitor(10, 50, 3, 3); + setLag(20); + await m.poll(); // -> warn + setLag(5); + await m.poll(); // streak 1 + await m.poll(); // streak 2 + expect(m.getAlertMetrics().consecutiveRecoveryPolls).toBe(2); + + setLag(15); // bounce above recovery + await m.poll(); + expect(m.getAlertMetrics().consecutiveRecoveryPolls).toBe(0); + expect(m.effectiveLevel).toBe("warn"); + }); +}); + +// --------------------------------------------------------------------------- +// Sustained breach +// --------------------------------------------------------------------------- + +describe("LagMonitor sustained breach", () => { + it("holds critical across a long breach with no duplicate alerts", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(120); + for (let i = 0; i < 10; i++) await m.poll(); + + expect(m.effectiveLevel).toBe("critical"); + expect(alerts).toHaveLength(1); + expect(alerts[0].to).toBe("critical"); + }); +}); + +// --------------------------------------------------------------------------- +// Missing / corrupt current-ledger read +// --------------------------------------------------------------------------- + +describe("LagMonitor missing current-ledger read", () => { + it("fails safe to critical when statusService throws", async () => { + const m = makeMonitor(); + const spy = jest + .spyOn(statusService, "getStatus") + .mockRejectedValueOnce(new Error("rpc unavailable")); + + const status = await m.poll(); + expect(status.level).toBe("critical"); + expect(status.isCritical).toBe(true); + expect(status.lag).toBe(m.criticalThreshold); + + spy.mockRestore(); + }); + + it("fails safe to critical on a negative / nonsensical lag", async () => { + const m = makeMonitor(); + // current ledger behind last-indexed → negative lag + 
statusService.updateLastIndexedLedger(100000); + statusService.setMockCurrentLedger(99990); + + const status = await m.poll(); + expect(status.level).toBe("critical"); + expect(status.lag).toBe(m.criticalThreshold); + }); + + it("a failed read does not silently clear an existing degraded state", async () => { + const m = makeMonitor(); + setLag(20); + await m.poll(); // -> warn + + const spy = jest + .spyOn(statusService, "getStatus") + .mockRejectedValueOnce(new Error("rpc unavailable")); + const status = await m.poll(); + // read failed → treated as critical, which is an escalation, never a clear + expect(status.level).toBe("critical"); + spy.mockRestore(); + }); +}); + +// --------------------------------------------------------------------------- +// getLagStatus contract (instantaneous, side-effect free) +// --------------------------------------------------------------------------- + +describe("LagMonitor.getLagStatus contract", () => { + it("reflects the current lag instantaneously (no poll required)", async () => { + const m = makeMonitor(); + setLag(60); + const status = await m.getLagStatus(); + expect(status.level).toBe("critical"); + expect(status.isCritical).toBe(true); + }); + + it("is side-effect free: never advances the state machine or emits alerts", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + setLag(60); + await m.getLagStatus(); + await m.getLagStatus(); + await m.getLagStatus(); + expect(alerts).toHaveLength(0); + expect(m.effectiveLevel).toBe("none"); + }); + + it("tracks lag down again immediately (instantaneous, unlike effective level)", async () => { + const m = makeMonitor(); + setLag(60); + expect((await m.getLagStatus()).level).toBe("critical"); + setLag(0); + expect((await m.getLagStatus()).level).toBe("none"); + }); + + it("getEffectiveStatus reflects hysteresis without advancing it", async () => { + const m = makeMonitor(); + setLag(60); + await m.poll(); // effective -> critical + setLag(0); // lag 
healthy again, but no sustained recovery yet + const eff = await m.getEffectiveStatus(); + expect(eff.level).toBe("critical"); + expect(eff.lag).toBe(0); + // getEffectiveStatus did not advance the machine + expect(m.effectiveLevel).toBe("critical"); + }); + + it("preserves the LagStatus shape", async () => { + const m = makeMonitor(); + setLag(5); + const status = await m.getLagStatus(); + expect(Object.keys(status).sort()).toEqual( + [ + "checkedAt", + "criticalThreshold", + "isCritical", + "isDegraded", + "lag", + "level", + "warnThreshold", + ].sort() + ); + expect(status.checkedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/); + }); +}); + +// --------------------------------------------------------------------------- +// Alert payload safety (no secrets) +// --------------------------------------------------------------------------- + +describe("LagMonitor alert payload safety", () => { + it("alert payload contains only operational fields", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + setLag(60); + await m.poll(); + + expect(alerts).toHaveLength(1); + expect(Object.keys(alerts[0]).sort()).toEqual( + [ + "at", + "criticalThreshold", + "direction", + "from", + "lag", + "to", + "warnThreshold", + ].sort() + ); + // No string field should resemble a secret/token/wallet/auth value. 
+ const serialized = JSON.stringify(alerts[0]); + expect(serialized).not.toMatch(/secret|token|password|signature|apikey|authorization/i); + }); +}); + +// --------------------------------------------------------------------------- +// Metrics & subscription lifecycle +// --------------------------------------------------------------------------- + +describe("LagMonitor metrics & subscriptions", () => { + it("tracks escalation/recovery counts and transitionsTo", async () => { + const m = makeMonitor(10, 50, 3, 1); // recover after a single good poll + setLag(60); + await m.poll(); // none->critical + setLag(0); + await m.poll(); // critical->warn + await m.poll(); // warn->none + + const metrics = m.getAlertMetrics(); + expect(metrics.escalations).toBe(1); + expect(metrics.recoveries).toBe(2); + expect(metrics.transitionsTo.critical).toBe(1); + expect(metrics.transitionsTo.warn).toBe(1); + expect(metrics.transitionsTo.none).toBe(1); + expect(metrics.currentLevel).toBe("none"); + }); + + it("getAlertMetrics returns a defensive copy", async () => { + const m = makeMonitor(); + const snap = m.getAlertMetrics(); + snap.escalations = 999; + snap.transitionsTo.warn = 999; + expect(m.getAlertMetrics().escalations).toBe(0); + expect(m.getAlertMetrics().transitionsTo.warn).toBe(0); + }); + + it("unsubscribe stops further alert delivery", async () => { + const m = makeMonitor(); + const events: LagAlertEvent[] = []; + const unsub = m.onAlert((e) => events.push(e)); + + setLag(20); + await m.poll(); // -> warn (delivered) + unsub(); + setLag(60); + await m.poll(); // -> critical (not delivered) + + expect(events).toHaveLength(1); + expect(events[0].to).toBe("warn"); + }); + + it("a throwing listener never breaks the monitor", async () => { + const m = makeMonitor(); + m.onAlert(() => { + throw new Error("boom"); + }); + setLag(60); + await expect(m.poll()).resolves.toMatchObject({ level: "critical" }); + }); + + it("reset() clears state and counters", async () => { + const m 
= makeMonitor(); + setLag(60); + await m.poll(); + m.reset(); + expect(m.effectiveLevel).toBe("none"); + expect(m.getAlertMetrics().escalations).toBe(0); + expect(m.getAlertMetrics().currentLevel).toBe("none"); + }); +});