From 5958de72c9b92fa4a71b702e806c80b7278d5646 Mon Sep 17 00:00:00 2001 From: rjseibane Date: Tue, 26 May 2026 23:59:51 +0200 Subject: [PATCH 1/3] fix(propagation): end SEEN_* rebroadcast starvation The propagation reaper rebroadcasts txs stuck in SEEN_ON_NETWORK / SEEN_MULTIPLE_NODES, but two defects let valid-but-stuck txs sit forever: - The per-tick cap was the hardcoded constant 200; the configurable ReaperBatchSize was read into a field reapOnce never used, so the knob was dead and the effective cap was always 200. - Candidates came from IterateStatusesSince, whose Postgres impl orders newest-first. Each saturated tick re-sent the newest 200 stale rows and never reached older ones, so a tx stuck a few hours was permanently starved (e.g. one broadcast with an unconfirmed ancestor that later confirmed and became minable, but was never rebroadcast). Fix: add a per-tx rebroadcast throttle. A new last_rebroadcast_at marker plus reaper_rebroadcast_interval_ms (default 1h) gate re-sends, and a new GetReapCandidates store query returns due SEEN_* rows oldest-rebroadcast- first. reapOnce now honors reaperBatchSize and stamps every attempted txid (on attempt, not just success) so capacity spreads across the whole backlog over one interval instead of re-sending the same rows every tick. No row is starved; recoverable txs are never auto-rejected. Adds the column + idempotent migration + partial index for Postgres and round-trips the field through Aerospike and Pebble. Covered by new reaper tests including a fairness-across-ticks assertion. --- config/config.go | 8 + models/transaction.go | 7 + services/api_server/handlers_test.go | 8 + services/propagation/propagator.go | 66 ++++---- services/propagation/propagator_test.go | 73 +++++++++ services/propagation/reaper.go | 85 +++++----- services/propagation/reaper_test.go | 207 ++++++++++++++++++++++++ services/webhook/service_test.go | 6 + store/aerospike/aerospike.go | 126 +++++++++++++++ store/pebble/pebble.go | 126 +++++++++++++++ store/postgres/postgres.go | 51 ++++++ store/postgres/schema.sql | 14 +- store/store.go | 18 +++ 13 files changed, 722 insertions(+), 73 deletions(-) create mode 100644 services/propagation/reaper_test.go diff --git a/config/config.go b/config/config.go index 2f68826..3e25759 100644 --- a/config/config.go +++ b/config/config.go @@ -298,6 +298,13 @@ type PropagationConfig struct { RetryBackoffMs int `mapstructure:"retry_backoff_ms"` ReaperIntervalMs int `mapstructure:"reaper_interval_ms"` ReaperBatchSize int `mapstructure:"reaper_batch_size"` + // ReaperRebroadcastIntervalMs is the minimum time between successive + // rebroadcasts of the same stuck tx. The reaper only re-sends a SEEN_* + // row whose last_rebroadcast_at is NULL or older than this window, so each + // tick's batch spreads across the whole backlog instead of re-sending the + // same rows every interval — this is what prevents head-of-line starvation + // when the backlog exceeds reaper_batch_size. Defaults to 1h. + ReaperRebroadcastIntervalMs int `mapstructure:"reaper_rebroadcast_interval_ms"` // LeaseTTLMs bounds how long the reaper lease remains valid without a // renewal. Set to at least 2–3× reaper_interval_ms so a missed tick // doesn't trigger a false-positive failover. Defaults to 3× interval. @@ -683,6 +690,7 @@ func setDefaults() { viper.SetDefault("propagation.retry_backoff_ms", 500) viper.SetDefault("propagation.reaper_interval_ms", 30000) viper.SetDefault("propagation.reaper_batch_size", 500) + viper.SetDefault("propagation.reaper_rebroadcast_interval_ms", 3600000) // 0 keeps New()'s 3×reaper_interval default, so changing reaper_interval // automatically moves the lease TTL unless the operator opts into a fixed value. viper.SetDefault("propagation.lease_ttl_ms", 0) diff --git a/models/transaction.go b/models/transaction.go index edb2c6b..f29d86e 100644 --- a/models/transaction.go +++ b/models/transaction.go @@ -68,6 +68,13 @@ type TransactionStatus struct { // added). The startup replay loop reads this to skip rows registered // within MerkleReplaySkipRecentMinutes — see issue #145. MerkleRegisteredAt time.Time `json:"merkleRegisteredAt,omitempty"` + // LastRebroadcastAt is the wall-clock time of the most recent reaper + // rebroadcast attempt for this txid. Zero value means "never rebroadcast" + // (or rebroadcast before this field was added). The reaper reads it to + // throttle re-sends to at most once per reaper_rebroadcast_interval_ms, so + // a saturated backlog can't starve older stuck rows by re-sending the same + // newest rows every tick. + LastRebroadcastAt time.Time `json:"lastRebroadcastAt,omitempty"` } // Status represents the various states a transaction can be in diff --git a/services/api_server/handlers_test.go b/services/api_server/handlers_test.go index ded427d..412a4db 100644 --- a/services/api_server/handlers_test.go +++ b/services/api_server/handlers_test.go @@ -156,6 +156,14 @@ func (m *mockStore) MarkMerkleRegisteredByTxIDs(context.Context, []string, time. return nil } +func (m *mockStore) GetReapCandidates(context.Context, time.Time, time.Time, time.Time, int) ([]*models.TransactionStatus, error) { + return nil, nil +} + +func (m *mockStore) MarkRebroadcastByTxIDs(context.Context, []string, time.Time) error { + return nil +} + func (m *mockStore) InsertSubmission(_ context.Context, sub *models.Submission) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/services/propagation/propagator.go b/services/propagation/propagator.go index 81168b7..9fe4e35 100644 --- a/services/propagation/propagator.go +++ b/services/propagation/propagator.go @@ -68,11 +68,14 @@ type Propagator struct { merkleConcurrency int reaperInterval time.Duration reaperBatchSize int - teranodeBatchCap int - broadcastWorkers int - maxParallelChunks int - holderID string - leaseTTL time.Duration + // reaperRebroadcastInterval is the minimum gap between successive + // rebroadcasts of the same stuck tx. See ReaperRebroadcastIntervalMs. + reaperRebroadcastInterval time.Duration + teranodeBatchCap int + broadcastWorkers int + maxParallelChunks int + holderID string + leaseTTL time.Duration // broadcastJobs feeds the persistent worker pool that runs every // per-endpoint POST /txs call. Replaces the previous per-broadcast @@ -220,6 +223,10 @@ func New(cfg *config.Config, logger *zap.Logger, producer *kafka.Producer, publi if reaperBatch <= 0 { reaperBatch = 500 } + reaperRebroadcastInterval := time.Duration(cfg.Propagation.ReaperRebroadcastIntervalMs) * time.Millisecond + if reaperRebroadcastInterval <= 0 { + reaperRebroadcastInterval = time.Hour + } leaseTTL := time.Duration(cfg.Propagation.LeaseTTLMs) * time.Millisecond if leaseTTL <= 0 { leaseTTL = 3 * reaperInterval @@ -245,30 +252,31 @@ func New(cfg *config.Config, logger *zap.Logger, producer *kafka.Producer, publi maxParallelChunks = defaultMaxParallelChunks } p := &Propagator{ - cfg: cfg, - logger: logger.Named("propagation"), - producer: producer, - publisher: publisher, - store: st, - leaser: leaser, - teranodeClient: tc, - merkleClient: mc, - maxPending: maxPending, - merkleConcurrency: merkleConcurrency, - reaperInterval: reaperInterval, - reaperBatchSize: reaperBatch, - teranodeBatchCap: teranodeBatchCap, - broadcastWorkers: broadcastWorkers, - maxParallelChunks: maxParallelChunks, - holderID: newHolderID(), - leaseTTL: leaseTTL, - broadcastJobs: make(chan broadcastJob, broadcastJobBuffer), - processBatchSem: make(chan struct{}, maxConcurrentBatches), - admitCh: make(chan admitRequest, dispatcherChannelBuffer), - requeueCh: make(chan requeueRequest, dispatcherChannelBuffer), - terminalCh: make(chan terminalEvent, dispatcherChannelBuffer), - drainCh: make(chan drainRequest), - initDone: make(chan struct{}), + cfg: cfg, + logger: logger.Named("propagation"), + producer: producer, + publisher: publisher, + store: st, + leaser: leaser, + teranodeClient: tc, + merkleClient: mc, + maxPending: maxPending, + merkleConcurrency: merkleConcurrency, + reaperInterval: reaperInterval, + reaperBatchSize: reaperBatch, + reaperRebroadcastInterval: reaperRebroadcastInterval, + teranodeBatchCap: teranodeBatchCap, + broadcastWorkers: broadcastWorkers, + maxParallelChunks: maxParallelChunks, + holderID: newHolderID(), + leaseTTL: leaseTTL, + broadcastJobs: make(chan broadcastJob, broadcastJobBuffer), + processBatchSem: make(chan struct{}, maxConcurrentBatches), + admitCh: make(chan admitRequest, dispatcherChannelBuffer), + requeueCh: make(chan requeueRequest, dispatcherChannelBuffer), + terminalCh: make(chan terminalEvent, dispatcherChannelBuffer), + drainCh: make(chan drainRequest), + initDone: make(chan struct{}), } // Start a dispatcher goroutine with a nil claim so tests that // construct via New and drive via admitCh / drainCh have a running diff --git a/services/propagation/propagator_test.go b/services/propagation/propagator_test.go index 479dd7f..a8b16fc 100644 --- a/services/propagation/propagator_test.go +++ b/services/propagation/propagator_test.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "sort" "strings" "sync" "sync/atomic" @@ -128,6 +129,11 @@ type mockStore struct { // markErr forces MarkMerkleRegisteredByTxIDs to return this error. // Used by tests that verify a mark failure doesn't block broadcast. markErr error + // rebroadcastMarks records every MarkRebroadcastByTxIDs call as one slice + // per call. Lets reaper tests assert which txids were stamped on attempt. + rebroadcastMarks [][]string + // reapErr forces GetReapCandidates to return this error. + reapErr error // returningPrev, when non-nil, overrides the synthetic previous-status // row BatchUpdateStatusReturning hands back for each input. The default // (RECEIVED prev) always reads as a real transition; this hook lets @@ -300,6 +306,73 @@ func (m *mockStore) IterateStatusesSince(_ context.Context, since time.Time, fn return nil } +// GetReapCandidates mirrors the Postgres query over replayRows: SEEN_* rows +// with a timestamp in [since, seenDeadline), a non-empty RawTx, and a +// last_rebroadcast_at that is zero or older than rebroadcastBefore, ordered +// oldest-rebroadcast-first (zero/never sorts first) and capped at limit. +func (m *mockStore) GetReapCandidates(_ context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.reapErr != nil { + return nil, m.reapErr + } + if limit <= 0 { + return nil, nil + } + var out []*models.TransactionStatus + for _, r := range m.replayRows { + if r.Status != models.StatusSeenOnNetwork && r.Status != models.StatusSeenMultipleNodes { + continue + } + if r.Timestamp.Before(since) || !r.Timestamp.Before(seenDeadline) { + continue + } + if len(r.RawTx) == 0 { + continue + } + if !r.LastRebroadcastAt.IsZero() && !r.LastRebroadcastAt.Before(rebroadcastBefore) { + continue + } + out = append(out, &models.TransactionStatus{TxID: r.TxID, RawTx: r.RawTx, LastRebroadcastAt: r.LastRebroadcastAt}) + } + sort.SliceStable(out, func(i, j int) bool { + // zero (never rebroadcast) sorts first + zi, zj := out[i].LastRebroadcastAt.IsZero(), out[j].LastRebroadcastAt.IsZero() + if zi != zj { + return zi + } + return out[i].LastRebroadcastAt.Before(out[j].LastRebroadcastAt) + }) + if len(out) > limit { + out = out[:limit] + } + // Strip the marker from the returned rows — the reaper only consumes + // TxID + RawTx, matching the real backends. + for _, r := range out { + r.LastRebroadcastAt = time.Time{} + } + return out, nil +} + +// MarkRebroadcastByTxIDs records the call and stamps LastRebroadcastAt on the +// matching replayRows so a subsequent GetReapCandidates within the interval +// excludes them — mirroring the real backends. +func (m *mockStore) MarkRebroadcastByTxIDs(_ context.Context, txids []string, ts time.Time) error { + m.mu.Lock() + defer m.mu.Unlock() + m.rebroadcastMarks = append(m.rebroadcastMarks, append([]string(nil), txids...)) + marked := make(map[string]struct{}, len(txids)) + for _, t := range txids { + marked[t] = struct{}{} + } + for _, r := range m.replayRows { + if _, ok := marked[r.TxID]; ok { + r.LastRebroadcastAt = ts + } + } + return nil +} + func (m *mockStore) updateCount() int { m.mu.Lock() defer m.mu.Unlock() diff --git a/services/propagation/reaper.go b/services/propagation/reaper.go index ce92209..68a2ee9 100644 --- a/services/propagation/reaper.go +++ b/services/propagation/reaper.go @@ -19,14 +19,16 @@ import ( // may be needed, or a callback may have been dropped. Long enough that we // don't rebroadcast every tx that takes a few minutes to mine. // -// staleScanLookback bounds how far back IterateStatusesSince walks. Rows +// staleScanLookback bounds how far back the candidate scan walks. Rows // older than this are assumed permanently stuck and outside the reaper's // responsibility — the operator surfaces them with `arcade tools surface // stuck` if a deeper sweep is needed. +// +// The per-tick batch size is configurable (reaper_batch_size); the per-tx +// rebroadcast cadence is reaper_rebroadcast_interval_ms. const ( - staleSeenOnNetworkAge = time.Hour - staleScanLookback = 24 * time.Hour - reaperRebroadcastBatch = 200 + staleSeenOnNetworkAge = time.Hour + staleScanLookback = 24 * time.Hour ) // reaperLeaseName is the well-known key every replica uses to coordinate @@ -82,11 +84,18 @@ func (p *Propagator) tryReap(ctx context.Context) { p.reapOnce(ctx) } -// reapOnce rebroadcasts rows stuck at SEEN_ON_NETWORK past -// staleSeenOnNetworkAge (peer mempool eviction, dropped BLOCK_PROCESSED -// callback, fee bump needed). RECEIVED rows are intentionally not -// rebroadcast — the submitter got an error from intake on Kafka publish -// failure and owns the decision to retry. +// reapOnce rebroadcasts rows stuck at SEEN_ON_NETWORK / SEEN_MULTIPLE_NODES +// past staleSeenOnNetworkAge (peer mempool eviction, dropped BLOCK_PROCESSED +// callback, unconfirmed ancestor that has since confirmed). RECEIVED rows are +// intentionally not rebroadcast — the submitter got an error from intake on +// Kafka publish failure and owns the decision to retry. +// +// Candidates come from GetReapCandidates ordered oldest-rebroadcast-first, and +// each attempted txid is stamped via MarkRebroadcastByTxIDs so it isn't re-sent +// again for reaperRebroadcastInterval. That per-tx throttle is what keeps a +// backlog larger than reaperBatchSize from starving older rows: capacity +// spreads across the whole backlog over one interval instead of re-sending the +// same rows every tick. // // Rebroadcasts go through the same registerBatch + broadcastInChunks + // applyTerminalStatuses pipeline as processBatch but bypass the @@ -94,43 +103,27 @@ func (p *Propagator) tryReap(ctx context.Context) { // resulting terminal status notifies the dispatcher via applyTerminalStatuses // only as a no-op for offset bookkeeping. // -// Bounded by reaperRebroadcastBatch per tick so a backlog can't pin the -// reaper into a single multi-minute call. +// Bounded by reaperBatchSize per tick so a backlog can't pin the reaper into a +// single multi-minute call. func (p *Propagator) reapOnce(ctx context.Context) { now := time.Now() since := now.Add(-staleScanLookback) seenDeadline := now.Add(-staleSeenOnNetworkAge) + rebroadcastBefore := now.Add(-p.reaperRebroadcastInterval) - stuck := make([]propagationMsg, 0, reaperRebroadcastBatch) - err := p.store.IterateStatusesSince(ctx, since, func(st *models.TransactionStatus) error { - if len(stuck) >= reaperRebroadcastBatch { - return errReaperBatchFull - } - if len(st.RawTx) == 0 { - // No body to rebroadcast. Pre-reaper-population rows - // won't have it; just skip. - return nil + candidates, err := p.store.GetReapCandidates(ctx, since, seenDeadline, rebroadcastBefore, p.reaperBatchSize) + if err != nil { + if !errors.Is(err, context.Canceled) { + p.logger.Error("reaper: scan failed", zap.Error(err)) } - switch st.Status { - case models.StatusSeenOnNetwork, models.StatusSeenMultipleNodes: - if !st.Timestamp.Before(seenDeadline) { - return nil - } - default: - // RECEIVED rows are intentionally NOT rebroadcast — the - // submitter got an error from intake on Kafka publish - // failure and is responsible for deciding whether to retry. - // Terminal statuses, MINED, IMMUTABLE — not the reaper's job. - return nil - } - stuck = append(stuck, propagationMsg{TXID: st.TxID, RawTx: st.RawTx}) - return nil - }) - if err != nil && !errors.Is(err, errReaperBatchFull) && !errors.Is(err, context.Canceled) { - p.logger.Error("reaper: scan failed", zap.Error(err)) return } + stuck := make([]propagationMsg, 0, len(candidates)) + for _, st := range candidates { + stuck = append(stuck, propagationMsg{TXID: st.TxID, RawTx: st.RawTx}) + } + // Publish the post-scan depth on every tick BEFORE the early-return // so the gauge reflects "what the last reaper observed" — including // the queue-is-clear case. Setting it only on the non-empty branch @@ -155,11 +148,23 @@ func (p *Propagator) reapOnce(ctx context.Context) { return } rawTxs := make([][]byte, len(registered)) + attemptedTxids := make([]string, len(registered)) for i, m := range registered { rawTxs[i] = m.RawTx + attemptedTxids[i] = m.TXID } results := p.broadcastInChunks(ctx, registered, rawTxs) + // Stamp last_rebroadcast_at on every tx we attempted to broadcast (not + // just the ones that succeeded) so a perpetually-requeueing tx cedes its + // slot for one interval and can't re-monopolize the batch. Rows whose + // merkle /watch registration failed are intentionally NOT marked — they + // weren't broadcast, and a global merkle outage should not push every + // stuck tx past its rebroadcast window. + if err := p.store.MarkRebroadcastByTxIDs(ctx, attemptedTxids, now); err != nil { + p.logger.Warn("reaper: mark rebroadcast failed", zap.Error(err)) + } + var accepted, rejected int terminalStatuses := make([]*models.TransactionStatus, 0, len(results)) for _, res := range results { @@ -184,9 +189,3 @@ func (p *Propagator) reapOnce(ctx context.Context) { } p.applyTerminalStatuses(ctx, terminalStatuses, accepted, rejected) } - -// errReaperBatchFull halts the IterateStatusesSince walk once we've -// accumulated reaperRebroadcastBatch stuck rows. Sentinel error so -// IterateStatusesSince surfaces it cleanly without being mistaken for -// a backend error. -var errReaperBatchFull = errors.New("reaper batch full") diff --git a/services/propagation/reaper_test.go b/services/propagation/reaper_test.go new file mode 100644 index 0000000..93df8f2 --- /dev/null +++ b/services/propagation/reaper_test.go @@ -0,0 +1,207 @@ +package propagation + +import ( + "context" + "testing" + "time" + + "github.com/bsv-blockchain/arcade/models" +) + +// seenRow builds a SEEN_MULTIPLE_NODES row eligible for the reaper: a non-empty +// raw_tx and a timestamp older than staleSeenOnNetworkAge but within +// staleScanLookback. lastRebroadcast is the row's last_rebroadcast_at (zero == +// never rebroadcast). +func seenRow(txid string, age time.Duration, lastRebroadcast time.Time) *models.TransactionStatus { + return &models.TransactionStatus{ + TxID: txid, + Status: models.StatusSeenMultipleNodes, + RawTx: []byte{0xde, 0xad, 0xbe, 0xef}, + Timestamp: time.Now().Add(-age), + LastRebroadcastAt: lastRebroadcast, + } +} + +func (m *mockStore) allRebroadcastMarks() []string { + m.mu.Lock() + defer m.mu.Unlock() + var out []string + for _, call := range m.rebroadcastMarks { + out = append(out, call...) + } + return out +} + +// TestReapOnce_UsesConfiguredBatchSize proves the reaper honors +// p.reaperBatchSize. Before the fix it always used a hardcoded 200, so with 10 +// due rows and a batch of 4 we must see exactly 4 rebroadcast, not 10. +func TestReapOnce_UsesConfiguredBatchSize(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + for i := 0; i < 10; i++ { + ms.replayRows = append(ms.replayRows, seenRow(txidN(i), 2*time.Hour, time.Time{})) + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 4 + p.reaperRebroadcastInterval = time.Hour + + p.reapOnce(context.Background()) + + if got := log.count("register:"); got != 4 { + t.Fatalf("expected exactly 4 rows rebroadcast (batch cap), got %d", got) + } + if got := len(ms.allRebroadcastMarks()); got != 4 { + t.Fatalf("expected 4 txids marked, got %d", got) + } +} + +// TestReapOnce_SkipsRecentlyRebroadcast confirms the per-tx interval throttle: +// a row rebroadcast within reaperRebroadcastInterval is excluded; never-sent +// and long-ago-sent rows are included. +func TestReapOnce_SkipsRecentlyRebroadcast(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("never", 2*time.Hour, time.Time{}), // due + seenRow("recent", 2*time.Hour, time.Now().Add(-30*time.Minute)), // throttled + seenRow("longago", 2*time.Hour, time.Now().Add(-2*time.Hour)), // due + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + + p.reapOnce(context.Background()) + + if got := log.count("register:"); got != 2 { + t.Fatalf("expected 2 due rows rebroadcast, got %d", got) + } + for _, txid := range ms.allRebroadcastMarks() { + if txid == "recent" { + t.Fatalf("recently-rebroadcast row should have been skipped, but it was marked") + } + } +} + +// TestReapOnce_MarksRebroadcastOnAttempt confirms attempted txids are stamped, +// so an immediate second tick (within the interval) rebroadcasts nothing. +func TestReapOnce_MarksRebroadcastOnAttempt(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("a", 2*time.Hour, time.Time{}), + seenRow("b", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + + p.reapOnce(context.Background()) + if got := log.count("register:"); got != 2 { + t.Fatalf("first tick: expected 2 rebroadcast, got %d", got) + } + + p.reapOnce(context.Background()) + if got := log.count("register:"); got != 2 { + t.Fatalf("second tick: expected no new rebroadcasts (still 2 total), got %d", got) + } +} + +// TestReapOnce_FairnessAcrossTicks is the core anti-starvation assertion: with +// a backlog larger than the batch size, successive ticks rebroadcast every row +// exactly once over the interval — no row is starved, none is re-sent twice. +func TestReapOnce_FairnessAcrossTicks(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + const backlog = 5 + ms := newMockStore() + for i := 0; i < backlog; i++ { + ms.replayRows = append(ms.replayRows, seenRow(txidN(i), 2*time.Hour, time.Time{})) + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 2 + p.reaperRebroadcastInterval = time.Hour + + // 3 ticks of batch 2 covers a backlog of 5 (2+2+1). + for i := 0; i < 3; i++ { + p.reapOnce(context.Background()) + } + + marks := ms.allRebroadcastMarks() + seen := make(map[string]int, backlog) + for _, txid := range marks { + seen[txid]++ + } + if len(seen) != backlog { + t.Fatalf("expected all %d rows rebroadcast within the interval, got %d distinct: %v", backlog, len(seen), seen) + } + for txid, n := range seen { + if n != 1 { + t.Fatalf("row %s rebroadcast %d times within one interval, want exactly 1", txid, n) + } + } +} + +// TestReapOnce_RecoverableNotRejected confirms a transient broadcast failure +// (requeue) leaves the SEEN_* row non-terminal — the reaper must never +// auto-reject a recoverable tx. The row is still marked on attempt so it cedes +// its slot for one interval. +func TestReapOnce_RecoverableNotRejected(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + // 500 with no Teranode failure-list body → whole-batch requeue. + teranodeSrv := newTeranodeServer(log, 500) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("recoverable", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + + p.reapOnce(context.Background()) + + ms.mu.Lock() + for _, u := range ms.updates { + if u.TxID == "recoverable" && u.Status == models.StatusRejected { + ms.mu.Unlock() + t.Fatalf("recoverable tx was wrongly written REJECTED on a requeue") + } + } + ms.mu.Unlock() + + if got := len(ms.allRebroadcastMarks()); got != 1 { + t.Fatalf("expected the attempted tx to be marked even on requeue, got %d marks", got) + } +} + +func txidN(i int) string { + const hexd = "0123456789abcdef" + return "tx" + string(hexd[i%16]) + string(hexd[(i/16)%16]) +} diff --git a/services/webhook/service_test.go b/services/webhook/service_test.go index 31f1b5f..325f00c 100644 --- a/services/webhook/service_test.go +++ b/services/webhook/service_test.go @@ -108,6 +108,12 @@ func (s *fakeStore) SetMinedByTxIDs(context.Context, string, uint64, []string) ( func (s *fakeStore) MarkMerkleRegisteredByTxIDs(context.Context, []string, time.Time) error { return nil } +func (s *fakeStore) GetReapCandidates(context.Context, time.Time, time.Time, time.Time, int) ([]*models.TransactionStatus, error) { + return nil, nil +} +func (s *fakeStore) MarkRebroadcastByTxIDs(context.Context, []string, time.Time) error { + return nil +} func (s *fakeStore) InsertSubmission(context.Context, *models.Submission) error { return nil } func (s *fakeStore) GetSubmissionsByToken(context.Context, string) ([]*models.Submission, error) { return nil, nil diff --git a/store/aerospike/aerospike.go b/store/aerospike/aerospike.go index 4d1d8cc..6d5d47d 100644 --- a/store/aerospike/aerospike.go +++ b/store/aerospike/aerospike.go @@ -5,7 +5,9 @@ import ( "encoding/json" "errors" "fmt" + "math" "net" + "sort" "strconv" "strings" "time" @@ -391,6 +393,9 @@ func (s *Store) UpdateStatus(ctx context.Context, status *models.TransactionStat if !status.MerkleRegisteredAt.IsZero() { bins["merkle_reg_at"] = status.MerkleRegisteredAt.UnixMilli() } + if !status.LastRebroadcastAt.IsZero() { + bins["rebroadcast_at"] = status.LastRebroadcastAt.UnixMilli() + } // Enforce the status lattice: refuse to overwrite a terminal status with a // later, lower-priority update (e.g. a stray SEEN_ON_NETWORK callback after @@ -849,6 +854,122 @@ func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, return nil } +// GetReapCandidates scans the transactions set and returns up to limit SEEN_* +// rows the reaper should rebroadcast, oldest-unserved first. Aerospike has no +// secondary index for this predicate and the query API does not order results, +// so the filter and the oldest-first sort are done in-process over a full set +// scan. This is acceptable because production runs on Postgres; the Aerospike +// path exists for parity. See store.Store for the contract. +func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) { + if limit <= 0 { + return nil, nil + } + if err := ctx.Err(); err != nil { + return nil, err + } + stmt := aero.NewStatement(s.namespace, setTransactions) + rs, err := s.client.Query(s.queryPolicy(ctx), stmt) + if rs != nil { + defer func() { _ = rs.Close() }() + } + if err != nil { + return nil, fmt.Errorf("query reap candidates: %w", err) + } + + type candidate struct { + st *models.TransactionStatus + lastRebcastMs int64 // math.MinInt64 sentinel == never rebroadcast (sorts first) + } + var candidates []candidate + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case rec, ok := <-rs.Results(): + if !ok { + goto done + } + if rec.Err != nil { + return nil, rec.Err + } + txid := getString(rec.Record, "txid") + st := recordToStatus(rec.Record, txid) + if st.Status != models.StatusSeenOnNetwork && st.Status != models.StatusSeenMultipleNodes { + continue + } + if st.Timestamp.Before(since) || !st.Timestamp.Before(seenDeadline) { + continue + } + rawTx, _ := rec.Record.Bins["raw_tx"].([]byte) + if len(rawTx) == 0 { + continue + } + if !st.LastRebroadcastAt.IsZero() && !st.LastRebroadcastAt.Before(rebroadcastBefore) { + continue + } + lastMs := int64(math.MinInt64) + if !st.LastRebroadcastAt.IsZero() { + lastMs = st.LastRebroadcastAt.UnixMilli() + } + candidates = append(candidates, candidate{ + st: &models.TransactionStatus{TxID: txid, RawTx: rawTx}, + lastRebcastMs: lastMs, + }) + } + } +done: + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].lastRebcastMs < candidates[j].lastRebcastMs + }) + if len(candidates) > limit { + candidates = candidates[:limit] + } + results := make([]*models.TransactionStatus, len(candidates)) + for i, c := range candidates { + results[i] = c.st + } + return results, nil +} + +// MarkRebroadcastByTxIDs writes rebroadcast_at = ts.UnixMilli() on every +// existing transaction record in the txid list. Unknown txids are silently +// skipped via UPDATE_ONLY, mirroring MarkMerkleRegisteredByTxIDs. +func (s *Store) MarkRebroadcastByTxIDs(ctx context.Context, txids []string, ts time.Time) error { + if len(txids) == 0 { + return nil + } + bwp := aero.NewBatchWritePolicy() + bwp.RecordExistsAction = aero.UPDATE_ONLY + + for i := 0; i < len(txids); i += s.batchSize { + if err := ctx.Err(); err != nil { + return err + } + end := i + s.batchSize + if end > len(txids) { + end = len(txids) + } + batch := txids[i:end] + + records := make([]aero.BatchRecordIfc, len(batch)) + for j, txid := range batch { + key, err := s.key(setTransactions, txid) + if err != nil { + continue + } + records[j] = aero.NewBatchWrite( + bwp, key, + aero.PutOp(aero.NewBin("rebroadcast_at", ts.UnixMilli())), + ) + } + + if err := s.client.BatchOperate(s.batchPolicy(ctx), records); err != nil { + return fmt.Errorf("batch mark rebroadcast: %w", err) + } + } + return nil +} + // --- BUMP Operations --- // // BUMPs are stored as a manifest record at primary key plus N @@ -2068,6 +2189,11 @@ func recordToStatus(rec *aero.Record, txid string) *models.TransactionStatus { status.MerkleRegisteredAt = time.UnixMilli(int64(ms)) } } + if v, ok := rec.Bins["rebroadcast_at"]; ok { + if ms, ok := v.(int); ok { + status.LastRebroadcastAt = time.UnixMilli(int64(ms)) + } + } return status } diff --git a/store/pebble/pebble.go b/store/pebble/pebble.go index 35f64a5..4f87cc9 100644 --- a/store/pebble/pebble.go +++ b/store/pebble/pebble.go @@ -72,6 +72,7 @@ type storedStatus struct { CreatedUnixNs int64 `json:"created_at,omitempty"` NextRetryUnixNs int64 `json:"next_retry_at,omitempty"` MerkleRegisteredUnixNs int64 `json:"merkle_registered_at,omitempty"` + LastRebroadcastUnixNs int64 `json:"last_rebroadcast_at,omitempty"` } func (s storedStatus) toModel() *models.TransactionStatus { @@ -99,6 +100,9 @@ func (s storedStatus) toModel() *models.TransactionStatus { if s.MerkleRegisteredUnixNs != 0 { out.MerkleRegisteredAt = time.Unix(0, s.MerkleRegisteredUnixNs) } + if s.LastRebroadcastUnixNs != 0 { + out.LastRebroadcastAt = time.Unix(0, s.LastRebroadcastUnixNs) + } return out } @@ -127,6 +131,9 @@ func fromModel(m *models.TransactionStatus) storedStatus { if !m.MerkleRegisteredAt.IsZero() { out.MerkleRegisteredUnixNs = m.MerkleRegisteredAt.UnixNano() } + if !m.LastRebroadcastAt.IsZero() { + out.LastRebroadcastUnixNs = m.LastRebroadcastAt.UnixNano() + } return out } @@ -492,6 +499,9 @@ func mergeStatus(existing *storedStatus, update *models.TransactionStatus) store if !update.MerkleRegisteredAt.IsZero() { out.MerkleRegisteredUnixNs = update.MerkleRegisteredAt.UnixNano() } + if !update.LastRebroadcastAt.IsZero() { + out.LastRebroadcastUnixNs = update.LastRebroadcastAt.UnixNano() + } return out } @@ -963,6 +973,122 @@ func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, return nil } +// GetReapCandidates walks the updated-at index (oldest-first) and returns up to +// limit SEEN_* rows the reaper should rebroadcast, ordered oldest-unserved +// first. The updated-at index is timestamp-ordered, not last_rebroadcast_at- +// ordered, so the due-filter and the final oldest-rebroadcast-first sort are +// applied in-process over the filtered set. See store.Store for the contract. +func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) { + if limit <= 0 { + return nil, nil + } + if err := ctx.Err(); err != nil { + return nil, err + } + prefix := idxTxUpdatedPrefix() + iter, err := s.db.NewIter(&pebbledb.IterOptions{ + LowerBound: prefix, + UpperBound: endOfPrefix(prefix), + }) + if err != nil { + return nil, err + } + defer func() { _ = iter.Close() }() + + sinceNs := since.UnixNano() + deadlineNs := seenDeadline.UnixNano() + type candidate struct { + st *models.TransactionStatus + lastRebcastNs int64 // 0 == never rebroadcast (sorts first) + } + var candidates []candidate + for iter.First(); iter.Valid(); iter.Next() { + if err := ctx.Err(); err != nil { + return nil, err + } + txid := lastSegment(iter.Key()) + st, err := s.readStatus(txid) + if err != nil || st == nil { + continue + } + if st.Status != models.StatusSeenOnNetwork && st.Status != models.StatusSeenMultipleNodes { + continue + } + tsNs := st.Timestamp.UnixNano() + if tsNs < sinceNs || tsNs >= deadlineNs { + continue + } + if len(st.RawTx) == 0 { + continue + } + if !st.LastRebroadcastAt.IsZero() && !st.LastRebroadcastAt.Before(rebroadcastBefore) { + continue + } + var lastNs int64 + if !st.LastRebroadcastAt.IsZero() { + lastNs = st.LastRebroadcastAt.UnixNano() + } + candidates = append(candidates, candidate{ + st: &models.TransactionStatus{TxID: st.TxID, RawTx: st.RawTx}, + lastRebcastNs: lastNs, + }) + } + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].lastRebcastNs < candidates[j].lastRebcastNs + }) + if len(candidates) > limit { + candidates = candidates[:limit] + } + results := make([]*models.TransactionStatus, len(candidates)) + for i, c := range candidates { + results[i] = c.st + } + return results, nil +} + +// MarkRebroadcastByTxIDs stamps last_rebroadcast_at on every existing row in +// the txid list. Unknown txids are silently no-ops, mirroring +// MarkMerkleRegisteredByTxIDs (last_rebroadcast_at is not indexed, so no index +// churn). +func (s *Store) MarkRebroadcastByTxIDs(ctx context.Context, txids []string, ts time.Time) error { + if err := ctx.Err(); err != nil { + return err + } + tsNs := ts.UnixNano() + for _, txid := range txids { + if err := ctx.Err(); err != nil { + return err + } + mu := s.shardFor(txid) + mu.Lock() + + existing, err := s.readStoredStatus(txid) + if err != nil { + mu.Unlock() + return err + } + if existing == nil { + mu.Unlock() + continue + } + + updated := *existing + updated.LastRebroadcastUnixNs = tsNs + + payload, err := json.Marshal(updated) + if err != nil { + mu.Unlock() + return err + } + err = s.db.Set(txKey(txid), payload, s.writeOpts) + mu.Unlock() + if err != nil { + return err + } + } + return nil +} + // --- BUMP / STUMP --- func (s *Store) InsertBUMP(ctx context.Context, blockHash string, blockHeight uint64, bumpData []byte) error { diff --git a/store/postgres/postgres.go b/store/postgres/postgres.go index 2b3095b..b7ac746 100644 --- a/store/postgres/postgres.go +++ b/store/postgres/postgres.go @@ -822,6 +822,57 @@ func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, return nil } +// GetReapCandidates returns the SEEN_* rows the reaper should rebroadcast, +// oldest-unserved first. See store.Store for the full contract. Only txid and +// raw_tx are selected — the reaper needs nothing else, and ordering by +// last_rebroadcast_at NULLS FIRST is what spreads each tick's bounded batch +// across the whole backlog instead of re-sending the same rows every tick. +func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) { + if limit <= 0 { + return nil, nil + } + const q = ` +SELECT txid, raw_tx FROM transactions +WHERE status IN ('SEEN_ON_NETWORK', 'SEEN_MULTIPLE_NODES') + AND timestamp_at >= $1 AND timestamp_at < $2 + AND raw_tx IS NOT NULL + AND (last_rebroadcast_at IS NULL OR last_rebroadcast_at < $3) +ORDER BY last_rebroadcast_at ASC NULLS FIRST +LIMIT $4` + rows, err := s.pool.Query(ctx, q, since, seenDeadline, rebroadcastBefore, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var results []*models.TransactionStatus + for rows.Next() { + var ( + txid string + rawTx []byte + ) + if err := rows.Scan(&txid, &rawTx); err != nil { + return results, err + } + results = append(results, &models.TransactionStatus{TxID: txid, RawTx: rawTx}) + } + return results, rows.Err() +} + +// MarkRebroadcastByTxIDs stamps last_rebroadcast_at = $1 on every existing row +// whose txid is in $2. Unknown txids are silently no-ops, mirroring +// MarkMerkleRegisteredByTxIDs. +func (s *Store) MarkRebroadcastByTxIDs(ctx context.Context, txids []string, ts time.Time) error { + if len(txids) == 0 { + return nil + } + const q = `UPDATE transactions SET last_rebroadcast_at = $1 WHERE txid = ANY($2)` + if _, err := s.pool.Exec(ctx, q, ts, txids); err != nil { + return fmt.Errorf("mark rebroadcast: %w", err) + } + return nil +} + // --- BUMP / STUMP --- func (s *Store) InsertBUMP(ctx context.Context, blockHash string, blockHeight uint64, bumpData []byte) error { diff --git a/store/postgres/schema.sql b/store/postgres/schema.sql index 451c17c..530a4ac 100644 --- a/store/postgres/schema.sql +++ b/store/postgres/schema.sql @@ -15,7 +15,8 @@ CREATE TABLE IF NOT EXISTS transactions ( next_retry_at TIMESTAMPTZ, timestamp_at TIMESTAMPTZ NOT NULL, created_at TIMESTAMPTZ NOT NULL, - merkle_registered_at TIMESTAMPTZ + merkle_registered_at TIMESTAMPTZ, + last_rebroadcast_at TIMESTAMPTZ ); -- Idempotent column add for stores created before merkle_registered_at was @@ -23,6 +24,11 @@ CREATE TABLE IF NOT EXISTS transactions ( -- repopulates the marker — see issue #145. ALTER TABLE transactions ADD COLUMN IF NOT EXISTS merkle_registered_at TIMESTAMPTZ; +-- Idempotent column add for stores created before last_rebroadcast_at was +-- introduced. NULL means "never rebroadcast", so existing stuck rows become +-- immediately eligible for the reaper's first rebroadcast. +ALTER TABLE transactions ADD COLUMN IF NOT EXISTS last_rebroadcast_at TIMESTAMPTZ; + CREATE INDEX IF NOT EXISTS idx_tx_status ON transactions(status); CREATE INDEX IF NOT EXISTS idx_tx_block_hash ON transactions(block_hash); CREATE INDEX IF NOT EXISTS idx_tx_updated ON transactions(timestamp_at); @@ -32,6 +38,12 @@ CREATE INDEX IF NOT EXISTS idx_tx_updated ON transactions(timestamp_at); CREATE INDEX IF NOT EXISTS idx_tx_retry_ready ON transactions(next_retry_at) WHERE status = 'PENDING_RETRY'; +-- Partial index for the reaper's rebroadcast-candidate scan: only rows in a +-- non-terminal SEEN_* state are eligible, and ordering by last_rebroadcast_at +-- (NULLs first) drives the oldest-unserved-first fairness query. +CREATE INDEX IF NOT EXISTS idx_tx_reap_due + ON transactions(last_rebroadcast_at NULLS FIRST) + WHERE status IN ('SEEN_ON_NETWORK', 'SEEN_MULTIPLE_NODES'); CREATE TABLE IF NOT EXISTS bumps ( block_hash TEXT PRIMARY KEY, diff --git a/store/store.go b/store/store.go index 667723c..8ab1697 100644 --- a/store/store.go +++ b/store/store.go @@ -249,6 +249,24 @@ type Store interface { // issue #145. MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, ts time.Time) error + // GetReapCandidates returns up to limit transactions the reaper should + // rebroadcast: rows in a non-terminal SEEN_* state (SEEN_ON_NETWORK or + // SEEN_MULTIPLE_NODES) whose timestamp is in [since, seenDeadline), that + // carry a non-empty raw_tx, and whose last_rebroadcast_at is either unset + // or older than rebroadcastBefore. Rows are ordered by last_rebroadcast_at + // ascending (NULLs first) so the longest-unserved txs are returned first — + // this is the fairness guarantee that prevents head-of-line starvation + // under a backlog larger than limit. Each returned row carries at least + // TxID and RawTx so the reaper can rebroadcast without a second read. + GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) + + // MarkRebroadcastByTxIDs stamps last_rebroadcast_at = ts for the given + // txids. Unknown txids are silently skipped (matching + // MarkMerkleRegisteredByTxIDs semantics). The reaper calls this for every + // txid it attempted to rebroadcast — on attempt, not just success — so a + // perpetually-requeueing tx cedes its slot for one rebroadcast interval. + MarkRebroadcastByTxIDs(ctx context.Context, txids []string, ts time.Time) error + // EnsureIndexes creates any required secondary indexes for query operations. EnsureIndexes() error From a36cd5b725742356be9ede5a91e5446ba282b0c7 Mon Sep 17 00:00:00 2001 From: rjseibane Date: Wed, 27 May 2026 00:38:18 +0200 Subject: [PATCH 2/3] fix(propagation): address reaper PR review feedback Follow-up to the reaper fairness fix, addressing review of #175: - Registration-failure starvation (#1): the reaper now captures the `failed` set from registerBatch (previously discarded) and stamps it on partial failures, so a tx that keeps failing merkle /watch can't stay last_rebroadcast_at=NULL, sort first, and re-fill the front of every batch. A full outage (nothing registered) still stamps nothing so it isn't penalized. - Outcome-class backoff (#2): accepted-but-unmined rows stay SEEN_* (the lattice forbids the SEEN->ACCEPTED downgrade) and only need the full-interval refresh; transient failures (Teranode requeue/unknown, plus registration failures) retry after a shorter reaper_requeue_backoff_ms (default 1m, clamped <= rebroadcast interval), applied by backdating the stamp. Restores fast recovery from transient blips instead of waiting the full 1h. - raw_tx contract (#3): Postgres GetReapCandidates now filters length(raw_tx) > 0, matching the len()==0 check in the other backends. - Index note (#4): document that idx_tx_reap_due's range/raw_tx predicates are inline rechecks; flag for future tuning. No code change. - Sentinel alignment (#5): Pebble's in-Go never-rebroadcast sort sentinel now matches Aerospike (math.MinInt64). Adds TestReapOnce_PartialRegistrationFailureMarksFailedSubset and TestReapOnce_RequeueUsesShortBackoff. --- config/config.go | 9 ++ services/propagation/propagator.go | 25 ++++- services/propagation/propagator_test.go | 5 + services/propagation/reaper.go | 71 +++++++++---- services/propagation/reaper_test.go | 129 ++++++++++++++++++++++++ store/pebble/pebble.go | 5 +- store/postgres/postgres.go | 2 +- store/postgres/schema.sql | 4 + 8 files changed, 225 insertions(+), 25 deletions(-) diff --git a/config/config.go b/config/config.go index 3e25759..5decad2 100644 --- a/config/config.go +++ b/config/config.go @@ -305,6 +305,14 @@ type PropagationConfig struct { // same rows every interval — this is what prevents head-of-line starvation // when the backlog exceeds reaper_batch_size. Defaults to 1h. ReaperRebroadcastIntervalMs int `mapstructure:"reaper_rebroadcast_interval_ms"` + // ReaperRequeueBackoffMs is the (shorter) delay before retrying a stuck tx + // whose rebroadcast hit a TRANSIENT failure — a Teranode requeue verdict or + // a failed merkle /watch registration. These rows stay SEEN_* and would + // otherwise wait the full ReaperRebroadcastIntervalMs; that is too slow for + // a transient blip, so they get this shorter backoff instead. Capped at + // ReaperRebroadcastIntervalMs (a requeue backoff longer than the full + // interval is meaningless). Defaults to 60000 (1 min). + ReaperRequeueBackoffMs int `mapstructure:"reaper_requeue_backoff_ms"` // LeaseTTLMs bounds how long the reaper lease remains valid without a // renewal. Set to at least 2–3× reaper_interval_ms so a missed tick // doesn't trigger a false-positive failover. Defaults to 3× interval. @@ -691,6 +699,7 @@ func setDefaults() { viper.SetDefault("propagation.reaper_interval_ms", 30000) viper.SetDefault("propagation.reaper_batch_size", 500) viper.SetDefault("propagation.reaper_rebroadcast_interval_ms", 3600000) + viper.SetDefault("propagation.reaper_requeue_backoff_ms", 60000) // 0 keeps New()'s 3×reaper_interval default, so changing reaper_interval // automatically moves the lease TTL unless the operator opts into a fixed value. viper.SetDefault("propagation.lease_ttl_ms", 0) diff --git a/services/propagation/propagator.go b/services/propagation/propagator.go index 9fe4e35..f89db81 100644 --- a/services/propagation/propagator.go +++ b/services/propagation/propagator.go @@ -71,11 +71,15 @@ type Propagator struct { // reaperRebroadcastInterval is the minimum gap between successive // rebroadcasts of the same stuck tx. See ReaperRebroadcastIntervalMs. reaperRebroadcastInterval time.Duration - teranodeBatchCap int - broadcastWorkers int - maxParallelChunks int - holderID string - leaseTTL time.Duration + // reaperRequeueBackoff is the shorter retry delay for stuck txs whose + // rebroadcast hit a transient failure (Teranode requeue / failed merkle + // registration). See ReaperRequeueBackoffMs. + reaperRequeueBackoff time.Duration + teranodeBatchCap int + broadcastWorkers int + maxParallelChunks int + holderID string + leaseTTL time.Duration // broadcastJobs feeds the persistent worker pool that runs every // per-endpoint POST /txs call. Replaces the previous per-broadcast @@ -227,6 +231,16 @@ func New(cfg *config.Config, logger *zap.Logger, producer *kafka.Producer, publi if reaperRebroadcastInterval <= 0 { reaperRebroadcastInterval = time.Hour } + reaperRequeueBackoff := time.Duration(cfg.Propagation.ReaperRequeueBackoffMs) * time.Millisecond + if reaperRequeueBackoff <= 0 { + reaperRequeueBackoff = time.Minute + } + // A requeue backoff longer than the full rebroadcast interval is + // meaningless — clamp so the transient-failure path never waits longer + // than the steady-state refresh. + if reaperRequeueBackoff > reaperRebroadcastInterval { + reaperRequeueBackoff = reaperRebroadcastInterval + } leaseTTL := time.Duration(cfg.Propagation.LeaseTTLMs) * time.Millisecond if leaseTTL <= 0 { leaseTTL = 3 * reaperInterval @@ -265,6 +279,7 @@ func New(cfg *config.Config, logger *zap.Logger, producer *kafka.Producer, publi reaperInterval: reaperInterval, reaperBatchSize: reaperBatch, reaperRebroadcastInterval: reaperRebroadcastInterval, + reaperRequeueBackoff: reaperRequeueBackoff, teranodeBatchCap: teranodeBatchCap, broadcastWorkers: broadcastWorkers, maxParallelChunks: maxParallelChunks, diff --git a/services/propagation/propagator_test.go b/services/propagation/propagator_test.go index a8b16fc..059356f 100644 --- a/services/propagation/propagator_test.go +++ b/services/propagation/propagator_test.go @@ -132,6 +132,10 @@ type mockStore struct { // rebroadcastMarks records every MarkRebroadcastByTxIDs call as one slice // per call. Lets reaper tests assert which txids were stamped on attempt. rebroadcastMarks [][]string + // rebroadcastMarkTs records the ts arg of each MarkRebroadcastByTxIDs call, + // index-aligned with rebroadcastMarks. Lets tests distinguish the + // full-interval stamp (≈now) from the backdated short-backoff stamp. + rebroadcastMarkTs []time.Time // reapErr forces GetReapCandidates to return this error. reapErr error // returningPrev, when non-nil, overrides the synthetic previous-status @@ -361,6 +365,7 @@ func (m *mockStore) MarkRebroadcastByTxIDs(_ context.Context, txids []string, ts m.mu.Lock() defer m.mu.Unlock() m.rebroadcastMarks = append(m.rebroadcastMarks, append([]string(nil), txids...)) + m.rebroadcastMarkTs = append(m.rebroadcastMarkTs, ts) marked := make(map[string]struct{}, len(txids)) for _, t := range txids { marked[t] = struct{}{} diff --git a/services/propagation/reaper.go b/services/propagation/reaper.go index 68a2ee9..a8f2970 100644 --- a/services/propagation/reaper.go +++ b/services/propagation/reaper.go @@ -143,34 +143,51 @@ func (p *Propagator) reapOnce(ctx context.Context) { // dispatcher — txids the dispatcher doesn't know about (because the // original Kafka message terminated long ago) get a no-op notify, // which is fine. - registered, _ := p.registerBatch(ctx, stuck) + registered, failed := p.registerBatch(ctx, stuck) if len(registered) == 0 { + // Nothing registered: either a global merkle /watch outage or this + // whole batch is persistently failing registration. Don't broadcast + // and don't stamp — a global outage must not push every stuck tx past + // its rebroadcast window, and a fully-failing batch has no healthy + // rows behind it to starve. The next tick retries the same set. return } rawTxs := make([][]byte, len(registered)) - attemptedTxids := make([]string, len(registered)) for i, m := range registered { rawTxs[i] = m.RawTx - attemptedTxids[i] = m.TXID } results := p.broadcastInChunks(ctx, registered, rawTxs) - // Stamp last_rebroadcast_at on every tx we attempted to broadcast (not - // just the ones that succeeded) so a perpetually-requeueing tx cedes its - // slot for one interval and can't re-monopolize the batch. Rows whose - // merkle /watch registration failed are intentionally NOT marked — they - // weren't broadcast, and a global merkle outage should not push every - // stuck tx past its rebroadcast window. - if err := p.store.MarkRebroadcastByTxIDs(ctx, attemptedTxids, now); err != nil { - p.logger.Warn("reaper: mark rebroadcast failed", zap.Error(err)) + // Stamp last_rebroadcast_at so each attempted tx cedes its slot, at one of + // two cadences by outcome class: + // - accepted rows stay SEEN_* (the lattice forbids the SEEN→ACCEPTED + // downgrade, so applyTerminalStatuses is a no-op for them) and only + // need a periodic refresh → full reaperRebroadcastInterval. + // - transient failures — a Teranode requeue/unknown verdict, plus rows + // whose merkle /watch registration failed (the `failed` set) — should + // retry soon → shorter reaperRequeueBackoff, applied by backdating the + // stamp so the row becomes due again after that backoff instead of the + // full interval. Stamping the failed subset is what stops a + // persistently registration-failing tx (last_rebroadcast_at == NULL) + // from sorting first and re-filling the front of every batch. + // rejected rows become terminal and leave the candidate set — no stamp. + fullIntervalTxids := make([]string, 0, len(registered)) + shortBackoffTxids := make([]string, 0, len(registered)+len(failed)) + for _, m := range failed { + shortBackoffTxids = append(shortBackoffTxids, m.TXID) } var accepted, rejected int terminalStatuses := make([]*models.TransactionStatus, 0, len(results)) - for _, res := range results { + for i, res := range results { + // results is index-aligned to registered (broadcastInChunks writes + // per-index), so registered[i] is this result's tx — needed because a + // requeue result carries no txid of its own. + txid := registered[i].TXID switch res.class { case txResultClassAccepted: accepted++ + fullIntervalTxids = append(fullIntervalTxids, txid) if res.status != nil { terminalStatuses = append(terminalStatuses, res.status) } @@ -180,12 +197,32 @@ func (p *Propagator) reapOnce(ctx context.Context) { terminalStatuses = append(terminalStatuses, res.status) } case txResultClassUnknown, txResultClassRequeue: - // Requeue / Unknown from the reaper's rebroadcast path: - // leave the row alone so the next reaper tick picks it up. - // The reaper bypasses the dispatcher, so there's no inFlight - // entry to requeue against — natural retry is just the next - // tick. + // Transient: leave the row SEEN_* and retry after the short + // backoff. The reaper bypasses the dispatcher, so there's no + // inFlight entry to requeue against — the next due tick is the + // retry. + shortBackoffTxids = append(shortBackoffTxids, txid) } } + + p.markRebroadcast(ctx, fullIntervalTxids, now) + if len(shortBackoffTxids) > 0 { + // Backdate so eligibility (last_rebroadcast_at < now-interval) flips + // true after reaperRequeueBackoff rather than the full interval. + p.markRebroadcast(ctx, shortBackoffTxids, now.Add(-(p.reaperRebroadcastInterval - p.reaperRequeueBackoff))) + } + p.applyTerminalStatuses(ctx, terminalStatuses, accepted, rejected) } + +// markRebroadcast stamps last_rebroadcast_at = ts on the given txids, logging +// (but not failing) on a store error — a missed stamp only costs a redundant +// rebroadcast next tick, not correctness. +func (p *Propagator) markRebroadcast(ctx context.Context, txids []string, ts time.Time) { + if len(txids) == 0 { + return + } + if err := p.store.MarkRebroadcastByTxIDs(ctx, txids, ts); err != nil { + p.logger.Warn("reaper: mark rebroadcast failed", zap.Error(err)) + } +} diff --git a/services/propagation/reaper_test.go b/services/propagation/reaper_test.go index 93df8f2..2049ec5 100644 --- a/services/propagation/reaper_test.go +++ b/services/propagation/reaper_test.go @@ -2,12 +2,47 @@ package propagation import ( "context" + "encoding/json" + "net/http" + "net/http/httptest" "testing" "time" "github.com/bsv-blockchain/arcade/models" ) +// newSelectiveMerkleServer returns a /watch stub that fails (400) registration +// for txids in `fail` and accepts (200) all others — used to drive partial +// merkle-registration failures. +func newSelectiveMerkleServer(fail map[string]bool) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req struct { + TxID string `json:"txid"` + } + _ = json.NewDecoder(r.Body).Decode(&req) + if fail[req.TxID] { + w.WriteHeader(http.StatusBadRequest) + return + } + w.WriteHeader(http.StatusOK) + })) +} + +// markTsFor returns the ts of the most recent MarkRebroadcastByTxIDs call that +// included txid. +func (m *mockStore) markTsFor(txid string) (time.Time, bool) { + m.mu.Lock() + defer m.mu.Unlock() + for i := len(m.rebroadcastMarks) - 1; i >= 0; i-- { + for _, t := range m.rebroadcastMarks[i] { + if t == txid { + return m.rebroadcastMarkTs[i], true + } + } + } + return time.Time{}, false +} + // seenRow builds a SEEN_MULTIPLE_NODES row eligible for the reaper: a non-empty // raw_tx and a timestamp older than staleSeenOnNetworkAge but within // staleScanLookback. lastRebroadcast is the row's last_rebroadcast_at (zero == @@ -201,6 +236,100 @@ func TestReapOnce_RecoverableNotRejected(t *testing.T) { } } +// TestReapOnce_PartialRegistrationFailureMarksFailedSubset covers review #1: +// a tx that fails merkle /watch must still be stamped so it can't keep sorting +// first (NULL last_rebroadcast_at) and re-fill the front of every batch — but a +// FULL outage (nothing registered) marks nothing so it isn't penalized. +func TestReapOnce_PartialRegistrationFailureMarksFailedSubset(t *testing.T) { + t.Run("partial failure marks the failed subset", func(t *testing.T) { + merkleSrv := newSelectiveMerkleServer(map[string]bool{"poison": true}) + defer merkleSrv.Close() + log := &eventLog{} + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("ok1", 2*time.Hour, time.Time{}), + seenRow("poison", 2*time.Hour, time.Time{}), + seenRow("ok2", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + p.reaperRequeueBackoff = time.Minute + + p.reapOnce(context.Background()) + + if _, ok := ms.markTsFor("poison"); !ok { + t.Fatalf("registration-failed tx 'poison' was not stamped; it would starve the front of the batch") + } + if _, ok := ms.markTsFor("ok1"); !ok { + t.Fatalf("successfully broadcast tx 'ok1' was not stamped") + } + }) + + t.Run("full outage marks nothing", func(t *testing.T) { + merkleSrv := newSelectiveMerkleServer(map[string]bool{"a": true, "b": true}) + defer merkleSrv.Close() + log := &eventLog{} + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("a", 2*time.Hour, time.Time{}), + seenRow("b", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + p.reaperRequeueBackoff = time.Minute + + p.reapOnce(context.Background()) + + if got := len(ms.allRebroadcastMarks()); got != 0 { + t.Fatalf("a full registration outage must not stamp any row, got %d marks", got) + } + }) +} + +// TestReapOnce_RequeueUsesShortBackoff covers review #2: a transient requeue is +// stamped with a backdated ts so it becomes due again after reaperRequeueBackoff +// (~1 min), not the full reaperRebroadcastInterval (1h). +func TestReapOnce_RequeueUsesShortBackoff(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 500) // 500, no failure body → requeue + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("requeueme", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + p.reaperRequeueBackoff = time.Minute + + before := time.Now() + p.reapOnce(context.Background()) + + ts, ok := ms.markTsFor("requeueme") + if !ok { + t.Fatalf("requeued tx was not stamped") + } + // Backdated to ≈ now - (interval - backoff) = now - 59m, so it must be far + // behind `before`, not ≈now (which is what an accepted row would get). + if !ts.Before(before.Add(-30 * time.Minute)) { + t.Fatalf("requeued tx stamped at %v (vs now≈%v); expected a backdated (~59m-old) stamp for the short backoff", ts, before) + } +} + func txidN(i int) string { const hexd = "0123456789abcdef" return "tx" + string(hexd[i%16]) + string(hexd[(i/16)%16]) diff --git a/store/pebble/pebble.go b/store/pebble/pebble.go index 4f87cc9..cdbf2fb 100644 --- a/store/pebble/pebble.go +++ b/store/pebble/pebble.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "hash/fnv" + "math" "sort" "sync" "time" @@ -999,7 +1000,7 @@ func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebr deadlineNs := seenDeadline.UnixNano() type candidate struct { st *models.TransactionStatus - lastRebcastNs int64 // 0 == never rebroadcast (sorts first) + lastRebcastNs int64 // math.MinInt64 sentinel == never rebroadcast (sorts first) } var candidates []candidate for iter.First(); iter.Valid(); iter.Next() { @@ -1024,7 +1025,7 @@ func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebr if !st.LastRebroadcastAt.IsZero() && !st.LastRebroadcastAt.Before(rebroadcastBefore) { continue } - var lastNs int64 + lastNs := int64(math.MinInt64) if !st.LastRebroadcastAt.IsZero() { lastNs = st.LastRebroadcastAt.UnixNano() } diff --git a/store/postgres/postgres.go b/store/postgres/postgres.go index b7ac746..8db4687 100644 --- a/store/postgres/postgres.go +++ b/store/postgres/postgres.go @@ -835,7 +835,7 @@ func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebr SELECT txid, raw_tx FROM transactions WHERE status IN ('SEEN_ON_NETWORK', 'SEEN_MULTIPLE_NODES') AND timestamp_at >= $1 AND timestamp_at < $2 - AND raw_tx IS NOT NULL + AND length(raw_tx) > 0 AND (last_rebroadcast_at IS NULL OR last_rebroadcast_at < $3) ORDER BY last_rebroadcast_at ASC NULLS FIRST LIMIT $4` diff --git a/store/postgres/schema.sql b/store/postgres/schema.sql index 530a4ac..84f9f97 100644 --- a/store/postgres/schema.sql +++ b/store/postgres/schema.sql @@ -41,6 +41,10 @@ CREATE INDEX IF NOT EXISTS idx_tx_retry_ready -- Partial index for the reaper's rebroadcast-candidate scan: only rows in a -- non-terminal SEEN_* state are eligible, and ordering by last_rebroadcast_at -- (NULLs first) drives the oldest-unserved-first fairness query. +-- Note: the timestamp_at range and length(raw_tx) > 0 predicates are inline +-- rechecks, not part of this index. At a very large SEEN_* population the scan +-- may touch many index/heap rows before LIMIT is satisfied; if that shows up in +-- query plans, consider a composite/covering index — fine at current scale. CREATE INDEX IF NOT EXISTS idx_tx_reap_due ON transactions(last_rebroadcast_at NULLS FIRST) WHERE status IN ('SEEN_ON_NETWORK', 'SEEN_MULTIPLE_NODES'); From effeb37ea5f8882932c450605c5369579ce7bcbf Mon Sep 17 00:00:00 2001 From: rjseibane Date: Wed, 27 May 2026 00:51:17 +0200 Subject: [PATCH 3/3] fix(store): address Copilot review on reaper-candidate reads Follow-up to #175 review (Copilot): - Postgres reads now surface last_rebroadcast_at: scanStatus / scanStatusWithInserted scan it and the GetStatus / GetStatusesSince / IterateStatusesSince SELECTs and the batch INSERT...RETURNING include it, so GET /tx and tooling see the same value the Aerospike/Pebble backends already round-trip (was always zero on Postgres). - Pebble GetReapCandidates no longer scans the full updated-at history every tick: it seeks to the first key at/after `since` and stops once a row reaches `seenDeadline` (the index is timestamp-ordered). - Pebble GetReapCandidates keeps only the `limit` oldest-rebroadcast candidates via a bounded max-heap, so memory is O(limit) instead of O(backlog). The empty-vs-NULL raw_tx concern was already handled in the prior commit (length(raw_tx) > 0). Adds a Pebble GetReapCandidates test covering the window filter, eligibility, and bounded oldest-first ordering. --- store/pebble/pebble.go | 87 ++++++++++++++++++++++++------------- store/pebble/pebble_test.go | 63 +++++++++++++++++++++++++++ store/postgres/postgres.go | 20 ++++++--- 3 files changed, 135 insertions(+), 35 deletions(-) diff --git a/store/pebble/pebble.go b/store/pebble/pebble.go index cdbf2fb..9df1a01 100644 --- a/store/pebble/pebble.go +++ b/store/pebble/pebble.go @@ -14,6 +14,7 @@ package pebble import ( + "container/heap" "context" "encoding/json" "errors" @@ -974,11 +975,36 @@ func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, return nil } -// GetReapCandidates walks the updated-at index (oldest-first) and returns up to -// limit SEEN_* rows the reaper should rebroadcast, ordered oldest-unserved -// first. The updated-at index is timestamp-ordered, not last_rebroadcast_at- -// ordered, so the due-filter and the final oldest-rebroadcast-first sort are -// applied in-process over the filtered set. See store.Store for the contract. +// reapCandidate is one row under consideration by GetReapCandidates. +type reapCandidate struct { + txid string + rawTx []byte + lastRebcastNs int64 // math.MinInt64 == never rebroadcast (oldest) +} + +// reapHeap is a bounded MAX-heap on lastRebcastNs (largest on top). Pushing and +// popping-when-over-limit keeps the `limit` SMALLEST (oldest-rebroadcast) +// candidates, so GetReapCandidates stays O(limit) memory over a large backlog. +type reapHeap []reapCandidate + +func (h *reapHeap) Len() int { return len(*h) } +func (h *reapHeap) Less(i, j int) bool { return (*h)[i].lastRebcastNs > (*h)[j].lastRebcastNs } +func (h *reapHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } +func (h *reapHeap) Push(x any) { *h = append(*h, x.(reapCandidate)) } +func (h *reapHeap) Pop() any { + old := *h + n := len(old) + item := old[n-1] + *h = old[:n-1] + return item +} + +// GetReapCandidates walks the updated-at index within [since, seenDeadline) and +// returns up to limit SEEN_* rows the reaper should rebroadcast, ordered +// oldest-rebroadcast-first. The updated-at index is timestamp-ordered (not +// last_rebroadcast_at-ordered), so the scan is bounded by the timestamp window +// and a bounded max-heap selects the oldest-rebroadcast subset. See store.Store +// for the contract. func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) { if limit <= 0 { return nil, nil @@ -987,8 +1013,14 @@ func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebr return nil, err } prefix := idxTxUpdatedPrefix() + sinceNs := since.UnixNano() + deadlineNs := seenDeadline.UnixNano() + // Bound the scan to [since, seenDeadline): the idx:tx:updated keyspace is + // ordered by timestamp (hex-encoded unix nanos), so start at the first key + // at/after `since` and stop as soon as a row's timestamp reaches the + // deadline — no need to walk the full history every tick. iter, err := s.db.NewIter(&pebbledb.IterOptions{ - LowerBound: prefix, + LowerBound: idxTxUpdatedKey(sinceNs, ""), UpperBound: endOfPrefix(prefix), }) if err != nil { @@ -996,13 +1028,10 @@ func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebr } defer func() { _ = iter.Close() }() - sinceNs := since.UnixNano() - deadlineNs := seenDeadline.UnixNano() - type candidate struct { - st *models.TransactionStatus - lastRebcastNs int64 // math.MinInt64 sentinel == never rebroadcast (sorts first) - } - var candidates []candidate + // Keep only the `limit` oldest-rebroadcast candidates via a bounded + // max-heap (largest lastRebcastNs on top, evicted when full) so memory + // stays O(limit) regardless of backlog. math.MinInt64 == never rebroadcast. + h := &reapHeap{} for iter.First(); iter.Valid(); iter.Next() { if err := ctx.Err(); err != nil { return nil, err @@ -1012,11 +1041,15 @@ func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebr if err != nil || st == nil { continue } - if st.Status != models.StatusSeenOnNetwork && st.Status != models.StatusSeenMultipleNodes { + tsNs := st.Timestamp.UnixNano() + if tsNs >= deadlineNs { + // Index is timestamp-ascending, so nothing past here is in-window. + break + } + if tsNs < sinceNs { continue } - tsNs := st.Timestamp.UnixNano() - if tsNs < sinceNs || tsNs >= deadlineNs { + if st.Status != models.StatusSeenOnNetwork && st.Status != models.StatusSeenMultipleNodes { continue } if len(st.RawTx) == 0 { @@ -1029,20 +1062,16 @@ func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebr if !st.LastRebroadcastAt.IsZero() { lastNs = st.LastRebroadcastAt.UnixNano() } - candidates = append(candidates, candidate{ - st: &models.TransactionStatus{TxID: st.TxID, RawTx: st.RawTx}, - lastRebcastNs: lastNs, - }) - } - sort.Slice(candidates, func(i, j int) bool { - return candidates[i].lastRebcastNs < candidates[j].lastRebcastNs - }) - if len(candidates) > limit { - candidates = candidates[:limit] + heap.Push(h, reapCandidate{txid: st.TxID, rawTx: st.RawTx, lastRebcastNs: lastNs}) + if h.Len() > limit { + heap.Pop(h) // evict the most-recently-rebroadcast (largest) + } } - results := make([]*models.TransactionStatus, len(candidates)) - for i, c := range candidates { - results[i] = c.st + // Heap holds the `limit` oldest-rebroadcast rows; emit ascending. + results := make([]*models.TransactionStatus, h.Len()) + for i := len(results) - 1; i >= 0; i-- { + c := heap.Pop(h).(reapCandidate) + results[i] = &models.TransactionStatus{TxID: c.txid, RawTx: c.rawTx} } return results, nil } diff --git a/store/pebble/pebble_test.go b/store/pebble/pebble_test.go index 4e1e903..afaabbe 100644 --- a/store/pebble/pebble_test.go +++ b/store/pebble/pebble_test.go @@ -1006,3 +1006,66 @@ func hashesOf(rows []*models.BlockProcessingStatus) []string { } return out } + +// TestGetReapCandidates_WindowEligibilityAndBoundedOrder exercises the Pebble +// reaper-candidate query: the [since, seenDeadline) window filter, the +// raw_tx/status/recently-rebroadcast eligibility checks, and the bounded +// oldest-rebroadcast-first selection (limit < eligible count). +func TestGetReapCandidates_WindowEligibilityAndBoundedOrder(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + now := time.Now() + + insertSeen := func(txid string, ts time.Time, raw []byte, status models.Status) { + _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: txid, + Status: status, + RawTx: raw, + Timestamp: ts, + }) + if err != nil { + t.Fatalf("insert %s: %v", txid, err) + } + } + raw := []byte{0x01, 0x02} + + // In-window (now-2h) SEEN rows with distinct last_rebroadcast times. + insertSeen("never", now.Add(-2*time.Hour), raw, models.StatusSeenMultipleNodes) // oldest (never) + insertSeen("rb5h", now.Add(-2*time.Hour), raw, models.StatusSeenMultipleNodes) + insertSeen("rb3h", now.Add(-2*time.Hour), raw, models.StatusSeenOnNetwork) + insertSeen("rb2h", now.Add(-2*time.Hour), raw, models.StatusSeenMultipleNodes) // most-recently rebroadcast (evicted at limit=3) + // Excluded rows: + insertSeen("recent", now.Add(-2*time.Hour), raw, models.StatusSeenMultipleNodes) // rebroadcast within interval + insertSeen("received", now.Add(-2*time.Hour), raw, models.StatusReceived) // wrong status + insertSeen("noraw", now.Add(-2*time.Hour), nil, models.StatusSeenMultipleNodes) // empty raw_tx + insertSeen("tooOld", now.Add(-30*time.Hour), raw, models.StatusSeenMultipleNodes) // before since + insertSeen("tooNew", now.Add(-30*time.Minute), raw, models.StatusSeenMultipleNodes) // after deadline + + mark := func(txid string, ts time.Time) { + if err := s.MarkRebroadcastByTxIDs(ctx, []string{txid}, ts); err != nil { + t.Fatalf("mark %s: %v", txid, err) + } + } + mark("rb5h", now.Add(-5*time.Hour)) + mark("rb3h", now.Add(-3*time.Hour)) + mark("rb2h", now.Add(-2*time.Hour)) + mark("recent", now.Add(-30*time.Minute)) // within the 1h interval → not eligible + + got, err := s.GetReapCandidates(ctx, now.Add(-24*time.Hour), now.Add(-time.Hour), now.Add(-time.Hour), 3) + if err != nil { + t.Fatalf("GetReapCandidates: %v", err) + } + gotTxids := make([]string, len(got)) + for i, r := range got { + gotTxids[i] = r.TxID + if len(r.RawTx) == 0 { + t.Fatalf("candidate %s returned with empty RawTx", r.TxID) + } + } + // limit=3 of the eligible {never, rb5h, rb3h, rb2h}: the 3 oldest-rebroadcast, + // ascending. rb2h (most recent) is evicted. + want := []string{"never", "rb5h", "rb3h"} + if fmt.Sprintf("%v", gotTxids) != fmt.Sprintf("%v", want) { + t.Fatalf("got %v, want %v", gotTxids, want) + } +} diff --git a/store/postgres/postgres.go b/store/postgres/postgres.go index 8db4687..b9c038c 100644 --- a/store/postgres/postgres.go +++ b/store/postgres/postgres.go @@ -269,7 +269,7 @@ VALUES ` + values.String() + ` ON CONFLICT (txid) DO UPDATE SET txid = transactions.txid RETURNING txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at, merkle_registered_at, (xmax = 0) AS inserted` + timestamp_at, created_at, merkle_registered_at, last_rebroadcast_at, (xmax = 0) AS inserted` rows, err := s.pool.Query(ctx, q, args...) if err != nil { @@ -551,7 +551,7 @@ func (s *Store) GetStatus(ctx context.Context, txid string) (*models.Transaction const q = ` SELECT txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at, merkle_registered_at + timestamp_at, created_at, merkle_registered_at, last_rebroadcast_at FROM transactions WHERE txid = $1` row := s.pool.QueryRow(ctx, q, txid) st, err := scanStatus(row) @@ -569,7 +569,7 @@ func (s *Store) GetStatusesSince(ctx context.Context, since time.Time) ([]*model const q = ` SELECT txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at, merkle_registered_at + timestamp_at, created_at, merkle_registered_at, last_rebroadcast_at FROM transactions WHERE timestamp_at >= $1 ORDER BY timestamp_at DESC` rows, err := s.pool.Query(ctx, q, since) @@ -596,7 +596,7 @@ func (s *Store) IterateStatusesSince(ctx context.Context, since time.Time, fn fu const q = ` SELECT txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at, merkle_registered_at + timestamp_at, created_at, merkle_registered_at, last_rebroadcast_at FROM transactions WHERE timestamp_at >= $1 ORDER BY timestamp_at DESC` rows, err := s.pool.Query(ctx, q, since) @@ -1277,6 +1277,7 @@ func scanStatusWithInserted(row rowScanner) (*models.TransactionStatus, bool, er rawTx []byte nextRetry *time.Time merkleRegisteredAt *time.Time + lastRebroadcastAt *time.Time inserted bool ) if err := row.Scan( @@ -1284,7 +1285,7 @@ func scanStatusWithInserted(row rowScanner) (*models.TransactionStatus, bool, er &blockHash, &blockHeight, &merklePath, &extraInfo, &competingTxs, &rawTx, &st.RetryCount, &nextRetry, - &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, &inserted, + &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, &lastRebroadcastAt, &inserted, ); err != nil { return nil, false, err } @@ -1315,6 +1316,9 @@ func scanStatusWithInserted(row rowScanner) (*models.TransactionStatus, bool, er if merkleRegisteredAt != nil { st.MerkleRegisteredAt = *merkleRegisteredAt } + if lastRebroadcastAt != nil { + st.LastRebroadcastAt = *lastRebroadcastAt + } return &st, inserted, nil } @@ -1330,13 +1334,14 @@ func scanStatus(row rowScanner) (*models.TransactionStatus, error) { rawTx []byte nextRetry *time.Time merkleRegisteredAt *time.Time + lastRebroadcastAt *time.Time ) if err := row.Scan( &st.TxID, &st.Status, &statusCode, &blockHash, &blockHeight, &merklePath, &extraInfo, &competingTxs, &rawTx, &st.RetryCount, &nextRetry, - &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, + &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, &lastRebroadcastAt, ); err != nil { return nil, err } @@ -1367,6 +1372,9 @@ func scanStatus(row rowScanner) (*models.TransactionStatus, error) { if merkleRegisteredAt != nil { st.MerkleRegisteredAt = *merkleRegisteredAt } + if lastRebroadcastAt != nil { + st.LastRebroadcastAt = *lastRebroadcastAt + } return &st, nil }