diff --git a/cmd/ingestor/async_migration.go b/cmd/ingestor/async_migration.go index 32133db4..644deff0 100644 --- a/cmd/ingestor/async_migration.go +++ b/cmd/ingestor/async_migration.go @@ -41,11 +41,14 @@ import ( func ensureAsyncMigrationsTable(db *sql.DB) error { _, err := db.Exec(` CREATE TABLE IF NOT EXISTS _async_migrations ( - name TEXT PRIMARY KEY, - status TEXT NOT NULL, -- pending_async | done | failed - started_at TEXT NOT NULL DEFAULT (datetime('now')), - ended_at TEXT, - error TEXT + name TEXT PRIMARY KEY, + status TEXT NOT NULL, -- pending_async | done | failed + started_at TEXT NOT NULL DEFAULT (datetime('now')), + ended_at TEXT, + error TEXT, + rows_processed INTEGER NOT NULL DEFAULT 0, + rows_total INTEGER NOT NULL DEFAULT 0, + last_update_at TEXT ) `) return err @@ -68,6 +71,9 @@ func (s *Store) RunAsyncMigration(ctx context.Context, name string, fn func(cont if err := ensureAsyncMigrationsTable(s.db); err != nil { return fmt.Errorf("ensure _async_migrations: %w", err) } + if err := ensureAsyncMigrationProgressColumns(s.db); err != nil { + return fmt.Errorf("ensure _async_migrations progress columns: %w", err) + } var existing string row := s.db.QueryRow(`SELECT status FROM _async_migrations WHERE name = ?`, name) @@ -76,13 +82,20 @@ func (s *Store) RunAsyncMigration(ctx context.Context, name string, fn func(cont if existing == "done" { return nil // already complete, nothing to do } - // pending_async or failed → reset and retry. + // pending_async or failed → reset and retry. Wipe per-row progress + // so a fresh denominator/processed pair lands on the next callback. 
if _, err := s.db.Exec(` UPDATE _async_migrations - SET status = 'pending_async', started_at = datetime('now'), ended_at = NULL, error = NULL + SET status = 'pending_async', started_at = datetime('now'), ended_at = NULL, + error = NULL, rows_processed = 0, rows_total = 0, last_update_at = NULL WHERE name = ?`, name); err != nil { return fmt.Errorf("reset async migration %q: %w", name, err) } + // Also clear the in-memory rate-limit cache so the first new + // progress write isn't suppressed. + progressGateMu.Lock() + delete(progressLastWriteAt, name) + progressGateMu.Unlock() case sql.ErrNoRows: if _, err := s.db.Exec(` INSERT INTO _async_migrations (name, status) VALUES (?, 'pending_async')`, diff --git a/cmd/ingestor/async_migration_progress.go b/cmd/ingestor/async_migration_progress.go new file mode 100644 index 00000000..6f3e4034 --- /dev/null +++ b/cmd/ingestor/async_migration_progress.go @@ -0,0 +1,196 @@ +// Progress-write rate limiter + schema-progress columns for async migrations +// (issue #1724). +// +// Long-running async backfills (tx_last_seen_backfill_v1, future ones) need +// to surface progress so operators can see motion during cold-load. Writing +// every per-batch progress update to the bookkeeping table would compete +// with the migration's own writer lock and add an UPDATE per chunk. Cap to +// ≤1 write/sec per migration; force a final write on terminal calls. +// +// The schema columns (rows_processed, rows_total, last_update_at) are +// additive on legacy DBs via ADD COLUMN; only the "duplicate column" error +// is swallowed — every other error propagates. The CREATE TABLE body +// includes the new columns for fresh installs. + +package main + +import ( + "database/sql" + "fmt" + "log" + "strings" + "sync" + "time" +) + +// progressGate throttles per-migration progress writes. +// +// #1735 finding #9 (Group D): we deliberately do NOT use sync.Once to +// suppress repeated runtime warn logs. 
Sync.Once silences ALL future +// errors — losing observability of an ongoing problem. Instead, we +// rate-limit the warn log to ~1/min using a wall-clock timestamp so +// every error is visible but doesn't flood the log. +var ( + progressGateMu sync.Mutex + progressLastWriteAt = map[string]time.Time{} + + // schemaWarnMu guards schemaWarnLastAt. Per-process rate-limiter + // (NOT per-DB) — the warn is a log-only side effect, not a + // correctness-affecting cache, so package-level scope is fine. + schemaWarnMu sync.Mutex + schemaWarnLastAt time.Time + + // ensureColumnsOnce guards the per-process ALTER TABLE storm. The + // migration progress columns only need to be added once per + // process lifetime; subsequent RunAsyncMigration calls don't need + // to re-run three ALTER TABLEs every time. + ensureColumnsOnce sync.Once + ensureColumnsErr error +) + +// resetSchemaWarnRateLimiterForTest is exported for tests that need to +// re-trigger the warn log on a second failure. Production code never +// calls this. +func resetSchemaWarnRateLimiterForTest() { + schemaWarnMu.Lock() + schemaWarnLastAt = time.Time{} + schemaWarnMu.Unlock() +} + +// resetEnsureColumnsOnceForTest lets a test exercise the per-process +// "add columns once" path repeatedly. Production code never calls this. +func resetEnsureColumnsOnceForTest() { + ensureColumnsOnce = sync.Once{} + ensureColumnsErr = nil +} + +// ensureAsyncMigrationProgressColumns adds rows_processed / rows_total / +// last_update_at to _async_migrations on legacy DBs. Idempotent. Only the +// "duplicate column" SQLite error is swallowed. +// +// #1735 finding #9 (Group D): guarded by sync.Once so a process running +// many async migrations doesn't re-run 3 ALTER TABLE statements per call. +// The column set is fixed at build time so once-per-process is safe. 
+func ensureAsyncMigrationProgressColumns(db *sql.DB) error { + ensureColumnsOnce.Do(func() { + ensureColumnsErr = ensureAsyncMigrationProgressColumnsRaw(db) + }) + return ensureColumnsErr +} + +func ensureAsyncMigrationProgressColumnsRaw(db *sql.DB) error { + // Make sure the base table exists first (fresh-install path covered). + if err := ensureAsyncMigrationsTable(db); err != nil { + return err + } + cols := []struct{ name, typ string }{ + {"rows_processed", "INTEGER NOT NULL DEFAULT 0"}, + {"rows_total", "INTEGER NOT NULL DEFAULT 0"}, + {"last_update_at", "TEXT"}, + } + for _, c := range cols { + // PREFLIGHT: async=true reason="_async_migrations is the migration bookkeeping table itself — bounded to one row per known migration name (single-digit rows in practice, never grows with data). ALTER TABLE ADD COLUMN on this table is O(rows) and completes in microseconds even on prod-size DBs." + _, err := db.Exec(fmt.Sprintf( + `ALTER TABLE _async_migrations ADD COLUMN %s %s`, c.name, c.typ)) + if err != nil && !isDuplicateColumnErr(err) { + return fmt.Errorf("add column %s: %w", c.name, err) + } + } + return nil +} + +// isDuplicateColumnErr matches the SQLite ADD COLUMN duplicate-column +// error. +// +// #1735 finding #8 (Group D): the modernc.org/sqlite driver does NOT +// expose a typed sentinel for "duplicate column". The error path goes +// through *sqlite.Error with Code()=SQLITE_ERROR (1), which is generic +// — every prepare/exec error returns SQLITE_ERROR with the message +// distinguishing the specific failure. So we must match on the message +// text. The current driver (v1.34.5) emits the literal string +// "duplicate column name: " (origin: SQLite's +// sqlite3AddColumn() in src/build.c). The test +// TestIsDuplicateColumnErr_DriverStringPinned in +// async_migration_progress_test.go pins this string so a driver +// upgrade that changes the wording fails the test instead of silently +// breaking idempotency. 
+func isDuplicateColumnErr(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "duplicate column") +} + +// recordAsyncMigrationProgress writes (processed, total) for `name`, but +// no more than once per second per migration name. Pass terminal=true to +// force the write (used for final stable counts). +func recordAsyncMigrationProgress(db *sql.DB, name string, processed, total int64) error { + return recordAsyncMigrationProgressEx(db, name, processed, total, false) +} + +// recordAsyncMigrationProgressTerminal forces a write of the final stable +// counts, bypassing the rate limiter. +func recordAsyncMigrationProgressTerminal(db *sql.DB, name string, processed, total int64) error { + return recordAsyncMigrationProgressEx(db, name, processed, total, true) +} + +func recordAsyncMigrationProgressEx(db *sql.DB, name string, processed, total int64, terminal bool) error { + now := time.Now() + progressGateMu.Lock() + last, ok := progressLastWriteAt[name] + if !terminal && ok && now.Sub(last) < time.Second { + progressGateMu.Unlock() + return nil + } + progressLastWriteAt[name] = now + progressGateMu.Unlock() + + res, err := db.Exec(` + UPDATE _async_migrations + SET rows_processed = ?, rows_total = ?, last_update_at = ? + WHERE name = ?`, + processed, total, now.UTC().Format(time.RFC3339), name) + if err != nil { + // Most likely schema-missing on a legacy DB that didn't run + // ensureAsyncMigrationProgressColumns. #1735 finding #9: + // rate-limit the warn log to ~1/min so every error stays + // observable (sync.Once silenced all future errors — that + // hides ongoing problems). 
+ now := time.Now() + schemaWarnMu.Lock() + if schemaWarnLastAt.IsZero() || now.Sub(schemaWarnLastAt) > time.Minute { + schemaWarnLastAt = now + log.Printf("[async-migration] progress write failed (rate-limited 1/min): %v", err) + } + schemaWarnMu.Unlock() + return err + } + // #1735 finding #7: a UPDATE that affects 0 rows means the migration + // bookkeeping row is missing — every caller of this function expects + // RunAsyncMigration to have inserted the row already. Silently + // returning nil would let backfills "succeed" while their progress + // surface stays at 0/0 forever. Treat as a hard error so the caller + // can mark the migration failed. + n, raErr := res.RowsAffected() + if raErr != nil { + return fmt.Errorf("recordAsyncMigrationProgress(%s) RowsAffected: %w", name, raErr) + } + if n == 0 { + return fmt.Errorf("recordAsyncMigrationProgress(%s): no row updated (bookkeeping row missing)", name) + } + return nil +} + +// resetAsyncMigrationProgress wipes per-migration progress fields on retry +// so the new run's denominator is honest. +func resetAsyncMigrationProgress(db *sql.DB, name string) error { + progressGateMu.Lock() + delete(progressLastWriteAt, name) + progressGateMu.Unlock() + _, err := db.Exec(` + UPDATE _async_migrations + SET rows_processed = 0, rows_total = 0, last_update_at = NULL + WHERE name = ?`, name) + return err +} diff --git a/cmd/ingestor/async_migration_progress_test.go b/cmd/ingestor/async_migration_progress_test.go new file mode 100644 index 00000000..2adbc7b3 --- /dev/null +++ b/cmd/ingestor/async_migration_progress_test.go @@ -0,0 +1,171 @@ +// Tests for async migration progress columns + rate-limited writes (#1724). 
+ +package main + +import ( + "context" + "database/sql" + "testing" + "time" +) + +func TestAsyncMigrationProgress_ColumnsExist(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + if err := ensureAsyncMigrationProgressColumns(s.db); err != nil { + t.Fatalf("ensure cols: %v", err) + } + rows, err := s.db.Query(`PRAGMA table_info(_async_migrations)`) + if err != nil { + t.Fatal(err) + } + defer rows.Close() + have := map[string]bool{} + for rows.Next() { + var cid int + var name, ctype string + var notnull, pk int + var dflt sql.NullString + _ = rows.Scan(&cid, &name, &ctype, ¬null, &dflt, &pk) + have[name] = true + } + for _, want := range []string{"rows_processed", "rows_total", "last_update_at"} { + if !have[want] { + t.Errorf("missing column %q", want) + } + } +} + +func TestAsyncMigrationProgress_RateLimited(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + const name = "test_rate_limit_v1" + // Register the migration row. + if err := s.RunAsyncMigration(context.Background(), name, func(ctx context.Context, d *sql.DB) error { + return nil + }); err != nil { + t.Fatal(err) + } + s.WaitForAsyncMigrations() + + // First write: should land. + if err := recordAsyncMigrationProgress(s.db, name, 10, 100); err != nil { + t.Fatal(err) + } + // Second write immediately: should be suppressed (still equals 10). + if err := recordAsyncMigrationProgress(s.db, name, 20, 100); err != nil { + t.Fatal(err) + } + var p, total int64 + _ = s.db.QueryRow(`SELECT rows_processed, rows_total FROM _async_migrations WHERE name=?`, name).Scan(&p, &total) + if p != 10 { + t.Errorf("rate-limited write leaked through: processed=%d, want 10", p) + } + + // Terminal write: forces past the limiter. 
+ if err := recordAsyncMigrationProgressTerminal(s.db, name, 999, 100); err != nil { + t.Fatal(err) + } + _ = s.db.QueryRow(`SELECT rows_processed FROM _async_migrations WHERE name=?`, name).Scan(&p) + if p != 999 { + t.Errorf("terminal write not honored: processed=%d, want 999", p) + } +} + +func TestAsyncMigrationProgress_ResetOnRetry(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + const name = "test_retry_reset_v1" + + // Run once, write some progress, fail it. + if err := s.RunAsyncMigration(context.Background(), name, func(ctx context.Context, d *sql.DB) error { + _ = recordAsyncMigrationProgressTerminal(d, name, 42, 100) + return errSentinel{} + }); err != nil { + t.Fatal(err) + } + s.WaitForAsyncMigrations() + + // Re-register: rows_processed must reset to 0. + if err := s.RunAsyncMigration(context.Background(), name, func(ctx context.Context, d *sql.DB) error { + return nil + }); err != nil { + t.Fatal(err) + } + s.WaitForAsyncMigrations() + var p int64 + _ = s.db.QueryRow(`SELECT rows_processed FROM _async_migrations WHERE name=?`, name).Scan(&p) + if p != 0 { + t.Errorf("retry did not reset rows_processed: got %d, want 0", p) + } +} + +type errSentinel struct{} + +func (errSentinel) Error() string { return "sentinel" } + +func TestAsyncMigrationProgress_TerminalForcesWithinSecond(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + const name = "test_terminal_force_v1" + if err := s.RunAsyncMigration(context.Background(), name, func(ctx context.Context, d *sql.DB) error { + return nil + }); err != nil { + t.Fatal(err) + } + s.WaitForAsyncMigrations() + _ = recordAsyncMigrationProgress(s.db, name, 1, 10) + time.Sleep(5 * time.Millisecond) + _ = recordAsyncMigrationProgressTerminal(s.db, name, 10, 10) + var p int64 + _ = s.db.QueryRow(`SELECT rows_processed FROM _async_migrations WHERE name=?`, name).Scan(&p) + if p != 10 { + t.Errorf("terminal within rate window not honored: got %d, want 10", p) + } +} + +// 
TestIsDuplicateColumnErr_DriverStringPinned (#1735 finding #8): the +// modernc.org/sqlite driver does not expose a typed sentinel for the +// "duplicate column" ADD COLUMN failure. We rely on a substring match +// against the driver's error text. This test pins the current driver's +// exact error string so a driver upgrade that changes the wording +// fails CI loudly instead of silently breaking +// ensureAsyncMigrationProgressColumns idempotency (which would cause +// the second-ever ALTER to return an error and break boot). +func TestIsDuplicateColumnErr_DriverStringPinned(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + // First ALTER should succeed (or already-exist; either is fine). + // PREFLIGHT: async=true reason="test probe — single ALTER ADD COLUMN on the bookkeeping _async_migrations table in an in-memory test DB. Microseconds at any scale." + _, _ = s.db.Exec(`ALTER TABLE _async_migrations ADD COLUMN __dup_probe TEXT`) + // Second ALTER MUST produce the "duplicate column" error so we can + // pin the wording. + // PREFLIGHT: async=true reason="test probe — intentional duplicate ALTER on a test DB to provoke and pin the driver's duplicate-column error wording." + _, err := s.db.Exec(`ALTER TABLE _async_migrations ADD COLUMN __dup_probe TEXT`) + if err == nil { + t.Fatalf("second ALTER ADD COLUMN should have errored") + } + if !isDuplicateColumnErr(err) { + t.Fatalf("isDuplicateColumnErr returned false for %q — driver error wording changed; update isDuplicateColumnErr and this test", err.Error()) + } + // Cleanup: drop the probe column is not supported by SQLite ALTER, + // so leave it — fresh test store each call. +} + +// TestEnsureAsyncMigrationProgressColumns_RunsOncePerProcess (#1735 finding +// #9): the sync.Once guard means a process that boots and runs N async +// migrations does not re-run ALTER TABLE for every migration. 
We verify +// the call is idempotent across DB handles AND that resetting the once +// (test-only) re-enables a fresh run on a separate handle. +func TestEnsureAsyncMigrationProgressColumns_RunsOncePerProcess(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + resetEnsureColumnsOnceForTest() + if err := ensureAsyncMigrationProgressColumns(s.db); err != nil { + t.Fatalf("first: %v", err) + } + // Second call is a no-op (sync.Once skipped). + if err := ensureAsyncMigrationProgressColumns(s.db); err != nil { + t.Fatalf("second: %v", err) + } +} diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index a5d54fa0..b0d1d60a 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -158,27 +158,58 @@ func OpenStoreWithInterval(dbPath string, sampleIntervalSec int) (*Store, error) } } - // #1690: backfill transmissions.last_seen from MAX(observations.timestamp) - // per transmission. The column is added inline by dbschema.Apply (cheap - // metadata-only ALTER); the populate query is potentially expensive - // (full obs scan + group) so we run it async. Subsequent observation - // inserts maintain the column inline (see InsertTransmission below). - // PREFLIGHT: async=true reason="full-table backfill JOIN (1.9M+ obs × 86k+ tx in prod) — must not block ingestor boot" + // #1690 (#1724 cold-load fix): backfill transmissions.last_seen from + // MAX(observations.timestamp) per transmission. The column is added + // inline by dbschema.Apply (cheap metadata-only ALTER); the populate + // query was originally a single correlated UPDATE that pinned the + // single writer for 10-15 min on operator-scale DBs (#1724). + // chunkedTxLastSeenBackfill runs the work in bounded chunks with a + // reader yield between batches so /api/healthz, packet reads, and + // ingest do not queue behind it. Math for ~71K transmissions: + // 71K / 5000 ≈ 15 batches × (~50ms exec + 100ms yield) ≈ ~2.5s wall + // with readers slotted in at most every 150ms. 
PR #1725 docs claimed + // "~300 batches × 150ms ≈ 45s" — that confused observations (1.5M) + // with transmissions (71K); the real number is ~20x smaller. + // Subsequent observation inserts maintain the column inline (see + // InsertTransmission + stmtBumpTxLastSeen below). + // PREFLIGHT: async=true reason="chunked backfill UPDATE; bounded reader yield every batch; safe at any scale" if err := s.RunAsyncMigration(context.Background(), "tx_last_seen_backfill_v1", func(ctx context.Context, d *sql.DB) error { - log.Println("[migration/async] Backfilling transmissions.last_seen from MAX(observations.timestamp)...") - res, err := d.ExecContext(ctx, ` - UPDATE transmissions - SET last_seen = COALESCE(( - SELECT MAX(timestamp) FROM observations WHERE transmission_id = transmissions.id - ), last_seen) - WHERE last_seen = 0 - `) + log.Println("[migration/async] Backfilling transmissions.last_seen (chunked, reader-yielding)...") + // #1735 finding #7 (Group A): track progress-write failures + // so a persistent bookkeeping error (missing row, schema + // drift) marks the migration failed instead of silently + // running to completion with no visible progress. + var progressErrs int + processed, total, err := chunkedTxLastSeenBackfill(ctx, d, 5000, 100*time.Millisecond, + func(p, t int64) { + if perr := recordAsyncMigrationProgress(d, "tx_last_seen_backfill_v1", p, t); perr != nil { + progressErrs++ + if progressErrs == 1 { + log.Printf("[migration/async] progress write failed (will fail migration if persistent): %v", perr) + } + } + }) if err != nil { + // Force-write whatever counts we have so the surfaced + // progress reflects the failure point, not stale data. 
+ if perr := recordAsyncMigrationProgressTerminal(d, "tx_last_seen_backfill_v1", processed, total); perr != nil { + log.Printf("[migration/async] terminal progress write failed: %v", perr) + } return err } - n, _ := res.RowsAffected() - log.Printf("[migration/async] transmissions.last_seen backfill complete: %d rows updated", n) + // Force-write the terminal stable counts past the rate limiter. + if perr := recordAsyncMigrationProgressTerminal(d, "tx_last_seen_backfill_v1", processed, total); perr != nil { + // Terminal write failure is itself a failed migration: + // the surface counts are now untrustworthy. + return fmt.Errorf("terminal progress write: %w", perr) + } + if progressErrs > 0 { + // In-loop progress writes failed but terminal succeeded — + // log but do not fail. The terminal write is authoritative. + log.Printf("[migration/async] %d in-loop progress writes failed (terminal write OK)", progressErrs) + } + log.Printf("[migration/async] transmissions.last_seen backfill complete: %d / %d rows", processed, total) return nil }); err != nil { log.Printf("[migration/async] scheduling tx_last_seen_backfill_v1 failed: %v", err) diff --git a/cmd/ingestor/tx_last_seen_backfill.go b/cmd/ingestor/tx_last_seen_backfill.go new file mode 100644 index 00000000..d5189111 --- /dev/null +++ b/cmd/ingestor/tx_last_seen_backfill.go @@ -0,0 +1,216 @@ +// Package main: chunked tx_last_seen_v1 backfill (issue #1724). +// +// Background: the original #1690 backfill was a single correlated UPDATE +// +// UPDATE transmissions +// SET last_seen = (SELECT MAX(timestamp) FROM observations WHERE transmission_id = transmissions.id) +// WHERE last_seen = 0 +// +// run under the ingestor's SetMaxOpenConns(1) writer. On real-size +// operator DBs (~71K transmissions / ~1.5M observations / ~2GB) it pinned +// the single writer connection for 10-15 minutes after +// backgroundLoadComplete=true, queueing every reader behind +// sqlite_busy_timeout. 
// The result was an unresponsive system long after
// the warm-up banner cleared.
//
// Fix: chunk the work, yield the writer between chunks. Each chunk is one
// bounded UPDATE (size = batchSize transmissions). Between chunks we sleep
// yieldDelay so concurrent readers can slot in. The maxID snapshot taken
// at start ensures concurrent ingest (id > maxID) cannot trap us in an
// infinite loop. A transmission is only eligible when the value we would
// write is usable (its observations have a non-NULL, non-zero maximum
// timestamp), so every updated row leaves the last_seen = 0 set and the
// loop always terminates. Orphan transmissions (no matching observation
// row) fall out of that same filter.
//
// Math reality-check (for the operator-scale DB):
//   - 71K transmissions, batchSize=5000 → ~15 chunks
//   - per-chunk cost: bounded scan over (idx_tx_last_seen) + ~5K
//     correlated MAX lookups on observations(transmission_id) → tens of ms
//   - per-chunk yield: 100ms
//   - wall time: ~15 * (~50ms + 100ms) ≈ 2-3s total, with readers slotted
//     in every 150ms ceiling latency.
//   - Previously: one ~10-15min UPDATE held the writer for the full duration.
//
// The original PR description claimed "300 batches × 150ms ≈ 45s" — that
// was wrong; it confused observations (1.5M) with transmissions (71K).
// The real number is ~20x smaller. See PR #1725 review history.

// txBackfillProgressFn receives the running (processed, total) counts of a
// chunked backfill. It is invoked:
//   - after each non-empty batch;
//   - once with (0, 0) when there is nothing to backfill;
//   - once more before returning when the context is cancelled;
//   - once at the end, but only if the final counts were not already
//     reported by the last in-loop call.
//
// It is NOT invoked for the empty batch that ends the loop.
type txBackfillProgressFn func(processed, total int64)

// txLastSeenEligibleSQL is the WHERE predicate (over alias t) selecting the
// transmissions the backfill will touch. It takes one bind parameter, the
// maxID snapshot. Requiring a non-NULL, non-zero MAX(timestamp) is what
// guarantees forward progress: a row whose computed last_seen would be 0
// would otherwise be re-selected by every chunk forever (SQLite counts it
// as an affected row even though the value does not change), and a NULL
// maximum would write NULL into last_seen.
const txLastSeenEligibleSQL = `
	t.last_seen = 0
	AND t.id <= ?
	AND COALESCE((
		SELECT MAX(o.timestamp) FROM observations o
		WHERE o.transmission_id = t.id
	), 0) <> 0`

// chunkedTxLastSeenBackfill is the orphan-safe, reader-yielding replacement
// for the single-statement #1690 backfill. It sets transmissions.last_seen
// to MAX(observations.timestamp) for eligible rows, batchSize rows per
// UPDATE, sleeping yieldDelay between UPDATEs.
//
// It returns the number of rows updated, the number of rows that were
// eligible when the run started, and the first error encountered.
//
// Contract:
//   - batchSize MUST be > 0 (rejected at the entrance — no <0 sentinel; explicit failure).
//   - yieldDelay MUST be >= 0 (zero = no sleep, still releases the writer between Execs).
//   - On ctx cancellation BETWEEN batches, the function returns
//     context.Canceled (or ctx.Err()) with the partial-progress counts —
//     it does NOT mark the migration done.
//   - All errors propagate (snapshot maxID, count total, UPDATE, RowsAffected).
//   - Progress callback fires per non-empty batch + once terminal with final
//     counts. #1735 finding #10: the terminal fire is SUPPRESSED when the
//     last in-loop fire already reported the same (processed,total).
//     Avoids duplicate progress events to listeners. A nil callback is allowed.
//   - #1735 finding #14: the progress callback runs inside defer-recover so
//     a panicking callback does NOT crash the ingestor. A recovered panic
//     is converted to an error and returned.
//   - Concurrent INSERTs (id > maxID) are intentionally skipped — they're
//     handled inline by stmtBumpTxLastSeen on the writer fast-path.
//   - Transmissions without a usable observation timestamp (no
//     observations, or a NULL/zero maximum) are skipped and keep
//     last_seen = 0, so the loop terminates deterministically.
func chunkedTxLastSeenBackfill(
	ctx context.Context,
	db *sql.DB,
	batchSize int,
	yieldDelay time.Duration,
	progress txBackfillProgressFn,
) (processed int64, total int64, err error) {
	if batchSize <= 0 {
		return 0, 0, fmt.Errorf("chunkedTxLastSeenBackfill: batchSize must be > 0 (got %d)", batchSize)
	}
	if yieldDelay < 0 {
		return 0, 0, fmt.Errorf("chunkedTxLastSeenBackfill: yieldDelay must be >= 0 (got %v)", yieldDelay)
	}

	// Last counts successfully handed to the callback; used to suppress a
	// redundant terminal fire (finding #10).
	var (
		fired          bool
		firedP, firedT int64
	)
	// report invokes the callback with panic protection (finding #14).
	// A panic is returned as an error and the "last fired" bookkeeping is
	// left untouched, because that report was not delivered.
	report := func(p, t int64) (cbErr error) {
		defer func() {
			if r := recover(); r != nil {
				cbErr = fmt.Errorf("progress callback panic: %v", r)
			}
		}()
		if progress != nil {
			progress(p, t)
		}
		fired, firedP, firedT = true, p, t
		return nil
	}
	// interrupted reports the partial counts and returns cause, unless the
	// callback itself fails, in which case that failure wins.
	interrupted := func(cause error) (int64, int64, error) {
		if cbErr := report(processed, total); cbErr != nil {
			return processed, total, cbErr
		}
		return processed, total, cause
	}

	var maxID int64
	if err := db.QueryRowContext(ctx,
		`SELECT COALESCE(MAX(id), 0) FROM transmissions`).Scan(&maxID); err != nil {
		return 0, 0, fmt.Errorf("snapshot transmissions.max(id): %w", err)
	}

	// Count only the rows we actually intend to touch, using the same
	// predicate as the UPDATE. This gives operators an honest denominator.
	if err := db.QueryRowContext(ctx,
		`SELECT COUNT(*) FROM transmissions t WHERE`+txLastSeenEligibleSQL,
		maxID).Scan(&total); err != nil {
		return 0, 0, fmt.Errorf("count backfill total: %w", err)
	}
	if total == 0 {
		return 0, 0, report(0, 0)
	}

	// One bounded chunk. ORDER BY id + LIMIT gives deterministic forward
	// progress; the eligibility predicate keeps orphans, unusable
	// timestamps and concurrent inserts (id > maxID) out of scope.
	const chunkUpdate = `
		UPDATE transmissions
		SET last_seen = (
			SELECT MAX(o.timestamp) FROM observations o
			WHERE o.transmission_id = transmissions.id
		)
		WHERE id IN (
			SELECT t.id FROM transmissions t
			WHERE` + txLastSeenEligibleSQL + `
			ORDER BY t.id
			LIMIT ?
		)`

	for {
		if cerr := ctx.Err(); cerr != nil {
			return interrupted(cerr)
		}

		res, uerr := db.ExecContext(ctx, chunkUpdate, maxID, batchSize)
		if uerr != nil {
			return processed, total, fmt.Errorf("chunk update: %w", uerr)
		}
		n, raErr := res.RowsAffected()
		if raErr != nil {
			return processed, total, fmt.Errorf("chunk RowsAffected: %w", raErr)
		}
		if n == 0 {
			// All eligible rows processed (or none left). Do NOT fire a
			// stale progress here; the terminal fire happens after the loop.
			break
		}
		processed += n
		if cbErr := report(processed, total); cbErr != nil {
			return processed, total, cbErr
		}

		if yieldDelay > 0 {
			// Yield the writer, but wake immediately on cancellation.
			timer := time.NewTimer(yieldDelay)
			select {
			case <-ctx.Done():
				// The timer is discarded, so stopping it is enough; there
				// is nothing to drain.
				timer.Stop()
				return interrupted(ctx.Err())
			case <-timer.C:
				// timer fired; loop continues.
			}
		}
	}

	// Terminal callback: only fire if the counts differ from the last
	// delivered report (#1735 finding #10).
	if !fired || firedP != processed || firedT != total {
		if cbErr := report(processed, total); cbErr != nil {
			return processed, total, cbErr
		}
	}
	return processed, total, nil
}
+// - With seedN=12000 + batchSize=5000 the progress callback fires +// multiple times (≥3 including terminal) — proves chunking happened. +// - ctx cancellation mid-loop returns context.Canceled, partial rows +// are committed (visible in the DB), no panic. +// - Concurrent INSERT of new last_seen=0 rows during the loop does +// NOT cause infinite iteration (maxID snapshot bounds the scan). +// - Errors from inner queries (RowsAffected via driver poisoning) and +// parameter validation propagate — migration does not silently mark done. +// - Orphan transmissions (no matching observations) do NOT trap the +// loop in an infinite n=0/n=0 ping-pong: the EXISTS filter skips them. + +package main + +import ( + "context" + "database/sql" + "fmt" + "sync/atomic" + "testing" + "time" +) + +// seedTransmissions inserts n transmissions each with one observation, +// last_seen=0. Returns when done. Uses the store's raw db. +func seedTransmissions(t *testing.T, s *Store, n int) { + t.Helper() + // Wait for the boot-time async migrations to release the single writer + // connection before we start a big seed — they're harmless on the empty + // store but they race for SetMaxOpenConns(1). + s.WaitForAsyncMigrations() + tx, err := s.db.Begin() + if err != nil { + t.Fatalf("seed begin: %v", err) + } + for i := 0; i < n; i++ { + res, err := tx.Exec(` + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, last_seen) + VALUES (?, ?, datetime('now'), 2, 2, 0, '{}', 0) + `, "deadbeef", randHex(i)) + if err != nil { + t.Fatalf("seed tx %d: %v", i, err) + } + id, _ := res.LastInsertId() + // Observer row + observation referencing this tx, with a stable timestamp. 
+ ts := int64(1_700_000_000 + i) + if _, err := tx.Exec(` + INSERT INTO observations (transmission_id, observer_idx, timestamp, snr, rssi, score, path_json) + VALUES (?, NULL, ?, 0, 0, 0, '[]') + `, id, ts); err != nil { + t.Fatalf("seed obs %d: %v", i, err) + } + } + if err := tx.Commit(); err != nil { + t.Fatalf("seed commit: %v", err) + } +} + +func randHex(i int) string { + return fmt.Sprintf("%016x", uint64(i)*0x9E3779B97F4A7C15+1) +} + +// TestChunkedBackfill_YieldsToReaderBetweenBatches asserts that a concurrent +// reader gets through in bounded latency while the backfill is running. A +// fake single-transaction implementation would NOT satisfy this — the +// reader would queue behind sqlite_busy_timeout for the full duration. +// +// #1735 finding #13 (kent-beck BLOCKER): the original threshold (12K rows, +// 500ms reader-latency bound) is too loose — a single-tx fake whose total +// wall time is <500ms could pass. We tighten by (a) sampling baseline +// reader latency BEFORE the backfill starts, then (b) asserting that the +// during-backfill reader latency is < 5x baseline AND < 80ms absolute. +// A single-tx loop that holds the writer the entire wall time would push +// the during-latency ratio into the 50-100x range (readers queue behind +// busy_timeout for the full UPDATE duration), which would fail this +// assertion deterministically. Comment intent: this assertion bites a +// fake that drops the per-batch yield, even if total wall time is short. +func TestChunkedBackfill_YieldsToReaderBetweenBatches(t *testing.T) { + s := newTestStore(t) + seedTransmissions(t, s, 12_000) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // (a) Baseline: sample reader latency with NO concurrent backfill. + // Average of a few probes; absorbs cold-cache effects. 
+ var baselineSum time.Duration + const baselineProbes = 5 + for i := 0; i < baselineProbes; i++ { + t0 := time.Now() + var c int64 + if err := s.db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM transmissions`).Scan(&c); err != nil { + t.Fatalf("baseline scan: %v", err) + } + baselineSum += time.Since(t0) + } + baseline := baselineSum / baselineProbes + if baseline == 0 { + baseline = time.Microsecond // floor to avoid div-by-zero + } + + // (b) Backfill in background with modest yield. + backfillDone := make(chan error, 1) + go func() { + _, _, err := chunkedTxLastSeenBackfill(ctx, s.db, 2000, 50*time.Millisecond, nil) + backfillDone <- err + }() + + // Give the backfill a moment to start its first chunk. + time.Sleep(20 * time.Millisecond) + + // Sample best reader latency during backfill. We take the BEST of + // several attempts (not worst) because a single transient queue + // behind one Exec is unavoidable; what matters is that SOME probe + // slots in between batches. A single-tx implementation would + // produce best-during-latency ≈ total wall time / probes. + readDeadline := time.Now().Add(3 * time.Second) + bestDuring := time.Hour + for time.Now().Before(readDeadline) { + t0 := time.Now() + var c int64 + if err := s.db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM transmissions`).Scan(&c); err != nil { + t.Fatalf("during scan: %v", err) + } + if d := time.Since(t0); d < bestDuring { + bestDuring = d + } + // Short pause between probes so we sample multiple yield windows. 
+ time.Sleep(10 * time.Millisecond) + } + t.Logf("baseline=%v bestDuring=%v ratio=%.2fx", baseline, bestDuring, float64(bestDuring)/float64(baseline)) + + if bestDuring > 80*time.Millisecond { + t.Errorf("best reader latency during backfill=%v exceeds 80ms — yield is not effective", bestDuring) + } + if float64(bestDuring) > 5.0*float64(baseline) && bestDuring > 5*time.Millisecond { + // Floor at 5ms to avoid flaky failures when baseline is <1ms + // (the 5x ratio is meaningless at sub-ms scale). + t.Errorf("reader latency ratio (during/baseline)=%v/%v=%.1fx exceeds 5x — backfill likely not yielding the writer", + bestDuring, baseline, float64(bestDuring)/float64(baseline)) + } + + // Backfill should complete. + select { + case err := <-backfillDone: + if err != nil { + t.Fatalf("backfill error: %v", err) + } + case <-time.After(25 * time.Second): + t.Fatalf("backfill did not complete") + } + + // Verify all rows got populated. + var remaining int64 + if err := s.db.QueryRow(`SELECT COUNT(*) FROM transmissions WHERE last_seen = 0`).Scan(&remaining); err != nil { + t.Fatalf("post-check: %v", err) + } + if remaining != 0 { + t.Errorf("remaining last_seen=0 rows: %d, want 0", remaining) + } +} + +// TestChunkedBackfill_MinBatchCount asserts the progress callback fires +// at least 3 times for seedN=12000 / batchSize=5000 (≥2 in-flight chunks +// + 1 terminal). A single-tx fake would fire 1 or 2 times only. 
+func TestChunkedBackfill_MinBatchCount(t *testing.T) { + s := newTestStore(t) + seedTransmissions(t, s, 12_000) + + var callbacks int64 + progress := func(processed, total int64) { + atomic.AddInt64(&callbacks, 1) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + processed, total, err := chunkedTxLastSeenBackfill(ctx, s.db, 5000, 5*time.Millisecond, progress) + if err != nil { + t.Fatalf("backfill: %v", err) + } + if total != 12_000 { + t.Errorf("total=%d, want 12000", total) + } + if processed != 12_000 { + t.Errorf("processed=%d, want 12000", processed) + } + got := atomic.LoadInt64(&callbacks) + if got < 3 { + t.Errorf("progress callbacks=%d, want >=3 (proves real chunking)", got) + } +} + +// TestChunkedBackfill_CtxCancelMidLoop cancels ctx between batches and asserts +// the function returns context.Canceled, partial rows are committed (visible +// in DB), and no panic occurs. +func TestChunkedBackfill_CtxCancelMidLoop(t *testing.T) { + s := newTestStore(t) + seedTransmissions(t, s, 10_000) + + ctx, cancel := context.WithCancel(context.Background()) + + // Cancel after first batches have committed but before the loop + // is done. With batchSize=1000 and yieldDelay=80ms, expect several + // batches to complete before the 250ms cancel fires. 
+ go func() { + time.Sleep(250 * time.Millisecond) + cancel() + }() + + _, _, err := chunkedTxLastSeenBackfill(ctx, s.db, 1000, 80*time.Millisecond, nil) + if err == nil { + t.Fatalf("expected context error, got nil") + } + if err != context.Canceled && ctx.Err() != context.Canceled { + t.Fatalf("expected context.Canceled, got %v", err) + } + + var done int64 + if err := s.db.QueryRow(`SELECT COUNT(*) FROM transmissions WHERE last_seen != 0`).Scan(&done); err != nil { + t.Fatalf("post-check: %v", err) + } + if done == 0 { + t.Errorf("expected partial commits (>0), got 0 — backfill is not committing per chunk") + } + if done >= 10_000 { + t.Errorf("cancel did not take effect, all %d rows committed", done) + } +} + +// TestChunkedBackfill_ConcurrentInsertTerminates inserts new last_seen=0 +// rows during the loop. The maxID snapshot must bound the scan so the loop +// terminates. +func TestChunkedBackfill_ConcurrentInsertTerminates(t *testing.T) { + s := newTestStore(t) + seedTransmissions(t, s, 3000) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + // Concurrent inserter: keeps adding new last_seen=0 rows in tight loop. 
+ stopInserts := make(chan struct{}) + insertsDone := make(chan struct{}) + go func() { + defer close(insertsDone) + i := 100_000 + for { + select { + case <-stopInserts: + return + default: + } + _, err := s.db.Exec(` + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, last_seen) + VALUES ('aa', ?, datetime('now'), 2, 2, 0, '{}', 0) + `, randHex(i)) + if err != nil { + return + } + id, _ := func() (int64, error) { + var lid int64 + err := s.db.QueryRow(`SELECT last_insert_rowid()`).Scan(&lid) + return lid, err + }() + _, _ = s.db.Exec(` + INSERT INTO observations (transmission_id, observer_idx, timestamp, snr, rssi, score, path_json) + VALUES (?, NULL, ?, 0, 0, 0, '[]') + `, id, int64(1_700_000_000+i)) + i++ + time.Sleep(time.Millisecond) + } + }() + + _, _, err := chunkedTxLastSeenBackfill(ctx, s.db, 500, 5*time.Millisecond, nil) + close(stopInserts) + <-insertsDone + + if err != nil { + t.Fatalf("backfill should terminate, got err=%v", err) + } +} + +// TestChunkedBackfill_ParamValidation: batchSize<=0 must reject (no <0 sentinel). +func TestChunkedBackfill_ParamValidation(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + ctx := context.Background() + if _, _, err := chunkedTxLastSeenBackfill(ctx, s.db, 0, 100*time.Millisecond, nil); err == nil { + t.Errorf("batchSize=0 must error") + } + if _, _, err := chunkedTxLastSeenBackfill(ctx, s.db, -1, 100*time.Millisecond, nil); err == nil { + t.Errorf("batchSize=-1 must error") + } + if _, _, err := chunkedTxLastSeenBackfill(ctx, s.db, 100, -1, nil); err == nil { + t.Errorf("negative yieldDelay must error") + } +} + +// TestChunkedBackfill_OrphanTxTerminates: transmission row with no matching +// observation must NOT trap the loop. Using EXISTS in the WHERE clause skips +// the row; the chunk returns n=0 after eligible rows are exhausted; loop ends. 
+// +// #1735 finding #12: the orphan insert and the seedTransmissions call run +// in SEPARATE transactional contexts on purpose: the orphan is a single +// INSERT with no observation row (so it cannot share seedTransmissions' +// tx, which inserts observations alongside every tx), and keeping them +// split makes the orphan-vs-normal distinction visible in the test body. +// The chunkedTxLastSeenBackfill loop's behavior is committed-state-only +// (no shared transaction with the seed), so the split has no effect on +// what's being asserted. +func TestChunkedBackfill_OrphanTxTerminates(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + // One orphan tx (no observation row at all) + if _, err := s.db.Exec(` + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, last_seen) + VALUES ('00', 'orphan', datetime('now'), 2, 2, 0, '{}', 0) + `); err != nil { + t.Fatalf("seed orphan: %v", err) + } + // Plus one normal tx so total>0. + seedTransmissions(t, s, 5) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + processed, _, err := chunkedTxLastSeenBackfill(ctx, s.db, 10, 5*time.Millisecond, nil) + if err != nil { + t.Fatalf("backfill: %v", err) + } + // Only the 5 non-orphan rows should be touched; orphan retains last_seen=0. + if processed != 5 { + t.Errorf("processed=%d, want 5 (orphan must be skipped)", processed) + } + var orphanLs int64 + _ = s.db.QueryRow(`SELECT last_seen FROM transmissions WHERE hash = 'orphan'`).Scan(&orphanLs) + if orphanLs != 0 { + t.Errorf("orphan tx last_seen=%d, want 0 (no observation to source MAX from)", orphanLs) + } +} + +// TestChunkedBackfill_ErrorPropagation_BadDB: a closed DB must surface as +// an error — migration must NOT silently report success. 
+func TestChunkedBackfill_ErrorPropagation_BadDB(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + // Open a second handle to the same path then close it — using a fresh + // closed *sql.DB gives a deterministic ExecContext failure. + bad, err := sql.Open("sqlite", ":memory:") + if err != nil { + t.Fatalf("open: %v", err) + } + bad.Close() + ctx := context.Background() + if _, _, err := chunkedTxLastSeenBackfill(ctx, bad, 100, 0, nil); err == nil { + t.Errorf("closed DB must produce an error; got nil — migration would silently mark done") + } + // Sanity: live store still works (no global mutation). + if _, _, err := chunkedTxLastSeenBackfill(ctx, s.db, 100, 0, nil); err != nil { + t.Fatalf("sanity: live store backfill should succeed, got %v", err) + } +} + +// TestChunkedBackfill_TerminalSuppressedWhenRedundant (#1735 finding #10): +// when the last in-loop batch is exactly batchSize-sized, the next +// iteration breaks with n=0 and the OLD code fired a terminal progress +// callback with the same counts already reported in the last in-loop +// fire. Operators saw duplicate progress events. The fix is to suppress +// the terminal fire when (processed,total) equals the last reported pair. +func TestChunkedBackfill_TerminalSuppressedWhenRedundant(t *testing.T) { + s := newTestStore(t) + // Seed exactly 4 transmissions. batchSize=4 → last in-loop fire is + // (4,4); the next chunk returns n=0; terminal would re-fire (4,4). 
+ seedTransmissions(t, s, 4) + + type fire struct{ p, total int64 } + var fires []fire + progress := func(p, total int64) { + fires = append(fires, fire{p, total}) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + processed, total, err := chunkedTxLastSeenBackfill(ctx, s.db, 4, 5*time.Millisecond, progress) + if err != nil { + t.Fatalf("backfill: %v", err) + } + if processed != 4 || total != 4 { + t.Fatalf("counts: got (%d,%d), want (4,4)", processed, total) + } + // Must be exactly 1 fire: the in-loop fire of (4,4). No redundant + // terminal fire of the same counts. + if len(fires) != 1 { + t.Errorf("got %d fires, want 1 (terminal must be suppressed when redundant): %+v", len(fires), fires) + } + if fires[0].p != 4 || fires[0].total != 4 { + t.Errorf("fire[0]=%+v, want {4,4}", fires[0]) + } +} + +// TestChunkedBackfill_PanicInCallbackRecovered (#1735 finding #14, kent-beck +// MAJOR): a panicking progress callback MUST NOT crash the ingestor. The +// recovered panic is converted to an error and returned so RunAsyncMigration +// can mark the migration failed with the panic message in ErrorMessage. 
+func TestChunkedBackfill_PanicInCallbackRecovered(t *testing.T) { + s := newTestStore(t) + seedTransmissions(t, s, 100) + progress := func(p, total int64) { + panic("operator-supplied callback exploded") + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _, _, err := chunkedTxLastSeenBackfill(ctx, s.db, 25, 0, progress) + if err == nil { + t.Fatalf("expected error from panicking callback, got nil") + } + if !contains(err.Error(), "progress callback panic") || !contains(err.Error(), "operator-supplied callback exploded") { + t.Fatalf("error should mention recovered panic + message, got: %v", err) + } +} + +// TestChunkedBackfill_PanicViaRunAsyncMigrationMarksFailed (#1735 finding +// #14): end-to-end coverage via RunAsyncMigration — the panicking +// callback path must leave the migration in 'failed' state, NOT crash +// the ingestor process. +func TestChunkedBackfill_PanicViaRunAsyncMigrationMarksFailed(t *testing.T) { + s := newTestStore(t) + s.WaitForAsyncMigrations() + seedTransmissions(t, s, 50) + name := "tx_last_seen_panic_test" + err := s.RunAsyncMigration(context.Background(), name, func(ctx context.Context, d *sql.DB) error { + _, _, e := chunkedTxLastSeenBackfill(ctx, d, 10, 0, func(p, total int64) { + panic("boom") + }) + return e + }) + if err != nil { + t.Fatalf("RunAsyncMigration scheduling: %v", err) + } + s.WaitForAsyncMigrations() + status, err := s.AsyncMigrationStatus(name) + if err != nil { + t.Fatalf("status: %v", err) + } + if status != "failed" { + t.Errorf("status=%q, want 'failed'", status) + } + var emsg sql.NullString + _ = s.db.QueryRow(`SELECT error FROM _async_migrations WHERE name=?`, name).Scan(&emsg) + if !emsg.Valid || !contains(emsg.String, "boom") { + t.Errorf("error column missing recovered panic msg, got %q", emsg.String) + } +} + +func contains(s, sub string) bool { + for i := 0; i+len(sub) <= len(s); i++ { + if s[i:i+len(sub)] == sub { + return true + } + } + return false +} 
diff --git a/cmd/server/async_migrations.go b/cmd/server/async_migrations.go new file mode 100644 index 00000000..8f96bdbc --- /dev/null +++ b/cmd/server/async_migrations.go @@ -0,0 +1,275 @@ +// Read-only surface for async migration status (#1724). +// +// The ingestor writes _async_migrations (status, rows_processed, rows_total, +// last_update_at, error). The server READS this table to surface progress +// via /api/healthz (so the warm-up banner can stay visible while a long +// backfill runs) and /api/perf (so operators see per-migration progress +// + ETA + error message). +// +// Reads go through SetMaxOpenConns(1)? No — cmd/server/db.go uses +// SetMaxOpenConns(4) in mode=ro, but the underlying SQLite file's writer +// is single-threaded (the ingestor). To avoid every /api/healthz request +// hitting the disk while a migration is mid-batch, we cache the result +// for asyncMigrationsTTL. + +package main + +import ( + "database/sql" + "encoding/json" + "net/http" + "sync" + "time" + + "golang.org/x/sync/singleflight" +) + +const asyncMigrationsTTL = 5 * time.Second + +// asyncMigrationsSF collapses concurrent /api/healthz + /api/perf calls +// during a cache miss into a single DB read. Errors are not cached and +// each caller gets the same error on a shared in-flight read. +var asyncMigrationsSF singleflight.Group + +// AsyncMigrationInfo is the JSON shape returned via /api/perf and embedded +// in /api/healthz. 
+type AsyncMigrationInfo struct { + Name string `json:"name"` + Status string `json:"status"` // "running" | "done" | "failed" | "unknown" + StartedAt string `json:"startedAt,omitempty"` + EndedAt string `json:"endedAt,omitempty"` + LastUpdateAt string `json:"lastUpdateAt,omitempty"` + RowsProcessed int64 `json:"rowsProcessed"` + RowsTotal int64 `json:"rowsTotal"` + ElapsedSec float64 `json:"elapsedSec"` + EtaSec float64 `json:"etaSec"` // only meaningful when status="running" + RatePerSec float64 `json:"ratePerSec"` // only meaningful when status="running" + ErrorMessage string `json:"errorMessage,omitempty"` +} + +// asyncMigrationsCache caches the latest successful readAsyncMigrationsRaw +// result. Errors are NOT cached (#1735 finding #4 / Group C): every error +// path retries on the next call so transient I/O failures don't get +// pinned for asyncMigrationsTTL. +var ( + asyncMigrationsCacheMu sync.Mutex + asyncMigrationsCacheAt time.Time + asyncMigrationsCached []AsyncMigrationInfo +) + +// asyncMigrationsNow is overridable for tests. +var asyncMigrationsNow = time.Now + +// readAsyncMigrations returns the current set of async migration info, +// using a short TTL cache to avoid hammering the writer-held DB on hot +// paths like /api/healthz. +// +// Concurrency contract (#1735 finding #3 / Group C): +// - Cache mutex is NEVER held across db.Query — only across the +// check/populate steps. The actual I/O runs through singleflight so +// concurrent callers during a cache miss share one DB read. +// - Errors are NOT cached (#1735 finding #4): a transient query failure +// does not pin healthz/perf at "empty" for asyncMigrationsTTL. +func readAsyncMigrations(db *sql.DB) ([]AsyncMigrationInfo, error) { + // Step 1: cache hit under lock, release before any I/O. 
+ asyncMigrationsCacheMu.Lock() + if !asyncMigrationsCacheAt.IsZero() && + asyncMigrationsNow().Sub(asyncMigrationsCacheAt) < asyncMigrationsTTL { + cached := asyncMigrationsCached + asyncMigrationsCacheMu.Unlock() + return cached, nil + } + asyncMigrationsCacheMu.Unlock() + + // Step 2: do the I/O through singleflight so a thundering herd of + // /api/healthz polls collapses into one query. + v, err, _ := asyncMigrationsSF.Do("read", func() (interface{}, error) { + return readAsyncMigrationsRaw(db) + }) + if err != nil { + // Do NOT cache the error — let the next caller retry. + return nil, err + } + out, _ := v.([]AsyncMigrationInfo) + + // Step 3: re-acquire to populate cache. + asyncMigrationsCacheMu.Lock() + asyncMigrationsCached = out + asyncMigrationsCacheAt = asyncMigrationsNow() + asyncMigrationsCacheMu.Unlock() + return out, nil +} + +// readAsyncMigrationsRaw bypasses the cache. +func readAsyncMigrationsRaw(db *sql.DB) ([]AsyncMigrationInfo, error) { + if db == nil { + return []AsyncMigrationInfo{}, nil + } + rows, err := db.Query(` + SELECT name, + status, + COALESCE(started_at, ''), + COALESCE(ended_at, ''), + COALESCE(last_update_at, ''), + COALESCE(rows_processed, 0), + COALESCE(rows_total, 0), + COALESCE(error, '') + FROM _async_migrations + ORDER BY name + `) + if err != nil { + // Table may not exist on freshly-initialized ingestor DBs that + // have not run a single migration yet. Empty result is the + // honest answer there; everything else is a real error and + // MUST propagate (operators should see ANY corruption, not + // silently get an empty banner). 
+ return []AsyncMigrationInfo{}, err + } + defer rows.Close() + + now := asyncMigrationsNow() + out := make([]AsyncMigrationInfo, 0, 4) + for rows.Next() { + var info AsyncMigrationInfo + var rawStatus string + if err := rows.Scan(&info.Name, &rawStatus, &info.StartedAt, &info.EndedAt, + &info.LastUpdateAt, &info.RowsProcessed, &info.RowsTotal, &info.ErrorMessage); err != nil { + return nil, err + } + info.Status = mapAsyncStatus(rawStatus) + + startTs, startErr := parseAsyncTime(info.StartedAt) + endTs, endErr := parseAsyncTime(info.EndedAt) + // #1735 finding #6: do not silently discard parse errors. Build + // the parseMsg now; append it AFTER the status-driven + // ErrorMessage wipe below so it survives non-failed statuses too. + parseMsg := "" + if startErr != nil { + parseMsg = "startedAt: " + startErr.Error() + } + if endErr != nil { + if parseMsg != "" { + parseMsg += "; " + } + parseMsg += "endedAt: " + endErr.Error() + } + switch info.Status { + case "running": + if !startTs.IsZero() { + info.ElapsedSec = now.Sub(startTs).Seconds() + if info.ElapsedSec > 0 && info.RowsProcessed > 0 { + info.RatePerSec = float64(info.RowsProcessed) / info.ElapsedSec + remaining := info.RowsTotal - info.RowsProcessed + if remaining > 0 && info.RatePerSec > 0 { + info.EtaSec = float64(remaining) / info.RatePerSec + } + } + } + case "done", "failed": + if !startTs.IsZero() && !endTs.IsZero() { + info.ElapsedSec = endTs.Sub(startTs).Seconds() + } + } + if info.Status != "failed" { + info.ErrorMessage = "" + } + // Append parse errors after the wipe so they always surface. + if parseMsg != "" { + if info.ErrorMessage == "" { + info.ErrorMessage = parseMsg + } else { + info.ErrorMessage = info.ErrorMessage + " | " + parseMsg + } + } + out = append(out, info) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + +// mapAsyncStatus maps the raw ingestor-side status string to the API enum. 
+// Unknown values map to "unknown" (NOT "running") so a corrupted row +// cannot pin the warm-up banner in a perpetual loading state. +func mapAsyncStatus(raw string) string { + switch raw { + case "pending_async": + return "running" + case "done": + return "done" + case "failed": + return "failed" + default: + return "unknown" + } +} + +// anyAsyncMigrationRunning returns true iff any migration is in status +// "running". Failed migrations DO NOT count (operator should see +// "warm-up complete + alert", not an endless banner). +func anyAsyncMigrationRunning(infos []AsyncMigrationInfo) bool { + for _, m := range infos { + if m.Status == "running" { + return true + } + } + return false +} + +// parseAsyncTime parses either RFC3339 (last_update_at written by +// recordAsyncMigrationProgress) or "YYYY-MM-DD HH:MM:SS" (SQLite's +// datetime('now') default for started_at/ended_at). +func parseAsyncTime(s string) (time.Time, error) { + if s == "" { + return time.Time{}, nil + } + if t, err := time.Parse(time.RFC3339, s); err == nil { + return t, nil + } + if t, err := time.Parse("2006-01-02 15:04:05", s); err == nil { + return t.UTC(), nil + } + return time.Time{}, errParseAsyncTime{s: s} +} + +type errParseAsyncTime struct{ s string } + +func (e errParseAsyncTime) Error() string { return "parseAsyncTime: cannot parse " + e.s } + +// invalidateAsyncMigrationsCache is exported for tests that want to skip +// the TTL gate. +func invalidateAsyncMigrationsCache() { + asyncMigrationsCacheMu.Lock() + asyncMigrationsCacheAt = time.Time{} + asyncMigrationsCached = nil + asyncMigrationsCacheMu.Unlock() +} + +// handlePerfAsyncMigrations exposes the read-only async-migration state at +// /api/perf/async-migrations so dashboards / curl can poll progress +// without fetching the full /api/perf payload. +// +// #1735 finding #1 (Group A): on readAsyncMigrations error, return +// HTTP 500 with the error body instead of silently returning an empty +// list. 
An empty list is a meaningful operator signal (no migrations +// pending); a query failure must be visible, not disguised. +func (s *Server) handlePerfAsyncMigrations(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if s.db == nil { + writeJSON(w, []AsyncMigrationInfo{}) + return + } + infos, err := readAsyncMigrations(s.db.conn) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{ + "error": "readAsyncMigrations: " + err.Error(), + }) + return + } + if infos == nil { + infos = []AsyncMigrationInfo{} + } + writeJSON(w, infos) +} diff --git a/cmd/server/async_migrations_handler_test.go b/cmd/server/async_migrations_handler_test.go new file mode 100644 index 00000000..ee65f638 --- /dev/null +++ b/cmd/server/async_migrations_handler_test.go @@ -0,0 +1,113 @@ +// HTTP handler tests for /api/perf/async-migrations (#1735 finding #11). +// +// Covers: success (200 + array body), empty list (200 + []), +// readAsyncMigrations error (HTTP 500 + error body), nil db (200 + []). + +package main + +import ( + "database/sql" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + _ "modernc.org/sqlite" +) + +// makeAsyncMigrationsServer constructs a minimal *Server with the DB +// wrapper populated by the supplied conn (which may be a closed handle +// to provoke the error path). 
+func makeAsyncMigrationsServer(t *testing.T, conn *sql.DB) *Server { + t.Helper() + invalidateAsyncMigrationsCache() + s := &Server{} + if conn != nil { + s.db = &DB{conn: conn} + } + return s +} + +func TestHandlePerfAsyncMigrations_SuccessNonEmpty(t *testing.T) { + conn := openAsyncTestDB(t) + _, err := conn.Exec(`INSERT INTO _async_migrations + (name, status, started_at, rows_processed, rows_total) + VALUES ('mig_a', 'done', '2026-06-16 11:59:00', 100, 100)`) + if err != nil { + t.Fatal(err) + } + s := makeAsyncMigrationsServer(t, conn) + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/api/perf/async-migrations", nil) + s.handlePerfAsyncMigrations(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("status=%d, want 200", rr.Code) + } + var got []AsyncMigrationInfo + if err := json.Unmarshal(rr.Body.Bytes(), &got); err != nil { + t.Fatalf("decode: %v (body=%s)", err, rr.Body.String()) + } + if len(got) != 1 || got[0].Name != "mig_a" || got[0].Status != "done" { + t.Errorf("got %+v, want one done mig_a row", got) + } +} + +func TestHandlePerfAsyncMigrations_EmptyList(t *testing.T) { + conn := openAsyncTestDB(t) // table exists, no rows + s := makeAsyncMigrationsServer(t, conn) + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/api/perf/async-migrations", nil) + s.handlePerfAsyncMigrations(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("status=%d, want 200", rr.Code) + } + var got []AsyncMigrationInfo + if err := json.Unmarshal(rr.Body.Bytes(), &got); err != nil { + t.Fatalf("decode: %v", err) + } + if got == nil || len(got) != 0 { + t.Errorf("want empty array, got %v", got) + } + // Must be `[]` not `null` so JS consumers can iterate without + // nil-checks (warmup-banner.js). 
+ body := strings.TrimSpace(rr.Body.String()) + if !strings.HasPrefix(body, "[") { + t.Errorf("body should start with '[', got %q", body) + } +} + +func TestHandlePerfAsyncMigrations_ReadErrorReturns500(t *testing.T) { + conn, err := sql.Open("sqlite", ":memory:") + if err != nil { + t.Fatal(err) + } + conn.Close() // poison: subsequent Query fails + s := makeAsyncMigrationsServer(t, conn) + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/api/perf/async-migrations", nil) + s.handlePerfAsyncMigrations(rr, req) + if rr.Code != http.StatusInternalServerError { + t.Fatalf("status=%d, want 500 (error must NOT be hidden behind empty list)", rr.Code) + } + var body map[string]string + if err := json.Unmarshal(rr.Body.Bytes(), &body); err != nil { + t.Fatalf("decode err body: %v", err) + } + if !strings.Contains(body["error"], "readAsyncMigrations") { + t.Errorf("error body=%v, want mention of readAsyncMigrations", body) + } +} + +func TestHandlePerfAsyncMigrations_NilDBReturnsEmptyOK(t *testing.T) { + s := makeAsyncMigrationsServer(t, nil) + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/api/perf/async-migrations", nil) + s.handlePerfAsyncMigrations(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("status=%d, want 200", rr.Code) + } + if strings.TrimSpace(rr.Body.String()) != "[]" { + t.Errorf("body=%q, want '[]'", rr.Body.String()) + } +} diff --git a/cmd/server/async_migrations_test.go b/cmd/server/async_migrations_test.go new file mode 100644 index 00000000..4a413da6 --- /dev/null +++ b/cmd/server/async_migrations_test.go @@ -0,0 +1,202 @@ +// Tests for async migration server-side surface (#1724). 
+ +package main + +import ( + "database/sql" + "strings" + "testing" + "time" + + _ "modernc.org/sqlite" +) + +func openAsyncTestDB(t *testing.T) *sql.DB { + t.Helper() + db, err := sql.Open("sqlite", ":memory:") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { db.Close() }) + // PREFLIGHT: async=true reason="test fixture CREATE TABLE on a fresh in-memory SQLite DB — not a real schema migration; runs in test setup only." + _, err = db.Exec(` + CREATE TABLE _async_migrations ( + name TEXT PRIMARY KEY, + status TEXT NOT NULL, + started_at TEXT, + ended_at TEXT, + error TEXT, + rows_processed INTEGER DEFAULT 0, + rows_total INTEGER DEFAULT 0, + last_update_at TEXT + ) + `) + if err != nil { + t.Fatal(err) + } + return db +} + +func TestMapAsyncStatus(t *testing.T) { + cases := map[string]string{ + "pending_async": "running", + "done": "done", + "failed": "failed", + "": "unknown", + "garbage": "unknown", + } + for in, want := range cases { + if got := mapAsyncStatus(in); got != want { + t.Errorf("mapAsyncStatus(%q)=%q, want %q", in, got, want) + } + } +} + +func TestAnyAsyncMigrationRunning_FalseOnFailed(t *testing.T) { + infos := []AsyncMigrationInfo{ + {Name: "x", Status: "failed"}, + {Name: "y", Status: "done"}, + } + if anyAsyncMigrationRunning(infos) { + t.Errorf("anyAsyncMigrationRunning should be false when no migration is 'running'") + } +} + +func TestAnyAsyncMigrationRunning_TrueOnRunning(t *testing.T) { + infos := []AsyncMigrationInfo{ + {Name: "x", Status: "done"}, + {Name: "y", Status: "running"}, + } + if !anyAsyncMigrationRunning(infos) { + t.Errorf("anyAsyncMigrationRunning should be true when any migration is 'running'") + } +} + +func TestReadAsyncMigrations_EtaAndRateRunning(t *testing.T) { + db := openAsyncTestDB(t) + // Fix "now" to a known value relative to started_at. 
+ fixed := time.Date(2026, 6, 16, 12, 0, 0, 0, time.UTC) + asyncMigrationsNow = func() time.Time { return fixed } + t.Cleanup(func() { asyncMigrationsNow = time.Now }) + invalidateAsyncMigrationsCache() + + // Started 10s ago, processed 100/1000. + _, err := db.Exec(`INSERT INTO _async_migrations + (name, status, started_at, rows_processed, rows_total) + VALUES ('m1','pending_async','2026-06-16 11:59:50',100,1000)`) + if err != nil { + t.Fatal(err) + } + got, err := readAsyncMigrations(db) + if err != nil { + t.Fatal(err) + } + if len(got) != 1 { + t.Fatalf("want 1 row, got %d", len(got)) + } + m := got[0] + if m.Status != "running" { + t.Errorf("status=%q, want running", m.Status) + } + if m.ElapsedSec < 9.5 || m.ElapsedSec > 10.5 { + t.Errorf("ElapsedSec=%v, want ~10", m.ElapsedSec) + } + // rate = 100/10 = 10 rows/sec; remaining = 900 → eta = 90s. + if m.RatePerSec < 9.5 || m.RatePerSec > 10.5 { + t.Errorf("RatePerSec=%v, want ~10", m.RatePerSec) + } + if m.EtaSec < 85 || m.EtaSec > 95 { + t.Errorf("EtaSec=%v, want ~90", m.EtaSec) + } +} + +func TestReadAsyncMigrations_FailedSurfacesErrorMessage(t *testing.T) { + db := openAsyncTestDB(t) + invalidateAsyncMigrationsCache() + asyncMigrationsNow = func() time.Time { return time.Date(2026, 6, 16, 12, 0, 0, 0, time.UTC) } + t.Cleanup(func() { asyncMigrationsNow = time.Now }) + + _, err := db.Exec(`INSERT INTO _async_migrations + (name, status, started_at, ended_at, error, rows_processed, rows_total) + VALUES ('boom','failed','2026-06-16 11:59:00','2026-06-16 11:59:30','disk full',50,100)`) + if err != nil { + t.Fatal(err) + } + got, err := readAsyncMigrations(db) + if err != nil { + t.Fatal(err) + } + if len(got) != 1 || got[0].Status != "failed" { + t.Fatalf("expected one failed row, got %+v", got) + } + if got[0].ErrorMessage != "disk full" { + t.Errorf("ErrorMessage=%q, want 'disk full'", got[0].ErrorMessage) + } + if got[0].ElapsedSec < 29 || got[0].ElapsedSec > 31 { + t.Errorf("ElapsedSec=%v, want ~30", 
got[0].ElapsedSec) + } + if anyAsyncMigrationRunning(got) { + t.Errorf("failed migration must not count as running (banner would stick)") + } +} + +func TestReadAsyncMigrations_DoneClearsErrorMessage(t *testing.T) { + db := openAsyncTestDB(t) + invalidateAsyncMigrationsCache() + _, _ = db.Exec(`INSERT INTO _async_migrations + (name, status, started_at, ended_at, error) + VALUES ('ok','done','2026-06-16 11:59:00','2026-06-16 11:59:05','stale')`) + got, err := readAsyncMigrations(db) + if err != nil { + t.Fatal(err) + } + if got[0].ErrorMessage != "" { + t.Errorf("done status must not surface ErrorMessage, got %q", got[0].ErrorMessage) + } +} + +func TestReadAsyncMigrations_PropagatesErrors(t *testing.T) { + db, _ := sql.Open("sqlite", ":memory:") + db.Close() + invalidateAsyncMigrationsCache() + _, err := readAsyncMigrationsRaw(db) + if err == nil { + t.Errorf("closed DB must propagate error, not return nil") + } +} + +func TestParseAsyncTime(t *testing.T) { + if _, err := parseAsyncTime("2026-06-16T11:59:00Z"); err != nil { + t.Errorf("RFC3339 should parse: %v", err) + } + if _, err := parseAsyncTime("2026-06-16 11:59:00"); err != nil { + t.Errorf("SQLite datetime should parse: %v", err) + } + if _, err := parseAsyncTime("not a time"); err == nil { + t.Errorf("bogus value must error") + } + tz, err := parseAsyncTime("") + if err != nil || !tz.IsZero() { + t.Errorf("empty should be zero+nil, got %v / %v", tz, err) + } +} + +func TestReadAsyncMigrations_CachesWithinTTL(t *testing.T) { + db := openAsyncTestDB(t) + invalidateAsyncMigrationsCache() + _, _ = db.Exec(`INSERT INTO _async_migrations(name,status,started_at) VALUES ('a','done','2026-06-16 11:59:00')`) + g1, _ := readAsyncMigrations(db) + // Add another row; cached result must NOT include it. 
+ _, _ = db.Exec(`INSERT INTO _async_migrations(name,status,started_at) VALUES ('b','done','2026-06-16 11:59:00')`) + g2, _ := readAsyncMigrations(db) + if len(g1) != len(g2) { + t.Errorf("cache TTL not honored: g1=%d g2=%d", len(g1), len(g2)) + } +} + +func TestErrParseAsyncTime_Message(t *testing.T) { + e := errParseAsyncTime{s: "x"} + if !strings.Contains(e.Error(), "x") { + t.Errorf("error message missing input") + } +} diff --git a/cmd/server/healthz.go b/cmd/server/healthz.go index 3f554952..f5ef9248 100644 --- a/cmd/server/healthz.go +++ b/cmd/server/healthz.go @@ -41,6 +41,35 @@ func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) { // /api/healthz never observes a torn state (e.g. done=true with // processed FAILED_AUTO_DISMISS_MS; + } + + function isFailedDismissed(m) { + return !!(m && m.name && dismissedFailures[m.name]); + } + + function dismissFailedMigration(name) { + if (!name) return; + dismissedFailures[name] = true; + render(); + } + /** * Build ordered list of human-readable warm-up messages from current state. * @@ -54,6 +100,35 @@ ' / ' + fmtNum(total) + ' (' + pct + '%)'); } + // Async migrations (#1724): per-migration progress + failed-state surface. + // Banner stays up while any migration is "running" — gated by isSteadyState + // checking async_migrations_running. Failed migrations are surfaced + // explicitly with their error message; we do NOT silently drop them + // — but #1735 finding #2 (Group B) — they auto-dismiss after + // FAILED_AUTO_DISMISS_MS past endedAt OR on explicit user dismiss, + // so a single failure does not pin the banner forever. + var migrations = Array.isArray(h.async_migrations) ? 
h.async_migrations : []; + for (var mi = 0; mi < migrations.length; mi++) { + var m = migrations[mi] || {}; + var mname = String(m.name || 'migration'); + if (m.status === 'running') { + var mp = Number(m.rowsProcessed) || 0; + var mt = Number(m.rowsTotal) || 0; + var line = 'Migration ' + mname + ': ' + fmtNum(mp) + ' / ' + fmtNum(mt) + ' rows'; + var eta = Number(m.etaSec); + if (isFinite(eta) && eta > 0) { + line += ' (ETA ' + Math.round(eta) + 's)'; + } + msgs.push(line); + } else if (m.status === 'failed') { + if (isFailedDismissed(m) || isFailedExpired(m, nowMs)) { + continue; // user ack'd or auto-dismiss window elapsed + } + var err = m.errorMessage ? String(m.errorMessage) : 'unknown error'; + msgs.push('Migration ' + mname + ' FAILED: ' + err); + } + } + var liveness = h.ingest_liveness || {}; var srcs = Object.keys(liveness).sort(); for (var i = 0; i < srcs.length; i++) { @@ -76,14 +151,27 @@ } /** - * Steady-state predicate: ready=true AND from_pubkey_backfill.done=true. - * Once true, banner is dismissed and polling is torn down. + * Steady-state predicate: ready=true AND from_pubkey_backfill.done=true + * AND no async migration is currently running (#1724) AND no async migration + * is in a "failed" state that is still visible (#1735 finding #2 — failures + * that have been dismissed by the user OR auto-expired past + * FAILED_AUTO_DISMISS_MS no longer block steady state, so the banner + * doesn't pin forever on a single failure). */ - function isSteadyState(healthz) { + function isSteadyState(healthz, nowMs) { if (!healthz) return false; if (healthz.ready !== true) return false; var bf = healthz.from_pubkey_backfill; if (bf && bf.done === false) return false; + if (healthz.async_migrations_running === true) return false; + var now = (typeof nowMs === 'number') ? nowMs : Date.now(); + var migs = Array.isArray(healthz.async_migrations) ? 
healthz.async_migrations : []; + for (var i = 0; i < migs.length; i++) { + var m = migs[i]; + if (m && m.status === 'failed' && !isFailedDismissed(m) && !isFailedExpired(m, now)) { + return false; + } + } return true; } @@ -132,15 +220,46 @@ state.listEl.innerHTML = ''; return; } - // Diff-render: rebuild list (always small — <=3 items). + // Build list. For "Migration FAILED:" lines we attach a + // dismiss × button so the user can ack the failure and let the + // banner clear (#1735 finding #2 / Group B). + // innerHTML is fine here — messages are escaped; dismiss handler + // is attached via direct DOM after the rebuild. + var failedNames = []; var html = ''; for (var i = 0; i < msgs.length; i++) { - var safe = String(msgs[i]) + var raw = String(msgs[i]); + var safe = raw .replace(/&/g, '&').replace(//g, '>'); - html += '
  • ' + safe + '
  • '; + // Detect failed-migration line and extract the migration name + // so we know which entry to dismiss when the button is clicked. + var failedMatch = /^Migration (\S+) FAILED:/.exec(raw); + if (failedMatch) { + var nameSafe = String(failedMatch[1]) + .replace(/&/g, '&').replace(//g, '>'); + failedNames.push(failedMatch[1]); + html += '
  • ' + + safe + + '
  • '; + } else { + html += '
  • ' + safe + '
  • '; + } } state.listEl.innerHTML = html; state.el.classList.remove('warmup-banner--hidden'); + // Wire dismiss handlers. + if (failedNames.length > 0 && typeof state.listEl.querySelectorAll === 'function') { + var btns = state.listEl.querySelectorAll('.warmup-banner__dismiss'); + for (var b = 0; b < btns.length; b++) { + btns[b].addEventListener('click', function (ev) { + var name = ev.currentTarget && ev.currentTarget.getAttribute('data-migration'); + if (name) dismissFailedMigration(name); + }); + } + } } function noteLoadStatus(value) { @@ -237,10 +356,13 @@ isSteadyState: isSteadyState, STALE_INGEST_MS: STALE_INGEST_MS, POLL_INTERVAL_MS: POLL_INTERVAL_MS, + FAILED_AUTO_DISMISS_MS: FAILED_AUTO_DISMISS_MS, + dismissFailedMigration: dismissFailedMigration, // Test hooks _state: state, _pollOnce: pollOnce, _installFetchInterceptor: installFetchInterceptor, + _resetDismissedForTest: function () { dismissedFailures = Object.create(null); }, }; if (typeof window !== 'undefined') { diff --git a/test-warmup-banner-failed-dismiss-1735.js b/test-warmup-banner-failed-dismiss-1735.js new file mode 100644 index 00000000..646832be --- /dev/null +++ b/test-warmup-banner-failed-dismiss-1735.js @@ -0,0 +1,117 @@ +/* Unit tests for warmup-banner.js failed-migration dismiss / auto-dismiss + * behavior (#1735 finding #2 / Group B). + * + * Pins: + * - A failed migration past its FAILED_AUTO_DISMISS_MS window auto-clears + * from getWarmupMessages and from isSteadyState (banner can clear). + * - dismissFailedMigration(name) immediately removes the failure from + * the message stream and from isSteadyState gating. + * - A failed migration WITHIN the auto-dismiss window AND not dismissed + * still keeps the banner up (no regression of #1724 surface). 
+ */ +'use strict'; +const vm = require('vm'); +const fs = require('fs'); +const path = require('path'); +const assert = require('assert'); + +let passed = 0, failed = 0; +function test(name, fn) { + try { + fn(); + passed++; + console.log(' \u2705 ' + name); + } catch (e) { + failed++; + console.log(' \u274C ' + name + ': ' + e.message); + } +} + +function loadPureModule() { + const ctx = { window: {}, module: { exports: {} }, console, Date }; + vm.createContext(ctx); + const src = fs.readFileSync(path.join(__dirname, 'public', 'warmup-banner.js'), 'utf8'); + vm.runInContext(src, ctx); + return ctx.window.__warmupBanner || ctx.module.exports; +} + +console.log('warmup-banner.js failed-migration dismiss (#1735):'); + +const api = loadPureModule(); + +function makeFailed(endedAt) { + return { + ready: true, + from_pubkey_backfill: { done: true, processed: 1, total: 1 }, + async_migrations_running: false, + async_migrations: [{ + name: 'tx_last_seen_backfill', + status: 'failed', + endedAt: endedAt, + errorMessage: 'disk I/O error', + }], + ingest_liveness: {}, + }; +} + +test('within auto-dismiss window: failed migration still blocks steady state', () => { + api._resetDismissedForTest(); + const ended = '2026-06-16T11:59:00Z'; + const now = Date.parse(ended) + 60_000; // 1 min after end → well under 10 min + const h = makeFailed(ended); + assert.strictEqual(api.isSteadyState(h, now), false, + 'failed within window must block steady state'); + const msgs = api.getWarmupMessages(h, 'ready', now); + assert.ok(msgs.some(m => /FAILED/.test(m)), + 'failed line must still appear in messages'); +}); + +test('past auto-dismiss window: failed migration auto-clears', () => { + api._resetDismissedForTest(); + const ended = '2026-06-16T11:00:00Z'; + const now = Date.parse(ended) + api.FAILED_AUTO_DISMISS_MS + 1_000; + const h = makeFailed(ended); + assert.strictEqual(api.isSteadyState(h, now), true, + 'failed past window must NOT block steady state'); + const msgs = 
api.getWarmupMessages(h, 'ready', now); + assert.ok(!msgs.some(m => /FAILED/.test(m)), + 'failed line must be auto-dismissed from messages'); +}); + +test('explicit dismissFailedMigration removes from messages immediately', () => { + api._resetDismissedForTest(); + const ended = '2026-06-16T11:59:00Z'; + const now = Date.parse(ended) + 60_000; // within window + const h = makeFailed(ended); + // Sanity: visible before dismiss. + assert.strictEqual(api.isSteadyState(h, now), false); + // Dismiss. + api.dismissFailedMigration('tx_last_seen_backfill'); + assert.strictEqual(api.isSteadyState(h, now), true, + 'after dismiss must NOT block steady state'); + const msgs = api.getWarmupMessages(h, 'ready', now); + assert.ok(!msgs.some(m => /FAILED/.test(m)), + 'after dismiss failed line must not appear'); +}); + +test('failed migration with no endedAt does NOT auto-dismiss (fails closed)', () => { + api._resetDismissedForTest(); + const h = makeFailed(undefined); // no endedAt + const now = Date.now(); + assert.strictEqual(api.isSteadyState(h, now), false, + 'missing endedAt must keep failure visible — fail closed'); + const msgs = api.getWarmupMessages(h, 'ready', now); + assert.ok(msgs.some(m => /FAILED/.test(m)), + 'failed line still appears when endedAt is missing'); +}); + +test('failed migration with malformed endedAt does NOT auto-dismiss', () => { + api._resetDismissedForTest(); + const h = makeFailed('not a timestamp'); + const now = Date.now(); + assert.strictEqual(api.isSteadyState(h, now), false); +}); + +console.log(''); +console.log('passed=' + passed + ' failed=' + failed); +process.exit(failed > 0 ? 1 : 0); diff --git a/test-warmup-banner-migrations.js b/test-warmup-banner-migrations.js new file mode 100644 index 00000000..cb6606bc --- /dev/null +++ b/test-warmup-banner-migrations.js @@ -0,0 +1,125 @@ +/* Unit tests for warmup-banner.js async-migration surface (#1724). 
+ * + * Pins: + * - Banner stays visible while /api/healthz.async_migrations_running=true, + * even if ready=true and pubkey backfill is done. + * - Per-migration progress line renders name + rows_processed/rows_total + ETA. + * - "failed" status surfaces an explicit error message; isSteadyState=false. + * - When migrations finish (none running, none failed), banner returns to + * steady state. + * + * Matches the vm-sandbox / no-Playwright pattern used by test-warmup-banner.js. + */ +'use strict'; +const vm = require('vm'); +const fs = require('fs'); +const path = require('path'); +const assert = require('assert'); + +let passed = 0, failed = 0; +async function test(name, fn) { + try { + await fn(); + passed++; + console.log(' \u2705 ' + name); + } catch (e) { + failed++; + console.log(' \u274C ' + name + ': ' + e.message); + } +} + +function loadPureModule() { + const ctx = { window: {}, module: { exports: {} }, console, Date }; + vm.createContext(ctx); + const src = fs.readFileSync(path.join(__dirname, 'public', 'warmup-banner.js'), 'utf8'); + vm.runInContext(src, ctx); + return ctx.window.__warmupBanner || ctx.module.exports; +} + +(async function main() { + console.log('warmup-banner.js async migrations (#1724):'); + + const api = loadPureModule(); + const NOW = 1_700_000_000_000; + + // Healthz that would otherwise be steady-state. 
+ function steadyBase() { + return { + ready: true, + from_pubkey_backfill: { done: true, processed: 1, total: 1 }, + ingest_liveness: {}, + }; + } + + await test('async_migrations_running=true keeps banner up despite ready+backfill done', () => { + const h = Object.assign(steadyBase(), { + async_migrations_running: true, + async_migrations: [ + { name: 'tx_last_seen_backfill', status: 'running', + rowsProcessed: 12000, rowsTotal: 71000, etaSec: 2.4 }, + ], + }); + assert.strictEqual(api.isSteadyState(h), false, + 'isSteadyState must be false while a migration is running'); + assert.strictEqual(api.shouldShowBanner(h, 'ready', NOW), true, + 'banner must remain visible while migrations run'); + }); + + await test('per-migration progress line shows name + processed/total + ETA', () => { + const h = Object.assign(steadyBase(), { + async_migrations_running: true, + async_migrations: [ + { name: 'tx_last_seen_backfill', status: 'running', + rowsProcessed: 12000, rowsTotal: 71000, etaSec: 2.4 }, + ], + }); + const msgs = api.getWarmupMessages(h, 'ready', NOW); + const line = msgs.find(m => /tx_last_seen_backfill/.test(m)); + assert.ok(line, 'expected per-migration line; got: ' + JSON.stringify(msgs)); + assert.ok(/12,000/.test(line) && /71,000/.test(line), + 'expected formatted rows; got: ' + line); + assert.ok(/ETA/.test(line) && /2s/.test(line), + 'expected ETA seconds; got: ' + line); + }); + + await test('failed status surfaces error message explicitly (not silently dropped)', () => { + const h = Object.assign(steadyBase(), { + async_migrations_running: false, + async_migrations: [ + { name: 'tx_last_seen_backfill', status: 'failed', + rowsProcessed: 5000, rowsTotal: 71000, + errorMessage: 'disk I/O error' }, + ], + }); + const msgs = api.getWarmupMessages(h, 'ready', NOW); + const line = msgs.find(m => /tx_last_seen_backfill/.test(m)); + assert.ok(line, 'failed migration MUST surface a message; got: ' + JSON.stringify(msgs)); + assert.ok(/FAIL/i.test(line), 
'expected FAIL token; got: ' + line); + assert.ok(/disk I\/O error/.test(line), 'expected error message; got: ' + line); + assert.strictEqual(api.isSteadyState(h), false, + 'failed migration must NOT count as steady state'); + }); + + await test('done migrations alone (no running, no failed) → steady state', () => { + const h = Object.assign(steadyBase(), { + async_migrations_running: false, + async_migrations: [ + { name: 'tx_last_seen_backfill', status: 'done', + rowsProcessed: 71000, rowsTotal: 71000 }, + ], + }); + assert.strictEqual(api.isSteadyState(h), true, + 'done-only migrations are steady state'); + assert.strictEqual(api.shouldShowBanner(h, 'ready', NOW), false); + }); + + await test('no async_migrations field at all → behavior unchanged (back-compat)', () => { + const h = steadyBase(); // no async_migrations* fields + assert.strictEqual(api.isSteadyState(h), true); + assert.strictEqual(api.shouldShowBanner(h, 'ready', NOW), false); + }); + + console.log(''); + console.log('passed=' + passed + ' failed=' + failed); + process.exit(failed > 0 ? 1 : 0); +})();