Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,21 @@ type PropagationConfig struct {
RetryBackoffMs int `mapstructure:"retry_backoff_ms"`
ReaperIntervalMs int `mapstructure:"reaper_interval_ms"`
ReaperBatchSize int `mapstructure:"reaper_batch_size"`
// ReaperRebroadcastIntervalMs is the minimum time between successive
// rebroadcasts of the same stuck tx. The reaper only re-sends a SEEN_*
// row whose last_rebroadcast_at is NULL or older than this window, so each
// tick's batch spreads across the whole backlog instead of re-sending the
// same rows every interval — this is what prevents head-of-line starvation
// when the backlog exceeds reaper_batch_size. Defaults to 1h.
ReaperRebroadcastIntervalMs int `mapstructure:"reaper_rebroadcast_interval_ms"`
// ReaperRequeueBackoffMs is the (shorter) delay before retrying a stuck tx
// whose rebroadcast hit a TRANSIENT failure — a Teranode requeue verdict or
// a failed merkle /watch registration. These rows stay SEEN_* and would
// otherwise wait the full ReaperRebroadcastIntervalMs; that is too slow for
// a transient blip, so they get this shorter backoff instead. Capped at
// ReaperRebroadcastIntervalMs (a requeue backoff longer than the full
// interval is meaningless). Defaults to 60000 (1 min).
ReaperRequeueBackoffMs int `mapstructure:"reaper_requeue_backoff_ms"`
// LeaseTTLMs bounds how long the reaper lease remains valid without a
// renewal. Set to at least 2–3× reaper_interval_ms so a missed tick
// doesn't trigger a false-positive failover. Defaults to 3× interval.
Expand Down Expand Up @@ -726,6 +741,8 @@ func setDefaults() {
viper.SetDefault("propagation.retry_backoff_ms", 500)
viper.SetDefault("propagation.reaper_interval_ms", 30000)
viper.SetDefault("propagation.reaper_batch_size", 500)
viper.SetDefault("propagation.reaper_rebroadcast_interval_ms", 3600000)
viper.SetDefault("propagation.reaper_requeue_backoff_ms", 60000)
// 0 keeps New()'s 3×reaper_interval default, so changing reaper_interval
// automatically moves the lease TTL unless the operator opts into a fixed value.
viper.SetDefault("propagation.lease_ttl_ms", 0)
Expand Down
7 changes: 7 additions & 0 deletions models/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ type TransactionStatus struct {
// added). The startup replay loop reads this to skip rows registered
// within MerkleReplaySkipRecentMinutes — see issue #145.
MerkleRegisteredAt time.Time `json:"merkleRegisteredAt,omitempty"`
// LastRebroadcastAt is the wall-clock time of the most recent reaper
// rebroadcast attempt for this txid. Zero value means "never rebroadcast"
// (or rebroadcast before this field was added). The reaper reads it to
// throttle re-sends to at most once per reaper_rebroadcast_interval_ms, so
// a saturated backlog can't starve older stuck rows by re-sending the same
// newest rows every tick.
LastRebroadcastAt time.Time `json:"lastRebroadcastAt,omitempty"`
}

// Status represents the various states a transaction can be in
Expand Down
8 changes: 8 additions & 0 deletions services/api_server/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ func (m *mockStore) MarkMerkleRegisteredByTxIDs(context.Context, []string, time.
return nil
}

func (m *mockStore) GetReapCandidates(context.Context, time.Time, time.Time, time.Time, int) ([]*models.TransactionStatus, error) {
return nil, nil
}

func (m *mockStore) MarkRebroadcastByTxIDs(context.Context, []string, time.Time) error {
return nil
}

func (m *mockStore) InsertSubmission(_ context.Context, sub *models.Submission) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
81 changes: 52 additions & 29 deletions services/propagation/propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,18 @@ type Propagator struct {
merkleConcurrency int
reaperInterval time.Duration
reaperBatchSize int
teranodeBatchCap int
broadcastWorkers int
maxParallelChunks int
holderID string
leaseTTL time.Duration
// reaperRebroadcastInterval is the minimum gap between successive
// rebroadcasts of the same stuck tx. See ReaperRebroadcastIntervalMs.
reaperRebroadcastInterval time.Duration
// reaperRequeueBackoff is the shorter retry delay for stuck txs whose
// rebroadcast hit a transient failure (Teranode requeue / failed merkle
// registration). See ReaperRequeueBackoffMs.
reaperRequeueBackoff time.Duration
teranodeBatchCap int
broadcastWorkers int
maxParallelChunks int
holderID string
leaseTTL time.Duration

// broadcastJobs feeds the persistent worker pool that runs every
// per-endpoint POST /txs call. Replaces the previous per-broadcast
Expand Down Expand Up @@ -220,6 +227,20 @@ func New(cfg *config.Config, logger *zap.Logger, producer *kafka.Producer, publi
if reaperBatch <= 0 {
reaperBatch = 500
}
reaperRebroadcastInterval := time.Duration(cfg.Propagation.ReaperRebroadcastIntervalMs) * time.Millisecond
if reaperRebroadcastInterval <= 0 {
reaperRebroadcastInterval = time.Hour
}
reaperRequeueBackoff := time.Duration(cfg.Propagation.ReaperRequeueBackoffMs) * time.Millisecond
if reaperRequeueBackoff <= 0 {
reaperRequeueBackoff = time.Minute
}
// A requeue backoff longer than the full rebroadcast interval is
// meaningless — clamp so the transient-failure path never waits longer
// than the steady-state refresh.
if reaperRequeueBackoff > reaperRebroadcastInterval {
reaperRequeueBackoff = reaperRebroadcastInterval
}
leaseTTL := time.Duration(cfg.Propagation.LeaseTTLMs) * time.Millisecond
if leaseTTL <= 0 {
leaseTTL = 3 * reaperInterval
Expand All @@ -245,30 +266,32 @@ func New(cfg *config.Config, logger *zap.Logger, producer *kafka.Producer, publi
maxParallelChunks = defaultMaxParallelChunks
}
p := &Propagator{
cfg: cfg,
logger: logger.Named("propagation"),
producer: producer,
publisher: publisher,
store: st,
leaser: leaser,
teranodeClient: tc,
merkleClient: mc,
maxPending: maxPending,
merkleConcurrency: merkleConcurrency,
reaperInterval: reaperInterval,
reaperBatchSize: reaperBatch,
teranodeBatchCap: teranodeBatchCap,
broadcastWorkers: broadcastWorkers,
maxParallelChunks: maxParallelChunks,
holderID: newHolderID(),
leaseTTL: leaseTTL,
broadcastJobs: make(chan broadcastJob, broadcastJobBuffer),
processBatchSem: make(chan struct{}, maxConcurrentBatches),
admitCh: make(chan admitRequest, dispatcherChannelBuffer),
requeueCh: make(chan requeueRequest, dispatcherChannelBuffer),
terminalCh: make(chan terminalEvent, dispatcherChannelBuffer),
drainCh: make(chan drainRequest),
initDone: make(chan struct{}),
cfg: cfg,
logger: logger.Named("propagation"),
producer: producer,
publisher: publisher,
store: st,
leaser: leaser,
teranodeClient: tc,
merkleClient: mc,
maxPending: maxPending,
merkleConcurrency: merkleConcurrency,
reaperInterval: reaperInterval,
reaperBatchSize: reaperBatch,
reaperRebroadcastInterval: reaperRebroadcastInterval,
reaperRequeueBackoff: reaperRequeueBackoff,
teranodeBatchCap: teranodeBatchCap,
broadcastWorkers: broadcastWorkers,
maxParallelChunks: maxParallelChunks,
holderID: newHolderID(),
leaseTTL: leaseTTL,
broadcastJobs: make(chan broadcastJob, broadcastJobBuffer),
processBatchSem: make(chan struct{}, maxConcurrentBatches),
admitCh: make(chan admitRequest, dispatcherChannelBuffer),
requeueCh: make(chan requeueRequest, dispatcherChannelBuffer),
terminalCh: make(chan terminalEvent, dispatcherChannelBuffer),
drainCh: make(chan drainRequest),
initDone: make(chan struct{}),
}
// Start a dispatcher goroutine with a nil claim so tests that
// construct via New and drive via admitCh / drainCh have a running
Expand Down
78 changes: 78 additions & 0 deletions services/propagation/propagator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sort"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -128,6 +129,15 @@ type mockStore struct {
// markErr forces MarkMerkleRegisteredByTxIDs to return this error.
// Used by tests that verify a mark failure doesn't block broadcast.
markErr error
// rebroadcastMarks records every MarkRebroadcastByTxIDs call as one slice
// per call. Lets reaper tests assert which txids were stamped on attempt.
rebroadcastMarks [][]string
// rebroadcastMarkTs records the ts arg of each MarkRebroadcastByTxIDs call,
// index-aligned with rebroadcastMarks. Lets tests distinguish the
// full-interval stamp (≈now) from the backdated short-backoff stamp.
rebroadcastMarkTs []time.Time
// reapErr forces GetReapCandidates to return this error.
reapErr error
// returningPrev, when non-nil, overrides the synthetic previous-status
// row BatchUpdateStatusReturning hands back for each input. The default
// (RECEIVED prev) always reads as a real transition; this hook lets
Expand Down Expand Up @@ -300,6 +310,74 @@ func (m *mockStore) IterateStatusesSince(_ context.Context, since time.Time, fn
return nil
}

// GetReapCandidates mirrors the Postgres query over replayRows: SEEN_* rows
// with a timestamp in [since, seenDeadline), a non-empty RawTx, and a
// last_rebroadcast_at that is zero or older than rebroadcastBefore, ordered
// oldest-rebroadcast-first (zero/never sorts first) and capped at limit.
func (m *mockStore) GetReapCandidates(_ context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.reapErr != nil {
return nil, m.reapErr
}
if limit <= 0 {
return nil, nil
}
var out []*models.TransactionStatus
for _, r := range m.replayRows {
if r.Status != models.StatusSeenOnNetwork && r.Status != models.StatusSeenMultipleNodes {
continue
}
if r.Timestamp.Before(since) || !r.Timestamp.Before(seenDeadline) {
continue
}
if len(r.RawTx) == 0 {
continue
}
if !r.LastRebroadcastAt.IsZero() && !r.LastRebroadcastAt.Before(rebroadcastBefore) {
continue
}
out = append(out, &models.TransactionStatus{TxID: r.TxID, RawTx: r.RawTx, LastRebroadcastAt: r.LastRebroadcastAt})
}
sort.SliceStable(out, func(i, j int) bool {
// zero (never rebroadcast) sorts first
zi, zj := out[i].LastRebroadcastAt.IsZero(), out[j].LastRebroadcastAt.IsZero()
if zi != zj {
return zi
}
return out[i].LastRebroadcastAt.Before(out[j].LastRebroadcastAt)
})
if len(out) > limit {
out = out[:limit]
}
// Strip the marker from the returned rows — the reaper only consumes
// TxID + RawTx, matching the real backends.
for _, r := range out {
r.LastRebroadcastAt = time.Time{}
}
return out, nil
}

// MarkRebroadcastByTxIDs records the call and stamps LastRebroadcastAt on the
// matching replayRows so a subsequent GetReapCandidates within the interval
// excludes them — mirroring the real backends.
func (m *mockStore) MarkRebroadcastByTxIDs(_ context.Context, txids []string, ts time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
m.rebroadcastMarks = append(m.rebroadcastMarks, append([]string(nil), txids...))
m.rebroadcastMarkTs = append(m.rebroadcastMarkTs, ts)
marked := make(map[string]struct{}, len(txids))
for _, t := range txids {
marked[t] = struct{}{}
}
for _, r := range m.replayRows {
if _, ok := marked[r.TxID]; ok {
r.LastRebroadcastAt = ts
}
}
return nil
}

func (m *mockStore) updateCount() int {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
Loading
Loading