Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions cmd/ingestor/async_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,33 @@ import (

// ensureAsyncMigrationsTable creates the bookkeeping table used by
// RunAsyncMigration / AsyncMigrationStatus. Idempotent.
//
// #1724: rows_processed/rows_total/last_update_at columns let long-running
// migrations stream progress to the server's /api/perf endpoint without
// holding shared in-process state across the ingestor/server boundary.
func ensureAsyncMigrationsTable(db *sql.DB) error {
_, err := db.Exec(`
if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS _async_migrations (
name TEXT PRIMARY KEY,
status TEXT NOT NULL, -- pending_async | done | failed
started_at TEXT NOT NULL DEFAULT (datetime('now')),
ended_at TEXT,
error TEXT
)
`)
return err
`); err != nil {
return err
}
// Best-effort additive columns for progress reporting (#1724).
// IF NOT EXISTS isn't supported for ADD COLUMN until SQLite 3.35; the
// errors are ignored when the column already exists.
for _, sql := range []string{
`ALTER TABLE _async_migrations ADD COLUMN rows_processed INTEGER NOT NULL DEFAULT 0`,
`ALTER TABLE _async_migrations ADD COLUMN rows_total INTEGER NOT NULL DEFAULT 0`,
`ALTER TABLE _async_migrations ADD COLUMN last_update_at TEXT`,
} {
_, _ = db.Exec(sql)
}
return nil
}

// RunAsyncMigration registers `name` as a pending async migration and
Expand Down Expand Up @@ -140,6 +156,19 @@ func (s *Store) AsyncMigrationStatus(name string) (string, error) {
return status, err
}

// recordAsyncMigrationProgress writes the latest progress snapshot for the
// named migration to the _async_migrations bookkeeping row so the server's
// /api/perf can surface mid-flight state to operators (#1724). Best-effort:
// failures are logged but never propagated to the migration body.
func (s *Store) recordAsyncMigrationProgress(name string, p TxLastSeenBackfillProgress) {
if _, err := s.db.Exec(`
UPDATE _async_migrations
SET rows_processed = ?, rows_total = ?, last_update_at = datetime('now')
WHERE name = ?`, p.RowsProcessed, p.RowsTotal, name); err != nil {
log.Printf("[async-migration] failed to record progress for %q: %v", name, err)
}
}

// WaitForAsyncMigrations blocks until all currently-scheduled async migrations
// finish. Intended for tests + graceful shutdown; production boot path does NOT
// call this (that's the whole point).
Expand Down
20 changes: 11 additions & 9 deletions cmd/ingestor/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,23 @@ func OpenStoreWithInterval(dbPath string, sampleIntervalSec int) (*Store, error)
// metadata-only ALTER); the populate query is potentially expensive
// (full obs scan + group) so we run it async. Subsequent observation
// inserts maintain the column inline (see InsertTransmission below).
//
// #1724: the populate MUST chunk (LIMIT N + sleep between batches) —
// the original full-table correlated UPDATE pinned the SQLite writer
// 10-15 min on prod-sized DBs, starving every reader. See
// tx_last_seen_backfill.go for the chunking rationale + defaults.
// PREFLIGHT: async=true reason="full-table backfill JOIN (1.9M+ obs × 86k+ tx in prod) — must not block ingestor boot"
if err := s.RunAsyncMigration(context.Background(), "tx_last_seen_backfill_v1",
func(ctx context.Context, d *sql.DB) error {
log.Println("[migration/async] Backfilling transmissions.last_seen from MAX(observations.timestamp)...")
res, err := d.ExecContext(ctx, `
UPDATE transmissions
SET last_seen = COALESCE((
SELECT MAX(timestamp) FROM observations WHERE transmission_id = transmissions.id
), last_seen)
WHERE last_seen = 0
`)
log.Println("[migration/async] Backfilling transmissions.last_seen from MAX(observations.timestamp) (chunked, #1724)...")
n, err := runTxLastSeenBackfillChunked(ctx, d, TxLastSeenBackfillOpts{
Progress: func(p TxLastSeenBackfillProgress) {
s.recordAsyncMigrationProgress("tx_last_seen_backfill_v1", p)
},
})
if err != nil {
return err
}
n, _ := res.RowsAffected()
log.Printf("[migration/async] transmissions.last_seen backfill complete: %d rows updated", n)
return nil
}); err != nil {
Expand Down
152 changes: 152 additions & 0 deletions cmd/ingestor/tx_last_seen_backfill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// tx_last_seen_backfill — chunked backfill of transmissions.last_seen.
//
// Issue #1724: PR #1691 ran the populate as a single correlated UPDATE; on a
// prod-shaped DB (71K tx / 1.5M obs) that pinned the SQLite writer for 10-15
// min, starving every reader (p95 catastrophic across /api/stats,
// /api/healthz, /api/packets, /api/analytics/hash-sizes, ...). The writer
// path is a SINGLE connection (db.SetMaxOpenConns(1) in OpenStoreWithInterval)
// — every reader queues behind whatever statement currently holds it.
//
// The fix here chunks the UPDATE into batches of `batchSize` rows and sleeps
// `yieldDelay` between batches. Each batch releases + re-acquires the writer
// connection, so reader queries that arrived during the previous batch get
// served in the gap. Progress is reported via the optional callback so the
// migration runner can surface live state on /api/perf and the warm-up
// banner can stay up until the backfill finishes.
//
// Defaults (5000 rows / 100ms sleep) are tuned for prod ARM64 hardware:
// at ~5000 UPDATEs per batch, wall time per batch on the prod DB is
// ~30-80 ms; the 100 ms sleep keeps the writer idle ~55-75% of the time.
// On 1.5M rows that's ~300 batches × ~150 ms = ~45 s of wall time end-to-end
// (vs ~5-7 min of writer-locked dead-air pre-fix). Smaller batches raise
// the overhead-to-work ratio; larger batches risk extending lock windows
// past reader-visible (~200 ms) thresholds.

package main

import (
"context"
"database/sql"
"time"
)

// TxLastSeenBackfillProgress is the snapshot reported to the optional
// progress callback after each batch.
type TxLastSeenBackfillProgress struct {
RowsProcessed int64
RowsTotal int64
BatchNum int
ElapsedMs int64
}

// TxLastSeenBackfillOpts tunes the chunked backfill. Zero values fall back
// to production defaults.
type TxLastSeenBackfillOpts struct {
BatchSize int // rows per UPDATE chunk (default 5000)
YieldDelay time.Duration // sleep between batches (default 100ms); negative means no sleep
Progress func(TxLastSeenBackfillProgress)
}

const (
defaultTxBackfillBatchSize = 5000
defaultTxBackfillYieldDelay = 100 * time.Millisecond
)

// runTxLastSeenBackfillChunked backfills transmissions.last_seen in bounded
// batches. Returns the total number of rows updated. The function is the
// body of the tx_last_seen_backfill_v1 async migration registered in
// OpenStoreWithInterval (#1690 backfill, #1724 chunking).
//
// Contract (pinned by tx_last_seen_backfill_test.go):
// - MUST NOT execute a single full-table UPDATE; readers in another
// goroutine must be able to make forward progress while the backfill
// runs.
// - MUST invoke opts.Progress at least once per batch (when non-nil) so
// the migration runner can surface mid-flight state.
// - MUST honor ctx cancellation between batches; an in-flight batch
// completes, then the loop returns ctx.Err().
// - Idempotent: once `last_seen=0` rows are exhausted the loop exits.
func runTxLastSeenBackfillChunked(ctx context.Context, db *sql.DB, opts TxLastSeenBackfillOpts) (int64, error) {
batch := opts.BatchSize
if batch <= 0 {
batch = defaultTxBackfillBatchSize
}
yield := opts.YieldDelay
if yield == 0 {
yield = defaultTxBackfillYieldDelay
}
if yield < 0 {
yield = 0
}

// One-shot count of pending rows so the progress callback can
// report ETA. This SELECT is a normal reader on the writer
// connection — runs once before the loop so it doesn't extend
// per-batch lock windows.
//
// Snapshot the max transmission id at start (#1724): the chunked
// loop must only process rows that existed when the migration
// began. Without this bound, new INSERTs that land between
// batches (every observation insert that creates a fresh hash
// goes through last_seen=0 → bump) would keep the loop alive
// indefinitely, deadlocking shutdown paths that wait on
// backfillWg. New rows are already maintained inline by
// InsertTransmission's last_seen bumper (#1690 writer path), so
// the backfill explicitly does NOT need to catch them.
//
// Errors here must propagate: silently swallowing them sets maxID=0,
// which makes the WHERE `id <= ?` match nothing and the loop returns
// "0 rows processed, success" without ever running — a real
// backfill failure would look identical to a clean DB and the
// migration would be marked done.
var maxID int64
if err := db.QueryRowContext(ctx, `SELECT COALESCE(MAX(id), 0) FROM transmissions`).Scan(&maxID); err != nil {
return 0, err
}
var total int64
if err := db.QueryRowContext(ctx, `SELECT COUNT(*) FROM transmissions WHERE last_seen = 0 AND id <= ?`, maxID).Scan(&total); err != nil {
return 0, err
}

start := time.Now()
var processed int64
var batchNum int
for {
if err := ctx.Err(); err != nil {
return processed, err
}
res, err := db.ExecContext(ctx, `
UPDATE transmissions
SET last_seen = COALESCE((
SELECT MAX(timestamp) FROM observations WHERE transmission_id = transmissions.id
), last_seen)
WHERE id IN (
SELECT id FROM transmissions WHERE last_seen = 0 AND id <= ? LIMIT ?
)`, maxID, batch)
if err != nil {
return processed, err
}
n, _ := res.RowsAffected()
batchNum++
processed += n
if opts.Progress != nil {
opts.Progress(TxLastSeenBackfillProgress{
RowsProcessed: processed,
RowsTotal: total,
BatchNum: batchNum,
ElapsedMs: time.Since(start).Milliseconds(),
})
}
if n == 0 {
return processed, nil
}
if yield > 0 {
// Use a timer so ctx cancellation interrupts the sleep.
select {
case <-ctx.Done():
return processed, ctx.Err()
case <-time.After(yield):
}
}
}
}
105 changes: 105 additions & 0 deletions cmd/ingestor/tx_last_seen_backfill_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Test for issue #1724 — the tx_last_seen backfill MUST chunk its
// UPDATE so SQLite readers can make forward progress while the
// backfill runs. The original PR #1691 implementation ran a single
// correlated UPDATE that pinned the writer 10-15 min on a prod-sized
// DB; this test asserts the chunked behavior (≤ batchSize rows per
// batch + multiple progress callbacks).

package main

import (
"context"
"testing"
"time"
)

// TestIssue1724_TxLastSeenBackfillIsChunked seeds 12k transmissions
// with last_seen=0 and runs runTxLastSeenBackfillChunked with a
// batchSize of 1000. It asserts:
//
// 1. The progress callback fires more than once (proving the loop
// batches, not single-shots).
// 2. Every per-batch RowsProcessed delta is ≤ batchSize+epsilon
// (proving each UPDATE is bounded, not full-table).
//
// Pre-fix (single full-table UPDATE) the callback fires exactly once
// with RowsProcessed=12000, failing both assertions on an assertion
// (not a build/import error).
func TestIssue1724_TxLastSeenBackfillIsChunked(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()

// The OpenStore-scheduled tx_last_seen_backfill_v1 fires against the
// empty DB; wait for it to complete before seeding so the goroutine
// doesn't race our INSERTs and consume rows from under the manual
// backfill call below.
s.WaitForAsyncMigrations()

const seedN = 12000
const batchSize = 1000

// Seed transmissions with last_seen=0 and one matching observation
// each so the correlated MAX(timestamp) subquery returns a non-zero
// value (forces RowsAffected to be non-zero).
tx, err := s.db.Begin()
if err != nil {
t.Fatalf("begin: %v", err)
}
insTx, err := tx.Prepare(`INSERT INTO transmissions(raw_hex, hash, first_seen, last_seen) VALUES('00','h'||?, '2024-01-01T00:00:00Z', 0)`)
if err != nil {
t.Fatalf("prep tx: %v", err)
}
insObs, err := tx.Prepare(`INSERT INTO observations(transmission_id, observer_idx, timestamp) VALUES(?, 1, ?)`)
if err != nil {
t.Fatalf("prep obs: %v", err)
}
for i := 0; i < seedN; i++ {
res, err := insTx.Exec(i)
if err != nil {
t.Fatalf("seed tx %d: %v", i, err)
}
id, _ := res.LastInsertId()
if _, err := insObs.Exec(id, time.Now().Unix()+int64(i)); err != nil {
t.Fatalf("seed obs %d: %v", i, err)
}
}
insTx.Close()
insObs.Close()
if err := tx.Commit(); err != nil {
t.Fatalf("commit: %v", err)
}

var snapshots []TxLastSeenBackfillProgress
progress := func(p TxLastSeenBackfillProgress) {
snapshots = append(snapshots, p)
}

total, err := runTxLastSeenBackfillChunked(ctx, s.db, TxLastSeenBackfillOpts{
BatchSize: batchSize,
YieldDelay: time.Millisecond,
Progress: progress,
})
if err != nil {
t.Fatalf("backfill: %v", err)
}
if total != seedN {
t.Fatalf("total rows updated = %d, want %d", total, seedN)
}

// Invariant 1: the loop must batch.
if len(snapshots) < 2 {
t.Fatalf("progress callback fired %d times; want ≥ 2 (chunked loop should emit one per batch; pre-fix #1724 emits exactly 1 for the full-table UPDATE)",
len(snapshots))
}

// Invariant 2: per-batch delta must be bounded by batchSize.
var prev int64
for i, snap := range snapshots {
delta := snap.RowsProcessed - prev
if delta > int64(batchSize) {
t.Fatalf("snapshot[%d] delta=%d exceeds batchSize=%d; backfill is not chunking (pre-fix #1724 ran one full-table UPDATE)",
i, delta, batchSize)
}
prev = snap.RowsProcessed
}
}
Loading
Loading