diff --git a/config/config.go b/config/config.go index 48dc4cb..483c105 100644 --- a/config/config.go +++ b/config/config.go @@ -305,6 +305,21 @@ type PropagationConfig struct { RetryBackoffMs int `mapstructure:"retry_backoff_ms"` ReaperIntervalMs int `mapstructure:"reaper_interval_ms"` ReaperBatchSize int `mapstructure:"reaper_batch_size"` + // ReaperRebroadcastIntervalMs is the minimum time between successive + // rebroadcasts of the same stuck tx. The reaper only re-sends a SEEN_* + // row whose last_rebroadcast_at is NULL or older than this window, so each + // tick's batch spreads across the whole backlog instead of re-sending the + // same rows every interval — this is what prevents head-of-line starvation + // when the backlog exceeds reaper_batch_size. Defaults to 1h. + ReaperRebroadcastIntervalMs int `mapstructure:"reaper_rebroadcast_interval_ms"` + // ReaperRequeueBackoffMs is the (shorter) delay before retrying a stuck tx + // whose rebroadcast hit a TRANSIENT failure — a Teranode requeue verdict or + // a failed merkle /watch registration. These rows stay SEEN_* and would + // otherwise wait the full ReaperRebroadcastIntervalMs; that is too slow for + // a transient blip, so they get this shorter backoff instead. Capped at + // ReaperRebroadcastIntervalMs (a requeue backoff longer than the full + // interval is meaningless). Defaults to 60000 (1 min). + ReaperRequeueBackoffMs int `mapstructure:"reaper_requeue_backoff_ms"` // LeaseTTLMs bounds how long the reaper lease remains valid without a // renewal. Set to at least 2–3× reaper_interval_ms so a missed tick // doesn't trigger a false-positive failover. Defaults to 3× interval. @@ -726,6 +741,8 @@ func setDefaults() { viper.SetDefault("propagation.retry_backoff_ms", 500) viper.SetDefault("propagation.reaper_interval_ms", 30000) viper.SetDefault("propagation.reaper_batch_size", 500) + viper.SetDefault("propagation.reaper_rebroadcast_interval_ms", 3600000) + viper.SetDefault("propagation.reaper_requeue_backoff_ms", 60000) // 0 keeps New()'s 3×reaper_interval default, so changing reaper_interval // automatically moves the lease TTL unless the operator opts into a fixed value. viper.SetDefault("propagation.lease_ttl_ms", 0) diff --git a/models/transaction.go b/models/transaction.go index 621c64a..38b1b07 100644 --- a/models/transaction.go +++ b/models/transaction.go @@ -68,6 +68,13 @@ type TransactionStatus struct { // added). The startup replay loop reads this to skip rows registered // within MerkleReplaySkipRecentMinutes — see issue #145. MerkleRegisteredAt time.Time `json:"merkleRegisteredAt,omitempty"` + // LastRebroadcastAt is the wall-clock time of the most recent reaper + // rebroadcast attempt for this txid. Zero value means "never rebroadcast" + // (or rebroadcast before this field was added). The reaper reads it to + // throttle re-sends to at most once per reaper_rebroadcast_interval_ms, so + // a saturated backlog can't starve older stuck rows by re-sending the same + // newest rows every tick. + LastRebroadcastAt time.Time `json:"lastRebroadcastAt,omitempty"` } // Status represents the various states a transaction can be in diff --git a/services/api_server/handlers_test.go b/services/api_server/handlers_test.go index 2872ea0..e0e3569 100644 --- a/services/api_server/handlers_test.go +++ b/services/api_server/handlers_test.go @@ -156,6 +156,14 @@ func (m *mockStore) MarkMerkleRegisteredByTxIDs(context.Context, []string, time. return nil } +func (m *mockStore) GetReapCandidates(context.Context, time.Time, time.Time, time.Time, int) ([]*models.TransactionStatus, error) { + return nil, nil +} + +func (m *mockStore) MarkRebroadcastByTxIDs(context.Context, []string, time.Time) error { + return nil +} + func (m *mockStore) InsertSubmission(_ context.Context, sub *models.Submission) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/services/propagation/propagator.go b/services/propagation/propagator.go index d0664e2..16cba66 100644 --- a/services/propagation/propagator.go +++ b/services/propagation/propagator.go @@ -68,11 +68,18 @@ type Propagator struct { merkleConcurrency int reaperInterval time.Duration reaperBatchSize int - teranodeBatchCap int - broadcastWorkers int - maxParallelChunks int - holderID string - leaseTTL time.Duration + // reaperRebroadcastInterval is the minimum gap between successive + // rebroadcasts of the same stuck tx. See ReaperRebroadcastIntervalMs. + reaperRebroadcastInterval time.Duration + // reaperRequeueBackoff is the shorter retry delay for stuck txs whose + // rebroadcast hit a transient failure (Teranode requeue / failed merkle + // registration). See ReaperRequeueBackoffMs. + reaperRequeueBackoff time.Duration + teranodeBatchCap int + broadcastWorkers int + maxParallelChunks int + holderID string + leaseTTL time.Duration // broadcastJobs feeds the persistent worker pool that runs every // per-endpoint POST /txs call. Replaces the previous per-broadcast @@ -220,6 +227,20 @@ func New(cfg *config.Config, logger *zap.Logger, producer *kafka.Producer, publi if reaperBatch <= 0 { reaperBatch = 500 } + reaperRebroadcastInterval := time.Duration(cfg.Propagation.ReaperRebroadcastIntervalMs) * time.Millisecond + if reaperRebroadcastInterval <= 0 { + reaperRebroadcastInterval = time.Hour + } + reaperRequeueBackoff := time.Duration(cfg.Propagation.ReaperRequeueBackoffMs) * time.Millisecond + if reaperRequeueBackoff <= 0 { + reaperRequeueBackoff = time.Minute + } + // A requeue backoff longer than the full rebroadcast interval is + // meaningless — clamp so the transient-failure path never waits longer + // than the steady-state refresh. + if reaperRequeueBackoff > reaperRebroadcastInterval { + reaperRequeueBackoff = reaperRebroadcastInterval + } leaseTTL := time.Duration(cfg.Propagation.LeaseTTLMs) * time.Millisecond if leaseTTL <= 0 { leaseTTL = 3 * reaperInterval @@ -245,30 +266,32 @@ func New(cfg *config.Config, logger *zap.Logger, producer *kafka.Producer, publi maxParallelChunks = defaultMaxParallelChunks } p := &Propagator{ - cfg: cfg, - logger: logger.Named("propagation"), - producer: producer, - publisher: publisher, - store: st, - leaser: leaser, - teranodeClient: tc, - merkleClient: mc, - maxPending: maxPending, - merkleConcurrency: merkleConcurrency, - reaperInterval: reaperInterval, - reaperBatchSize: reaperBatch, - teranodeBatchCap: teranodeBatchCap, - broadcastWorkers: broadcastWorkers, - maxParallelChunks: maxParallelChunks, - holderID: newHolderID(), - leaseTTL: leaseTTL, - broadcastJobs: make(chan broadcastJob, broadcastJobBuffer), - processBatchSem: make(chan struct{}, maxConcurrentBatches), - admitCh: make(chan admitRequest, dispatcherChannelBuffer), - requeueCh: make(chan requeueRequest, dispatcherChannelBuffer), - terminalCh: make(chan terminalEvent, dispatcherChannelBuffer), - drainCh: make(chan drainRequest), - initDone: make(chan struct{}), + cfg: cfg, + logger: logger.Named("propagation"), + producer: producer, + publisher: publisher, + store: st, + leaser: leaser, + teranodeClient: tc, + merkleClient: mc, + maxPending: maxPending, + merkleConcurrency: merkleConcurrency, + reaperInterval: reaperInterval, + reaperBatchSize: reaperBatch, + reaperRebroadcastInterval: reaperRebroadcastInterval, + reaperRequeueBackoff: reaperRequeueBackoff, + teranodeBatchCap: teranodeBatchCap, + broadcastWorkers: broadcastWorkers, + maxParallelChunks: maxParallelChunks, + holderID: newHolderID(), + leaseTTL: leaseTTL, + broadcastJobs: make(chan broadcastJob, broadcastJobBuffer), + processBatchSem: make(chan struct{}, maxConcurrentBatches), + admitCh: make(chan admitRequest, dispatcherChannelBuffer), + requeueCh: make(chan requeueRequest, dispatcherChannelBuffer), + terminalCh: make(chan terminalEvent, dispatcherChannelBuffer), + drainCh: make(chan drainRequest), + initDone: make(chan struct{}), } // Start a dispatcher goroutine with a nil claim so tests that // construct via New and drive via admitCh / drainCh have a running diff --git a/services/propagation/propagator_test.go b/services/propagation/propagator_test.go index 479dd7f..059356f 100644 --- a/services/propagation/propagator_test.go +++ b/services/propagation/propagator_test.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "sort" "strings" "sync" "sync/atomic" @@ -128,6 +129,15 @@ type mockStore struct { // markErr forces MarkMerkleRegisteredByTxIDs to return this error. // Used by tests that verify a mark failure doesn't block broadcast. markErr error + // rebroadcastMarks records every MarkRebroadcastByTxIDs call as one slice + // per call. Lets reaper tests assert which txids were stamped on attempt. + rebroadcastMarks [][]string + // rebroadcastMarkTs records the ts arg of each MarkRebroadcastByTxIDs call, + // index-aligned with rebroadcastMarks. Lets tests distinguish the + // full-interval stamp (≈now) from the backdated short-backoff stamp. + rebroadcastMarkTs []time.Time + // reapErr forces GetReapCandidates to return this error. + reapErr error // returningPrev, when non-nil, overrides the synthetic previous-status // row BatchUpdateStatusReturning hands back for each input. The default // (RECEIVED prev) always reads as a real transition; this hook lets @@ -300,6 +310,74 @@ func (m *mockStore) IterateStatusesSince(_ context.Context, since time.Time, fn return nil } +// GetReapCandidates mirrors the Postgres query over replayRows: SEEN_* rows +// with a timestamp in [since, seenDeadline), a non-empty RawTx, and a +// last_rebroadcast_at that is zero or older than rebroadcastBefore, ordered +// oldest-rebroadcast-first (zero/never sorts first) and capped at limit. +func (m *mockStore) GetReapCandidates(_ context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.reapErr != nil { + return nil, m.reapErr + } + if limit <= 0 { + return nil, nil + } + var out []*models.TransactionStatus + for _, r := range m.replayRows { + if r.Status != models.StatusSeenOnNetwork && r.Status != models.StatusSeenMultipleNodes { + continue + } + if r.Timestamp.Before(since) || !r.Timestamp.Before(seenDeadline) { + continue + } + if len(r.RawTx) == 0 { + continue + } + if !r.LastRebroadcastAt.IsZero() && !r.LastRebroadcastAt.Before(rebroadcastBefore) { + continue + } + out = append(out, &models.TransactionStatus{TxID: r.TxID, RawTx: r.RawTx, LastRebroadcastAt: r.LastRebroadcastAt}) + } + sort.SliceStable(out, func(i, j int) bool { + // zero (never rebroadcast) sorts first + zi, zj := out[i].LastRebroadcastAt.IsZero(), out[j].LastRebroadcastAt.IsZero() + if zi != zj { + return zi + } + return out[i].LastRebroadcastAt.Before(out[j].LastRebroadcastAt) + }) + if len(out) > limit { + out = out[:limit] + } + // Strip the marker from the returned rows — the reaper only consumes + // TxID + RawTx, matching the real backends. + for _, r := range out { + r.LastRebroadcastAt = time.Time{} + } + return out, nil +} + +// MarkRebroadcastByTxIDs records the call and stamps LastRebroadcastAt on the +// matching replayRows so a subsequent GetReapCandidates within the interval +// excludes them — mirroring the real backends. +func (m *mockStore) MarkRebroadcastByTxIDs(_ context.Context, txids []string, ts time.Time) error { + m.mu.Lock() + defer m.mu.Unlock() + m.rebroadcastMarks = append(m.rebroadcastMarks, append([]string(nil), txids...)) + m.rebroadcastMarkTs = append(m.rebroadcastMarkTs, ts) + marked := make(map[string]struct{}, len(txids)) + for _, t := range txids { + marked[t] = struct{}{} + } + for _, r := range m.replayRows { + if _, ok := marked[r.TxID]; ok { + r.LastRebroadcastAt = ts + } + } + return nil +} + func (m *mockStore) updateCount() int { m.mu.Lock() defer m.mu.Unlock() diff --git a/services/propagation/reaper.go b/services/propagation/reaper.go index ce92209..a8f2970 100644 --- a/services/propagation/reaper.go +++ b/services/propagation/reaper.go @@ -19,14 +19,16 @@ import ( // may be needed, or a callback may have been dropped. Long enough that we // don't rebroadcast every tx that takes a few minutes to mine. // -// staleScanLookback bounds how far back IterateStatusesSince walks. Rows +// staleScanLookback bounds how far back the candidate scan walks. Rows // older than this are assumed permanently stuck and outside the reaper's // responsibility — the operator surfaces them with `arcade tools surface // stuck` if a deeper sweep is needed. +// +// The per-tick batch size is configurable (reaper_batch_size); the per-tx +// rebroadcast cadence is reaper_rebroadcast_interval_ms. const ( - staleSeenOnNetworkAge = time.Hour - staleScanLookback = 24 * time.Hour - reaperRebroadcastBatch = 200 + staleSeenOnNetworkAge = time.Hour + staleScanLookback = 24 * time.Hour ) // reaperLeaseName is the well-known key every replica uses to coordinate @@ -82,11 +84,18 @@ func (p *Propagator) tryReap(ctx context.Context) { p.reapOnce(ctx) } -// reapOnce rebroadcasts rows stuck at SEEN_ON_NETWORK past -// staleSeenOnNetworkAge (peer mempool eviction, dropped BLOCK_PROCESSED -// callback, fee bump needed). RECEIVED rows are intentionally not -// rebroadcast — the submitter got an error from intake on Kafka publish -// failure and owns the decision to retry. +// reapOnce rebroadcasts rows stuck at SEEN_ON_NETWORK / SEEN_MULTIPLE_NODES +// past staleSeenOnNetworkAge (peer mempool eviction, dropped BLOCK_PROCESSED +// callback, unconfirmed ancestor that has since confirmed). RECEIVED rows are +// intentionally not rebroadcast — the submitter got an error from intake on +// Kafka publish failure and owns the decision to retry. +// +// Candidates come from GetReapCandidates ordered oldest-rebroadcast-first, and +// each attempted txid is stamped via MarkRebroadcastByTxIDs so it isn't re-sent +// again for reaperRebroadcastInterval. That per-tx throttle is what keeps a +// backlog larger than reaperBatchSize from starving older rows: capacity +// spreads across the whole backlog over one interval instead of re-sending the +// same rows every tick. // // Rebroadcasts go through the same registerBatch + broadcastInChunks + // applyTerminalStatuses pipeline as processBatch but bypass the @@ -94,43 +103,27 @@ func (p *Propagator) tryReap(ctx context.Context) { // resulting terminal status notifies the dispatcher via applyTerminalStatuses // only as a no-op for offset bookkeeping. // -// Bounded by reaperRebroadcastBatch per tick so a backlog can't pin the -// reaper into a single multi-minute call. +// Bounded by reaperBatchSize per tick so a backlog can't pin the reaper into a +// single multi-minute call. func (p *Propagator) reapOnce(ctx context.Context) { now := time.Now() since := now.Add(-staleScanLookback) seenDeadline := now.Add(-staleSeenOnNetworkAge) + rebroadcastBefore := now.Add(-p.reaperRebroadcastInterval) - stuck := make([]propagationMsg, 0, reaperRebroadcastBatch) - err := p.store.IterateStatusesSince(ctx, since, func(st *models.TransactionStatus) error { - if len(stuck) >= reaperRebroadcastBatch { - return errReaperBatchFull - } - if len(st.RawTx) == 0 { - // No body to rebroadcast. Pre-reaper-population rows - // won't have it; just skip. - return nil - } - switch st.Status { - case models.StatusSeenOnNetwork, models.StatusSeenMultipleNodes: - if !st.Timestamp.Before(seenDeadline) { - return nil - } - default: - // RECEIVED rows are intentionally NOT rebroadcast — the - // submitter got an error from intake on Kafka publish - // failure and is responsible for deciding whether to retry. - // Terminal statuses, MINED, IMMUTABLE — not the reaper's job. - return nil + candidates, err := p.store.GetReapCandidates(ctx, since, seenDeadline, rebroadcastBefore, p.reaperBatchSize) + if err != nil { + if !errors.Is(err, context.Canceled) { + p.logger.Error("reaper: scan failed", zap.Error(err)) } - stuck = append(stuck, propagationMsg{TXID: st.TxID, RawTx: st.RawTx}) - return nil - }) - if err != nil && !errors.Is(err, errReaperBatchFull) && !errors.Is(err, context.Canceled) { - p.logger.Error("reaper: scan failed", zap.Error(err)) return } + stuck := make([]propagationMsg, 0, len(candidates)) + for _, st := range candidates { + stuck = append(stuck, propagationMsg{TXID: st.TxID, RawTx: st.RawTx}) + } + // Publish the post-scan depth on every tick BEFORE the early-return // so the gauge reflects "what the last reaper observed" — including // the queue-is-clear case. Setting it only on the non-empty branch @@ -150,8 +143,13 @@ func (p *Propagator) reapOnce(ctx context.Context) { // dispatcher — txids the dispatcher doesn't know about (because the // original Kafka message terminated long ago) get a no-op notify, // which is fine. - registered, _ := p.registerBatch(ctx, stuck) + registered, failed := p.registerBatch(ctx, stuck) if len(registered) == 0 { + // Nothing registered: either a global merkle /watch outage or this + // whole batch is persistently failing registration. Don't broadcast + // and don't stamp — a global outage must not push every stuck tx past + // its rebroadcast window, and a fully-failing batch has no healthy + // rows behind it to starve. The next tick retries the same set. return } rawTxs := make([][]byte, len(registered)) @@ -160,12 +158,36 @@ func (p *Propagator) reapOnce(ctx context.Context) { } results := p.broadcastInChunks(ctx, registered, rawTxs) + // Stamp last_rebroadcast_at so each attempted tx cedes its slot, at one of + // two cadences by outcome class: + // - accepted rows stay SEEN_* (the lattice forbids the SEEN→ACCEPTED + // downgrade, so applyTerminalStatuses is a no-op for them) and only + // need a periodic refresh → full reaperRebroadcastInterval. + // - transient failures — a Teranode requeue/unknown verdict, plus rows + // whose merkle /watch registration failed (the `failed` set) — should + // retry soon → shorter reaperRequeueBackoff, applied by backdating the + // stamp so the row becomes due again after that backoff instead of the + // full interval. Stamping the failed subset is what stops a + // persistently registration-failing tx (last_rebroadcast_at == NULL) + // from sorting first and re-filling the front of every batch. + // rejected rows become terminal and leave the candidate set — no stamp. + fullIntervalTxids := make([]string, 0, len(registered)) + shortBackoffTxids := make([]string, 0, len(registered)+len(failed)) + for _, m := range failed { + shortBackoffTxids = append(shortBackoffTxids, m.TXID) + } + var accepted, rejected int terminalStatuses := make([]*models.TransactionStatus, 0, len(results)) - for _, res := range results { + for i, res := range results { + // results is index-aligned to registered (broadcastInChunks writes + // per-index), so registered[i] is this result's tx — needed because a + // requeue result carries no txid of its own. + txid := registered[i].TXID switch res.class { case txResultClassAccepted: accepted++ + fullIntervalTxids = append(fullIntervalTxids, txid) if res.status != nil { terminalStatuses = append(terminalStatuses, res.status) } @@ -175,18 +197,32 @@ func (p *Propagator) reapOnce(ctx context.Context) { terminalStatuses = append(terminalStatuses, res.status) } case txResultClassUnknown, txResultClassRequeue: - // Requeue / Unknown from the reaper's rebroadcast path: - // leave the row alone so the next reaper tick picks it up. - // The reaper bypasses the dispatcher, so there's no inFlight - // entry to requeue against — natural retry is just the next - // tick. + // Transient: leave the row SEEN_* and retry after the short + // backoff. The reaper bypasses the dispatcher, so there's no + // inFlight entry to requeue against — the next due tick is the + // retry. + shortBackoffTxids = append(shortBackoffTxids, txid) } } + + p.markRebroadcast(ctx, fullIntervalTxids, now) + if len(shortBackoffTxids) > 0 { + // Backdate so eligibility (last_rebroadcast_at < now-interval) flips + // true after reaperRequeueBackoff rather than the full interval. + p.markRebroadcast(ctx, shortBackoffTxids, now.Add(-(p.reaperRebroadcastInterval - p.reaperRequeueBackoff))) + } + p.applyTerminalStatuses(ctx, terminalStatuses, accepted, rejected) } -// errReaperBatchFull halts the IterateStatusesSince walk once we've -// accumulated reaperRebroadcastBatch stuck rows. Sentinel error so -// IterateStatusesSince surfaces it cleanly without being mistaken for -// a backend error. -var errReaperBatchFull = errors.New("reaper batch full") +// markRebroadcast stamps last_rebroadcast_at = ts on the given txids, logging +// (but not failing) on a store error — a missed stamp only costs a redundant +// rebroadcast next tick, not correctness. +func (p *Propagator) markRebroadcast(ctx context.Context, txids []string, ts time.Time) { + if len(txids) == 0 { + return + } + if err := p.store.MarkRebroadcastByTxIDs(ctx, txids, ts); err != nil { + p.logger.Warn("reaper: mark rebroadcast failed", zap.Error(err)) + } +} diff --git a/services/propagation/reaper_test.go b/services/propagation/reaper_test.go new file mode 100644 index 0000000..2049ec5 --- /dev/null +++ b/services/propagation/reaper_test.go @@ -0,0 +1,336 @@ +package propagation + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/bsv-blockchain/arcade/models" +) + +// newSelectiveMerkleServer returns a /watch stub that fails (400) registration +// for txids in `fail` and accepts (200) all others — used to drive partial +// merkle-registration failures. +func newSelectiveMerkleServer(fail map[string]bool) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req struct { + TxID string `json:"txid"` + } + _ = json.NewDecoder(r.Body).Decode(&req) + if fail[req.TxID] { + w.WriteHeader(http.StatusBadRequest) + return + } + w.WriteHeader(http.StatusOK) + })) +} + +// markTsFor returns the ts of the most recent MarkRebroadcastByTxIDs call that +// included txid. +func (m *mockStore) markTsFor(txid string) (time.Time, bool) { + m.mu.Lock() + defer m.mu.Unlock() + for i := len(m.rebroadcastMarks) - 1; i >= 0; i-- { + for _, t := range m.rebroadcastMarks[i] { + if t == txid { + return m.rebroadcastMarkTs[i], true + } + } + } + return time.Time{}, false +} + +// seenRow builds a SEEN_MULTIPLE_NODES row eligible for the reaper: a non-empty +// raw_tx and a timestamp older than staleSeenOnNetworkAge but within +// staleScanLookback. lastRebroadcast is the row's last_rebroadcast_at (zero == +// never rebroadcast). +func seenRow(txid string, age time.Duration, lastRebroadcast time.Time) *models.TransactionStatus { + return &models.TransactionStatus{ + TxID: txid, + Status: models.StatusSeenMultipleNodes, + RawTx: []byte{0xde, 0xad, 0xbe, 0xef}, + Timestamp: time.Now().Add(-age), + LastRebroadcastAt: lastRebroadcast, + } +} + +func (m *mockStore) allRebroadcastMarks() []string { + m.mu.Lock() + defer m.mu.Unlock() + var out []string + for _, call := range m.rebroadcastMarks { + out = append(out, call...) + } + return out +} + +// TestReapOnce_UsesConfiguredBatchSize proves the reaper honors +// p.reaperBatchSize. Before the fix it always used a hardcoded 200, so with 10 +// due rows and a batch of 4 we must see exactly 4 rebroadcast, not 10. +func TestReapOnce_UsesConfiguredBatchSize(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + for i := 0; i < 10; i++ { + ms.replayRows = append(ms.replayRows, seenRow(txidN(i), 2*time.Hour, time.Time{})) + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 4 + p.reaperRebroadcastInterval = time.Hour + + p.reapOnce(context.Background()) + + if got := log.count("register:"); got != 4 { + t.Fatalf("expected exactly 4 rows rebroadcast (batch cap), got %d", got) + } + if got := len(ms.allRebroadcastMarks()); got != 4 { + t.Fatalf("expected 4 txids marked, got %d", got) + } +} + +// TestReapOnce_SkipsRecentlyRebroadcast confirms the per-tx interval throttle: +// a row rebroadcast within reaperRebroadcastInterval is excluded; never-sent +// and long-ago-sent rows are included. +func TestReapOnce_SkipsRecentlyRebroadcast(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("never", 2*time.Hour, time.Time{}), // due + seenRow("recent", 2*time.Hour, time.Now().Add(-30*time.Minute)), // throttled + seenRow("longago", 2*time.Hour, time.Now().Add(-2*time.Hour)), // due + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + + p.reapOnce(context.Background()) + + if got := log.count("register:"); got != 2 { + t.Fatalf("expected 2 due rows rebroadcast, got %d", got) + } + for _, txid := range ms.allRebroadcastMarks() { + if txid == "recent" { + t.Fatalf("recently-rebroadcast row should have been skipped, but it was marked") + } + } +} + +// TestReapOnce_MarksRebroadcastOnAttempt confirms attempted txids are stamped, +// so an immediate second tick (within the interval) rebroadcasts nothing. +func TestReapOnce_MarksRebroadcastOnAttempt(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("a", 2*time.Hour, time.Time{}), + seenRow("b", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + + p.reapOnce(context.Background()) + if got := log.count("register:"); got != 2 { + t.Fatalf("first tick: expected 2 rebroadcast, got %d", got) + } + + p.reapOnce(context.Background()) + if got := log.count("register:"); got != 2 { + t.Fatalf("second tick: expected no new rebroadcasts (still 2 total), got %d", got) + } +} + +// TestReapOnce_FairnessAcrossTicks is the core anti-starvation assertion: with +// a backlog larger than the batch size, successive ticks rebroadcast every row +// exactly once over the interval — no row is starved, none is re-sent twice. +func TestReapOnce_FairnessAcrossTicks(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + const backlog = 5 + ms := newMockStore() + for i := 0; i < backlog; i++ { + ms.replayRows = append(ms.replayRows, seenRow(txidN(i), 2*time.Hour, time.Time{})) + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 2 + p.reaperRebroadcastInterval = time.Hour + + // 3 ticks of batch 2 covers a backlog of 5 (2+2+1). + for i := 0; i < 3; i++ { + p.reapOnce(context.Background()) + } + + marks := ms.allRebroadcastMarks() + seen := make(map[string]int, backlog) + for _, txid := range marks { + seen[txid]++ + } + if len(seen) != backlog { + t.Fatalf("expected all %d rows rebroadcast within the interval, got %d distinct: %v", backlog, len(seen), seen) + } + for txid, n := range seen { + if n != 1 { + t.Fatalf("row %s rebroadcast %d times within one interval, want exactly 1", txid, n) + } + } +} + +// TestReapOnce_RecoverableNotRejected confirms a transient broadcast failure +// (requeue) leaves the SEEN_* row non-terminal — the reaper must never +// auto-reject a recoverable tx. The row is still marked on attempt so it cedes +// its slot for one interval. +func TestReapOnce_RecoverableNotRejected(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + // 500 with no Teranode failure-list body → whole-batch requeue. + teranodeSrv := newTeranodeServer(log, 500) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("recoverable", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + + p.reapOnce(context.Background()) + + ms.mu.Lock() + for _, u := range ms.updates { + if u.TxID == "recoverable" && u.Status == models.StatusRejected { + ms.mu.Unlock() + t.Fatalf("recoverable tx was wrongly written REJECTED on a requeue") + } + } + ms.mu.Unlock() + + if got := len(ms.allRebroadcastMarks()); got != 1 { + t.Fatalf("expected the attempted tx to be marked even on requeue, got %d marks", got) + } +} + +// TestReapOnce_PartialRegistrationFailureMarksFailedSubset covers review #1: +// a tx that fails merkle /watch must still be stamped so it can't keep sorting +// first (NULL last_rebroadcast_at) and re-fill the front of every batch — but a +// FULL outage (nothing registered) marks nothing so it isn't penalized. +func TestReapOnce_PartialRegistrationFailureMarksFailedSubset(t *testing.T) { + t.Run("partial failure marks the failed subset", func(t *testing.T) { + merkleSrv := newSelectiveMerkleServer(map[string]bool{"poison": true}) + defer merkleSrv.Close() + log := &eventLog{} + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("ok1", 2*time.Hour, time.Time{}), + seenRow("poison", 2*time.Hour, time.Time{}), + seenRow("ok2", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + p.reaperRequeueBackoff = time.Minute + + p.reapOnce(context.Background()) + + if _, ok := ms.markTsFor("poison"); !ok { + t.Fatalf("registration-failed tx 'poison' was not stamped; it would starve the front of the batch") + } + if _, ok := ms.markTsFor("ok1"); !ok { + t.Fatalf("successfully broadcast tx 'ok1' was not stamped") + } + }) + + t.Run("full outage marks nothing", func(t *testing.T) { + merkleSrv := newSelectiveMerkleServer(map[string]bool{"a": true, "b": true}) + defer merkleSrv.Close() + log := &eventLog{} + teranodeSrv := newTeranodeServer(log, 200) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("a", 2*time.Hour, time.Time{}), + seenRow("b", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + p.reaperRequeueBackoff = time.Minute + + p.reapOnce(context.Background()) + + if got := len(ms.allRebroadcastMarks()); got != 0 { + t.Fatalf("a full registration outage must not stamp any row, got %d marks", got) + } + }) +} + +// TestReapOnce_RequeueUsesShortBackoff covers review #2: a transient requeue is +// stamped with a backdated ts so it becomes due again after reaperRequeueBackoff +// (~1 min), not the full reaperRebroadcastInterval (1h). +func TestReapOnce_RequeueUsesShortBackoff(t *testing.T) { + log := &eventLog{} + merkleSrv := newMerkleServer(log, 200) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(log, 500) // 500, no failure body → requeue + defer teranodeSrv.Close() + + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + seenRow("requeueme", 2*time.Hour, time.Time{}), + } + + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + p.reaperBatchSize = 10 + p.reaperRebroadcastInterval = time.Hour + p.reaperRequeueBackoff = time.Minute + + before := time.Now() + p.reapOnce(context.Background()) + + ts, ok := ms.markTsFor("requeueme") + if !ok { + t.Fatalf("requeued tx was not stamped") + } + // Backdated to ≈ now - (interval - backoff) = now - 59m, so it must be far + // behind `before`, not ≈now (which is what an accepted row would get). + if !ts.Before(before.Add(-30 * time.Minute)) { + t.Fatalf("requeued tx stamped at %v (vs now≈%v); expected a backdated (~59m-old) stamp for the short backoff", ts, before) + } +} + +func txidN(i int) string { + const hexd = "0123456789abcdef" + return "tx" + string(hexd[i%16]) + string(hexd[(i/16)%16]) +} diff --git a/services/webhook/service_test.go b/services/webhook/service_test.go index 26a39a2..b831185 100644 --- a/services/webhook/service_test.go +++ b/services/webhook/service_test.go @@ -146,6 +146,12 @@ func (s *fakeStore) SetMinedByTxIDs(context.Context, string, uint64, []string) ( func (s *fakeStore) MarkMerkleRegisteredByTxIDs(context.Context, []string, time.Time) error { return nil } +func (s *fakeStore) GetReapCandidates(context.Context, time.Time, time.Time, time.Time, int) ([]*models.TransactionStatus, error) { + return nil, nil +} +func (s *fakeStore) MarkRebroadcastByTxIDs(context.Context, []string, time.Time) error { + return nil +} func (s *fakeStore) InsertSubmission(context.Context, *models.Submission) error { return nil } func (s *fakeStore) GetSubmissionsByToken(context.Context, string) ([]*models.Submission, error) { return nil, nil diff --git a/store/aerospike/aerospike.go b/store/aerospike/aerospike.go index 23de442..b259a98 100644 --- a/store/aerospike/aerospike.go +++ b/store/aerospike/aerospike.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "net" "sort" "strconv" @@ -393,6 +394,9 @@ func (s *Store) UpdateStatus(ctx context.Context, status *models.TransactionStat if !status.MerkleRegisteredAt.IsZero() { bins["merkle_reg_at"] = status.MerkleRegisteredAt.UnixMilli() } + if !status.LastRebroadcastAt.IsZero() { + bins["rebroadcast_at"] = status.LastRebroadcastAt.UnixMilli() + } // Enforce the status lattice: refuse to overwrite a terminal status with a // later, lower-priority update (e.g. a stray SEEN_ON_NETWORK callback after @@ -851,6 +855,122 @@ func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, return nil } +// GetReapCandidates scans the transactions set and returns up to limit SEEN_* +// rows the reaper should rebroadcast, oldest-unserved first. Aerospike has no +// secondary index for this predicate and the query API does not order results, +// so the filter and the oldest-first sort are done in-process over a full set +// scan. This is acceptable because production runs on Postgres; the Aerospike +// path exists for parity. See store.Store for the contract. +func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) { + if limit <= 0 { + return nil, nil + } + if err := ctx.Err(); err != nil { + return nil, err + } + stmt := aero.NewStatement(s.namespace, setTransactions) + rs, err := s.client.Query(s.queryPolicy(ctx), stmt) + if rs != nil { + defer func() { _ = rs.Close() }() + } + if err != nil { + return nil, fmt.Errorf("query reap candidates: %w", err) + } + + type candidate struct { + st *models.TransactionStatus + lastRebcastMs int64 // math.MinInt64 sentinel == never rebroadcast (sorts first) + } + var candidates []candidate + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case rec, ok := <-rs.Results(): + if !ok { + goto done + } + if rec.Err != nil { + return nil, rec.Err + } + txid := getString(rec.Record, "txid") + st := recordToStatus(rec.Record, txid) + if st.Status != models.StatusSeenOnNetwork && st.Status != models.StatusSeenMultipleNodes { + continue + } + if st.Timestamp.Before(since) || !st.Timestamp.Before(seenDeadline) { + continue + } + rawTx, _ := rec.Record.Bins["raw_tx"].([]byte) + if len(rawTx) == 0 { + continue + } + if !st.LastRebroadcastAt.IsZero() && !st.LastRebroadcastAt.Before(rebroadcastBefore) { + continue + } + lastMs := int64(math.MinInt64) + if !st.LastRebroadcastAt.IsZero() { + lastMs = st.LastRebroadcastAt.UnixMilli() + } + candidates = append(candidates, candidate{ + st: &models.TransactionStatus{TxID: txid, RawTx: rawTx}, + lastRebcastMs: lastMs, + }) + } + } +done: + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].lastRebcastMs < candidates[j].lastRebcastMs + }) + if len(candidates) > limit { + candidates = candidates[:limit] + } + results := make([]*models.TransactionStatus, len(candidates)) + for i, c := range candidates { + results[i] = c.st + } + return results, nil +} + +// MarkRebroadcastByTxIDs writes rebroadcast_at = ts.UnixMilli() on every +// existing transaction record in the txid list. Unknown txids are silently +// skipped via UPDATE_ONLY, mirroring MarkMerkleRegisteredByTxIDs. +func (s *Store) MarkRebroadcastByTxIDs(ctx context.Context, txids []string, ts time.Time) error { + if len(txids) == 0 { + return nil + } + bwp := aero.NewBatchWritePolicy() + bwp.RecordExistsAction = aero.UPDATE_ONLY + + for i := 0; i < len(txids); i += s.batchSize { + if err := ctx.Err(); err != nil { + return err + } + end := i + s.batchSize + if end > len(txids) { + end = len(txids) + } + batch := txids[i:end] + + records := make([]aero.BatchRecordIfc, len(batch)) + for j, txid := range batch { + key, err := s.key(setTransactions, txid) + if err != nil { + continue + } + records[j] = aero.NewBatchWrite( + bwp, key, + aero.PutOp(aero.NewBin("rebroadcast_at", ts.UnixMilli())), + ) + } + + if err := s.client.BatchOperate(s.batchPolicy(ctx), records); err != nil { + return fmt.Errorf("batch mark rebroadcast: %w", err) + } + } + return nil +} + // --- BUMP Operations --- // // BUMPs are stored as a manifest record at primary key plus N @@ -2192,6 +2312,11 @@ func recordToStatus(rec *aero.Record, txid string) *models.TransactionStatus { status.MerkleRegisteredAt = time.UnixMilli(int64(ms)) } } + if v, ok := rec.Bins["rebroadcast_at"]; ok { + if ms, ok := v.(int); ok { + status.LastRebroadcastAt = time.UnixMilli(int64(ms)) + } + } return status } diff --git a/store/pebble/pebble.go b/store/pebble/pebble.go index 98bb0ad..cf5a0cd 100644 --- a/store/pebble/pebble.go +++ b/store/pebble/pebble.go @@ -14,11 +14,13 @@ package pebble import ( + "container/heap" "context" "encoding/json" "errors" "fmt" "hash/fnv" + "math" "sort" "sync" "time" @@ -77,6 +79,7 @@ type storedStatus struct { CreatedUnixNs int64 `json:"created_at,omitempty"` NextRetryUnixNs int64 `json:"next_retry_at,omitempty"` MerkleRegisteredUnixNs int64 `json:"merkle_registered_at,omitempty"` + LastRebroadcastUnixNs int64 `json:"last_rebroadcast_at,omitempty"` } func (s storedStatus) toModel() *models.TransactionStatus { @@ -104,6 +107,9 @@ func (s storedStatus) toModel() *models.TransactionStatus { if s.MerkleRegisteredUnixNs != 0 { out.MerkleRegisteredAt = time.Unix(0, s.MerkleRegisteredUnixNs) } + if s.LastRebroadcastUnixNs != 0 { + out.LastRebroadcastAt = time.Unix(0, s.LastRebroadcastUnixNs) + } return out } @@ -132,6 +138,9 @@ func fromModel(m *models.TransactionStatus) storedStatus { if !m.MerkleRegisteredAt.IsZero() { out.MerkleRegisteredUnixNs = m.MerkleRegisteredAt.UnixNano() } + if !m.LastRebroadcastAt.IsZero() { + out.LastRebroadcastUnixNs = m.LastRebroadcastAt.UnixNano() + } return out } @@ -497,6 +506,9 @@ func mergeStatus(existing *storedStatus, update *models.TransactionStatus) store if !update.MerkleRegisteredAt.IsZero() { out.MerkleRegisteredUnixNs = update.MerkleRegisteredAt.UnixNano() } + if !update.LastRebroadcastAt.IsZero() { + out.LastRebroadcastUnixNs = update.LastRebroadcastAt.UnixNano() + } return out } @@ -968,6 +980,150 @@ func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, return nil } +// reapCandidate is one row under consideration by GetReapCandidates. +type reapCandidate struct { + txid string + rawTx []byte + lastRebcastNs int64 // math.MinInt64 == never rebroadcast (oldest) +} + +// reapHeap is a bounded MAX-heap on lastRebcastNs (largest on top). Pushing and +// popping-when-over-limit keeps the `limit` SMALLEST (oldest-rebroadcast) +// candidates, so GetReapCandidates stays O(limit) memory over a large backlog. +type reapHeap []reapCandidate + +func (h *reapHeap) Len() int { return len(*h) } +func (h *reapHeap) Less(i, j int) bool { return (*h)[i].lastRebcastNs > (*h)[j].lastRebcastNs } +func (h *reapHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } +func (h *reapHeap) Push(x any) { *h = append(*h, x.(reapCandidate)) } +func (h *reapHeap) Pop() any { + old := *h + n := len(old) + item := old[n-1] + *h = old[:n-1] + return item +} + +// GetReapCandidates walks the updated-at index within [since, seenDeadline) and +// returns up to limit SEEN_* rows the reaper should rebroadcast, ordered +// oldest-rebroadcast-first. The updated-at index is timestamp-ordered (not +// last_rebroadcast_at-ordered), so the scan is bounded by the timestamp window +// and a bounded max-heap selects the oldest-rebroadcast subset. See store.Store +// for the contract. +func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) { + if limit <= 0 { + return nil, nil + } + if err := ctx.Err(); err != nil { + return nil, err + } + prefix := idxTxUpdatedPrefix() + sinceNs := since.UnixNano() + deadlineNs := seenDeadline.UnixNano() + // Bound the scan to [since, seenDeadline): the idx:tx:updated keyspace is + // ordered by timestamp (hex-encoded unix nanos), so start at the first key + // at/after `since` and stop as soon as a row's timestamp reaches the + // deadline — no need to walk the full history every tick. + iter, err := s.db.NewIter(&pebbledb.IterOptions{ + LowerBound: idxTxUpdatedKey(sinceNs, ""), + UpperBound: endOfPrefix(prefix), + }) + if err != nil { + return nil, err + } + defer func() { _ = iter.Close() }() + + // Keep only the `limit` oldest-rebroadcast candidates via a bounded + // max-heap (largest lastRebcastNs on top, evicted when full) so memory + // stays O(limit) regardless of backlog. math.MinInt64 == never rebroadcast. + h := &reapHeap{} + for iter.First(); iter.Valid(); iter.Next() { + if err := ctx.Err(); err != nil { + return nil, err + } + txid := lastSegment(iter.Key()) + st, err := s.readStatus(txid) + if err != nil || st == nil { + continue + } + tsNs := st.Timestamp.UnixNano() + if tsNs >= deadlineNs { + // Index is timestamp-ascending, so nothing past here is in-window. + break + } + if tsNs < sinceNs { + continue + } + if st.Status != models.StatusSeenOnNetwork && st.Status != models.StatusSeenMultipleNodes { + continue + } + if len(st.RawTx) == 0 { + continue + } + if !st.LastRebroadcastAt.IsZero() && !st.LastRebroadcastAt.Before(rebroadcastBefore) { + continue + } + lastNs := int64(math.MinInt64) + if !st.LastRebroadcastAt.IsZero() { + lastNs = st.LastRebroadcastAt.UnixNano() + } + heap.Push(h, reapCandidate{txid: st.TxID, rawTx: st.RawTx, lastRebcastNs: lastNs}) + if h.Len() > limit { + heap.Pop(h) // evict the most-recently-rebroadcast (largest) + } + } + // Heap holds the `limit` oldest-rebroadcast rows; emit ascending. + results := make([]*models.TransactionStatus, h.Len()) + for i := len(results) - 1; i >= 0; i-- { + c := heap.Pop(h).(reapCandidate) + results[i] = &models.TransactionStatus{TxID: c.txid, RawTx: c.rawTx} + } + return results, nil +} + +// MarkRebroadcastByTxIDs stamps last_rebroadcast_at on every existing row in +// the txid list. Unknown txids are silently no-ops, mirroring +// MarkMerkleRegisteredByTxIDs (last_rebroadcast_at is not indexed, so no index +// churn). +func (s *Store) MarkRebroadcastByTxIDs(ctx context.Context, txids []string, ts time.Time) error { + if err := ctx.Err(); err != nil { + return err + } + tsNs := ts.UnixNano() + for _, txid := range txids { + if err := ctx.Err(); err != nil { + return err + } + mu := s.shardFor(txid) + mu.Lock() + + existing, err := s.readStoredStatus(txid) + if err != nil { + mu.Unlock() + return err + } + if existing == nil { + mu.Unlock() + continue + } + + updated := *existing + updated.LastRebroadcastUnixNs = tsNs + + payload, err := json.Marshal(updated) + if err != nil { + mu.Unlock() + return err + } + err = s.db.Set(txKey(txid), payload, s.writeOpts) + mu.Unlock() + if err != nil { + return err + } + } + return nil +} + // --- BUMP / STUMP --- func (s *Store) InsertBUMP(ctx context.Context, blockHash string, blockHeight uint64, bumpData []byte) error { diff --git a/store/pebble/pebble_test.go b/store/pebble/pebble_test.go index 5ea0cd4..c1533d5 100644 --- a/store/pebble/pebble_test.go +++ b/store/pebble/pebble_test.go @@ -1086,3 +1086,66 @@ func hashesOf(rows []*models.BlockProcessingStatus) []string { } return out } + +// TestGetReapCandidates_WindowEligibilityAndBoundedOrder exercises the Pebble +// reaper-candidate query: the [since, seenDeadline) window filter, the +// raw_tx/status/recently-rebroadcast eligibility checks, and the bounded +// oldest-rebroadcast-first selection (limit < eligible count). +func TestGetReapCandidates_WindowEligibilityAndBoundedOrder(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + now := time.Now() + + insertSeen := func(txid string, ts time.Time, raw []byte, status models.Status) { + _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: txid, + Status: status, + RawTx: raw, + Timestamp: ts, + }) + if err != nil { + t.Fatalf("insert %s: %v", txid, err) + } + } + raw := []byte{0x01, 0x02} + + // In-window (now-2h) SEEN rows with distinct last_rebroadcast times. + insertSeen("never", now.Add(-2*time.Hour), raw, models.StatusSeenMultipleNodes) // oldest (never) + insertSeen("rb5h", now.Add(-2*time.Hour), raw, models.StatusSeenMultipleNodes) + insertSeen("rb3h", now.Add(-2*time.Hour), raw, models.StatusSeenOnNetwork) + insertSeen("rb2h", now.Add(-2*time.Hour), raw, models.StatusSeenMultipleNodes) // most-recently rebroadcast (evicted at limit=3) + // Excluded rows: + insertSeen("recent", now.Add(-2*time.Hour), raw, models.StatusSeenMultipleNodes) // rebroadcast within interval + insertSeen("received", now.Add(-2*time.Hour), raw, models.StatusReceived) // wrong status + insertSeen("noraw", now.Add(-2*time.Hour), nil, models.StatusSeenMultipleNodes) // empty raw_tx + insertSeen("tooOld", now.Add(-30*time.Hour), raw, models.StatusSeenMultipleNodes) // before since + insertSeen("tooNew", now.Add(-30*time.Minute), raw, models.StatusSeenMultipleNodes) // after deadline + + mark := func(txid string, ts time.Time) { + if err := s.MarkRebroadcastByTxIDs(ctx, []string{txid}, ts); err != nil { + t.Fatalf("mark %s: %v", txid, err) + } + } + mark("rb5h", now.Add(-5*time.Hour)) + mark("rb3h", now.Add(-3*time.Hour)) + mark("rb2h", now.Add(-2*time.Hour)) + mark("recent", now.Add(-30*time.Minute)) // within the 1h interval → not eligible + + got, err := s.GetReapCandidates(ctx, now.Add(-24*time.Hour), now.Add(-time.Hour), now.Add(-time.Hour), 3) + if err != nil { + t.Fatalf("GetReapCandidates: %v", err) + } + gotTxids := make([]string, len(got)) + for i, r := range got { + gotTxids[i] = r.TxID + if len(r.RawTx) == 0 { + t.Fatalf("candidate %s returned with empty RawTx", r.TxID) + } + } + // limit=3 of the eligible {never, rb5h, rb3h, rb2h}: the 3 oldest-rebroadcast, + // ascending. rb2h (most recent) is evicted. + want := []string{"never", "rb5h", "rb3h"} + if fmt.Sprintf("%v", gotTxids) != fmt.Sprintf("%v", want) { + t.Fatalf("got %v, want %v", gotTxids, want) + } +} diff --git a/store/postgres/postgres.go b/store/postgres/postgres.go index 85824de..ffa9785 100644 --- a/store/postgres/postgres.go +++ b/store/postgres/postgres.go @@ -289,7 +289,7 @@ VALUES ` + values.String() + ` ON CONFLICT (txid) DO UPDATE SET txid = transactions.txid RETURNING txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at, merkle_registered_at, (xmax = 0) AS inserted` + timestamp_at, created_at, merkle_registered_at, last_rebroadcast_at, (xmax = 0) AS inserted` rows, err := s.pool.Query(ctx, q, args...) if err != nil { @@ -571,7 +571,7 @@ func (s *Store) GetStatus(ctx context.Context, txid string) (*models.Transaction const q = ` SELECT txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at, merkle_registered_at + timestamp_at, created_at, merkle_registered_at, last_rebroadcast_at FROM transactions WHERE txid = $1` row := s.pool.QueryRow(ctx, q, txid) st, err := scanStatus(row) @@ -589,7 +589,7 @@ func (s *Store) GetStatusesSince(ctx context.Context, since time.Time) ([]*model const q = ` SELECT txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at, merkle_registered_at + timestamp_at, created_at, merkle_registered_at, last_rebroadcast_at FROM transactions WHERE timestamp_at >= $1 ORDER BY timestamp_at DESC` rows, err := s.pool.Query(ctx, q, since) @@ -616,7 +616,7 @@ func (s *Store) IterateStatusesSince(ctx context.Context, since time.Time, fn fu const q = ` SELECT txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at, merkle_registered_at + timestamp_at, created_at, merkle_registered_at, last_rebroadcast_at FROM transactions WHERE timestamp_at >= $1 ORDER BY timestamp_at DESC` rows, err := s.pool.Query(ctx, q, since) @@ -842,6 +842,57 @@ func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, return nil } +// GetReapCandidates returns the SEEN_* rows the reaper should rebroadcast, +// oldest-unserved first. See store.Store for the full contract. Only txid and +// raw_tx are selected — the reaper needs nothing else, and ordering by +// last_rebroadcast_at NULLS FIRST is what spreads each tick's bounded batch +// across the whole backlog instead of re-sending the same rows every tick. +func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) { + if limit <= 0 { + return nil, nil + } + const q = ` +SELECT txid, raw_tx FROM transactions +WHERE status IN ('SEEN_ON_NETWORK', 'SEEN_MULTIPLE_NODES') + AND timestamp_at >= $1 AND timestamp_at < $2 + AND length(raw_tx) > 0 + AND (last_rebroadcast_at IS NULL OR last_rebroadcast_at < $3) +ORDER BY last_rebroadcast_at ASC NULLS FIRST +LIMIT $4` + rows, err := s.pool.Query(ctx, q, since, seenDeadline, rebroadcastBefore, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var results []*models.TransactionStatus + for rows.Next() { + var ( + txid string + rawTx []byte + ) + if err := rows.Scan(&txid, &rawTx); err != nil { + return results, err + } + results = append(results, &models.TransactionStatus{TxID: txid, RawTx: rawTx}) + } + return results, rows.Err() +} + +// MarkRebroadcastByTxIDs stamps last_rebroadcast_at = $1 on every existing row +// whose txid is in $2. Unknown txids are silently no-ops, mirroring +// MarkMerkleRegisteredByTxIDs. +func (s *Store) MarkRebroadcastByTxIDs(ctx context.Context, txids []string, ts time.Time) error { + if len(txids) == 0 { + return nil + } + const q = `UPDATE transactions SET last_rebroadcast_at = $1 WHERE txid = ANY($2)` + if _, err := s.pool.Exec(ctx, q, ts, txids); err != nil { + return fmt.Errorf("mark rebroadcast: %w", err) + } + return nil +} + // --- BUMP / STUMP --- func (s *Store) InsertBUMP(ctx context.Context, blockHash string, blockHeight uint64, bumpData []byte) error { @@ -1319,6 +1370,7 @@ func scanStatusWithInserted(row rowScanner) (*models.TransactionStatus, bool, er rawTx []byte nextRetry *time.Time merkleRegisteredAt *time.Time + lastRebroadcastAt *time.Time inserted bool ) if err := row.Scan( @@ -1326,7 +1378,7 @@ func scanStatusWithInserted(row rowScanner) (*models.TransactionStatus, bool, er &blockHash, &blockHeight, &merklePath, &extraInfo, &competingTxs, &rawTx, &st.RetryCount, &nextRetry, - &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, &inserted, + &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, &lastRebroadcastAt, &inserted, ); err != nil { return nil, false, err } @@ -1357,6 +1409,9 @@ func scanStatusWithInserted(row rowScanner) (*models.TransactionStatus, bool, er if merkleRegisteredAt != nil { st.MerkleRegisteredAt = *merkleRegisteredAt } + if lastRebroadcastAt != nil { + st.LastRebroadcastAt = *lastRebroadcastAt + } return &st, inserted, nil } @@ -1372,13 +1427,14 @@ func scanStatus(row rowScanner) (*models.TransactionStatus, error) { rawTx []byte nextRetry *time.Time merkleRegisteredAt *time.Time + lastRebroadcastAt *time.Time ) if err := row.Scan( &st.TxID, &st.Status, &statusCode, &blockHash, &blockHeight, &merklePath, &extraInfo, &competingTxs, &rawTx, &st.RetryCount, &nextRetry, - &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, + &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, &lastRebroadcastAt, ); err != nil { return nil, err } @@ -1409,6 +1465,9 @@ func scanStatus(row rowScanner) (*models.TransactionStatus, error) { if merkleRegisteredAt != nil { st.MerkleRegisteredAt = *merkleRegisteredAt } + if lastRebroadcastAt != nil { + st.LastRebroadcastAt = *lastRebroadcastAt + } return &st, nil } diff --git a/store/postgres/schema.sql b/store/postgres/schema.sql index fa8ccf0..b491ed6 100644 --- a/store/postgres/schema.sql +++ b/store/postgres/schema.sql @@ -15,7 +15,8 @@ CREATE TABLE IF NOT EXISTS transactions ( next_retry_at TIMESTAMPTZ, timestamp_at TIMESTAMPTZ NOT NULL, created_at TIMESTAMPTZ NOT NULL, - merkle_registered_at TIMESTAMPTZ + merkle_registered_at TIMESTAMPTZ, + last_rebroadcast_at TIMESTAMPTZ ); -- Idempotent column add for stores created before merkle_registered_at was @@ -23,6 +24,11 @@ CREATE TABLE IF NOT EXISTS transactions ( -- repopulates the marker — see issue #145. ALTER TABLE transactions ADD COLUMN IF NOT EXISTS merkle_registered_at TIMESTAMPTZ; +-- Idempotent column add for stores created before last_rebroadcast_at was +-- introduced. NULL means "never rebroadcast", so existing stuck rows become +-- immediately eligible for the reaper's first rebroadcast. +ALTER TABLE transactions ADD COLUMN IF NOT EXISTS last_rebroadcast_at TIMESTAMPTZ; + CREATE INDEX IF NOT EXISTS idx_tx_status ON transactions(status); CREATE INDEX IF NOT EXISTS idx_tx_block_hash ON transactions(block_hash); CREATE INDEX IF NOT EXISTS idx_tx_updated ON transactions(timestamp_at); @@ -32,6 +38,16 @@ CREATE INDEX IF NOT EXISTS idx_tx_updated ON transactions(timestamp_at); CREATE INDEX IF NOT EXISTS idx_tx_retry_ready ON transactions(next_retry_at) WHERE status = 'PENDING_RETRY'; +-- Partial index for the reaper's rebroadcast-candidate scan: only rows in a +-- non-terminal SEEN_* state are eligible, and ordering by last_rebroadcast_at +-- (NULLs first) drives the oldest-unserved-first fairness query. +-- Note: the timestamp_at range and length(raw_tx) > 0 predicates are inline +-- rechecks, not part of this index. At a very large SEEN_* population the scan +-- may touch many index/heap rows before LIMIT is satisfied; if that shows up in +-- query plans, consider a composite/covering index — fine at current scale. +CREATE INDEX IF NOT EXISTS idx_tx_reap_due + ON transactions(last_rebroadcast_at NULLS FIRST) + WHERE status IN ('SEEN_ON_NETWORK', 'SEEN_MULTIPLE_NODES'); CREATE TABLE IF NOT EXISTS bumps ( block_hash TEXT PRIMARY KEY, diff --git a/store/store.go b/store/store.go index c728cfb..8b4e216 100644 --- a/store/store.go +++ b/store/store.go @@ -271,6 +271,24 @@ type Store interface { // issue #145. MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, ts time.Time) error + // GetReapCandidates returns up to limit transactions the reaper should + // rebroadcast: rows in a non-terminal SEEN_* state (SEEN_ON_NETWORK or + // SEEN_MULTIPLE_NODES) whose timestamp is in [since, seenDeadline), that + // carry a non-empty raw_tx, and whose last_rebroadcast_at is either unset + // or older than rebroadcastBefore. Rows are ordered by last_rebroadcast_at + // ascending (NULLs first) so the longest-unserved txs are returned first — + // this is the fairness guarantee that prevents head-of-line starvation + // under a backlog larger than limit. Each returned row carries at least + // TxID and RawTx so the reaper can rebroadcast without a second read. + GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) + + // MarkRebroadcastByTxIDs stamps last_rebroadcast_at = ts for the given + // txids. Unknown txids are silently skipped (matching + // MarkMerkleRegisteredByTxIDs semantics). The reaper calls this for every + // txid it attempted to rebroadcast — on attempt, not just success — so a + // perpetually-requeueing tx cedes its slot for one rebroadcast interval. + MarkRebroadcastByTxIDs(ctx context.Context, txids []string, ts time.Time) error + // EnsureIndexes creates any required secondary indexes for query operations. EnsureIndexes() error