From 4313b5f2b00343aab21ba39936ca8bd622c208cb Mon Sep 17 00:00:00 2001 From: owen-eth Date: Sat, 18 Apr 2026 15:36:37 -0400 Subject: [PATCH 1/3] fix: additional protection against double fuul submission --- tools/fastswap-miles/README.md | 21 +++++ tools/fastswap-miles/main.go | 33 +++++++- tools/fastswap-miles/main_test.go | 80 ++++++++++++++++++ tools/fastswap-miles/miles.go | 66 ++++++++++----- tools/fastswap-miles/sweep.go | 34 +++++++- tools/fastswap-miles/sweep_test.go | 127 +++++++++++++++++++++++++++++ 6 files changed, 340 insertions(+), 21 deletions(-) create mode 100644 tools/fastswap-miles/sweep_test.go diff --git a/tools/fastswap-miles/README.md b/tools/fastswap-miles/README.md index e383f3df5..116091765 100644 --- a/tools/fastswap-miles/README.md +++ b/tools/fastswap-miles/README.md @@ -174,3 +174,24 @@ go run ./tools/fastswap-miles/ \ - **Caught-up guard**: Miles are only processed after the indexer has caught up to the chain tip, avoiding excessive Barter API calls during historical backfill. - **Graceful shutdown**: Catches SIGINT/SIGTERM, finishes current batch, then exits. - **Idempotent**: Re-running from the same start block is safe — inserts use `INSERT INTO` with primary key dedup. +- **Fuel-submission idempotency** (three-layer defense built in response to the 2026-04-16 double-credit incident, in which an operator re-used the contract deployment block as `-start-block` on a pod restart, causing the indexer to re-walk all history): + 1. `insertEvent` checks tx_hash existence before INSERT, skipping re-inserts. The `fastswap_miles` table uses StarRocks `PRIMARY KEY(tx_hash)` — an unconditional INSERT upserts the row and wipes columns (including `processed`, `miles`, `fuel_submitted_at`) not specified in the INSERT. Without this check, any block rescan — whether from an explicit `-start-block` flag, a manual reset, or any other trigger — destroys already-processed rows and causes mass re-submission to Fuel. This is the primary fix: re-walking history is now idempotent. + 2. Each row carries a `fuel_submitted_at` timestamp set only when `submitToFuel` succeeds. The service skips re-submission for any row where it is non-null, even if `processed` is flipped back to false by any means. Backstop against future reset paths. + 3. `saveLastBlock` issues a single atomic INSERT (fastswap_miles_meta has PRIMARY KEY(k) so INSERT upserts). The prior DELETE-then-INSERT pattern could vanish the `last_block` row if the pod died between the two statements. Hardening rather than incident root cause, but worth fixing. + +### Schema requirement + +Before deploying, ensure the DB has the `fuel_submitted_at` column. The service's SELECT queries require it: + +```sql +ALTER TABLE mevcommit_57173.fastswap_miles + ADD COLUMN fuel_submitted_at DATETIME NULL; + +-- Backfill: mark every row that has already been submitted to Fuel so the +-- service's idempotency check will skip re-submission. +UPDATE mevcommit_57173.fastswap_miles +SET fuel_submitted_at = CURRENT_TIMESTAMP +WHERE miles > 0 AND fuel_submitted_at IS NULL; +``` + +The UPDATE is idempotent — safe to run multiple times (e.g. right before and after code deploy to catch rows processed in the gap window). diff --git a/tools/fastswap-miles/main.go b/tools/fastswap-miles/main.go index ed1e511a4..28b91d966 100644 --- a/tools/fastswap-miles/main.go +++ b/tools/fastswap-miles/main.go @@ -362,14 +362,29 @@ func loadLastBlock(db *sql.DB) uint64 { return v } +// saveLastBlock persists the indexer's progress marker. The prior +// DELETE-then-INSERT implementation was not atomic: a pod crash, SIGTERM +// during rolling deploy, or transient DB error between the two statements +// could vanish the `last_block` row. On the next startup loadLastBlock would +// return 0, startBlock would fall back to the contract deployment block, +// and the indexer would re-walk all history — re-inserting every event and +// (before the insertEvent existence guard landed) wiping processed=true on +// every row, causing mass re-submission to Fuel. This was the underlying +// trigger for the 2026-04-16 double-credit incident. +// +// fastswap_miles_meta has PRIMARY KEY(k), so a plain INSERT is an atomic +// upsert under StarRocks PK semantics. The DELETE is unnecessary and unsafe. func saveLastBlock(db *sql.DB, block uint64) { - _, _ = db.Exec(`DELETE FROM mevcommit_57173.fastswap_miles_meta WHERE k = 'last_block'`) _, err := db.Exec(`INSERT INTO mevcommit_57173.fastswap_miles_meta (k, v) VALUES ('last_block', ?)`, fmt.Sprintf("%d", block)) if err != nil { log.Printf("saveLastBlock: %v", err) } } +// markProcessed sets processed=true and populates the derived columns. It +// intentionally does NOT touch fuel_submitted_at so that re-runs of the +// pipeline (e.g. a row flipped back to processed=false by a reset SQL) can +// rebuild the derived state without appearing to re-credit Fuel. func markProcessed(db *sql.DB, txHash string, surplusEth, netProfitEth float64, miles int64, bidCost string) { _, err := db.Exec(` UPDATE mevcommit_57173.fastswap_miles @@ -381,6 +396,22 @@ WHERE tx_hash = ?`, } } +// markProcessedWithFuelSubmission is called only when submitToFuel has just +// succeeded for this row. It sets fuel_submitted_at so future pipeline runs +// (even after a reset of `processed`) skip the submitToFuel call and don't +// double-credit the user. +func markProcessedWithFuelSubmission(db *sql.DB, txHash string, surplusEth, netProfitEth float64, miles int64, bidCost string) { + _, err := db.Exec(` +UPDATE mevcommit_57173.fastswap_miles +SET surplus_eth = ?, net_profit_eth = ?, miles = ?, bid_cost = ?, processed = true, + fuel_submitted_at = CURRENT_TIMESTAMP +WHERE tx_hash = ?`, + surplusEth, netProfitEth, miles, bidCost, txHash) + if err != nil { + log.Printf("markProcessedWithFuelSubmission %s: %v", txHash, err) + } +} + // -------------------- Barter API -------------------- type BarterResponse struct { diff --git a/tools/fastswap-miles/main_test.go b/tools/fastswap-miles/main_test.go index 3457a6e79..eab74f1b9 100644 --- a/tools/fastswap-miles/main_test.go +++ b/tools/fastswap-miles/main_test.go @@ -6,12 +6,92 @@ import ( "math/big" "net/http" "net/http/httptest" + "regexp" "testing" "time" + "github.com/DATA-DOG/go-sqlmock" "github.com/ethereum/go-ethereum/common" ) +// TestMarkProcessed_DoesNotTouchFuelSubmittedAt is the core idempotency +// contract: the plain markProcessed function (used by orphan, no-profit, +// sub-threshold, and already-submitted-retry paths) must never write to +// fuel_submitted_at. That column is what tells a future run "Fuel already +// has this row — don't re-submit." +func TestMarkProcessed_DoesNotTouchFuelSubmittedAt(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + // Expect the UPDATE to omit fuel_submitted_at from its SET clause. + mock.ExpectExec(regexp.QuoteMeta( + "UPDATE mevcommit_57173.fastswap_miles\n"+ + "SET surplus_eth = ?, net_profit_eth = ?, miles = ?, bid_cost = ?, processed = true\n"+ + "WHERE tx_hash = ?", + )).WithArgs(0.01, 0.005, int64(0), "0", "0xdead"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + markProcessed(db, "0xdead", 0.01, 0.005, 0, "0") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// TestSaveLastBlock_IsAtomicInsert verifies that saveLastBlock issues a +// single INSERT (which the fastswap_miles_meta PRIMARY KEY table upserts +// atomically) and NOT the old non-atomic DELETE-then-INSERT pattern. The +// old pattern could leave last_block vanished if the pod was killed between +// the two statements — on next startup the indexer would fall back to the +// deployment block and re-scan all history, which combined with the +// insertEvent upsert bug caused the 2026-04-16 double-credit incident. +func TestSaveLastBlock_IsAtomicInsert(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + // Exactly one INSERT expected, no DELETE. + mock.ExpectExec(regexp.QuoteMeta( + "INSERT INTO mevcommit_57173.fastswap_miles_meta (k, v) VALUES ('last_block', ?)", + )).WithArgs("12345").WillReturnResult(sqlmock.NewResult(0, 1)) + + saveLastBlock(db, 12345) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// TestMarkProcessedWithFuelSubmission_SetsFuelSubmittedAt verifies that the +// success-path mark function sets fuel_submitted_at = CURRENT_TIMESTAMP so +// future runs skip re-submission to Fuel. +func TestMarkProcessedWithFuelSubmission_SetsFuelSubmittedAt(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + mock.ExpectExec(regexp.QuoteMeta( + "UPDATE mevcommit_57173.fastswap_miles\n"+ + "SET surplus_eth = ?, net_profit_eth = ?, miles = ?, bid_cost = ?, processed = true,\n"+ + " fuel_submitted_at = CURRENT_TIMESTAMP\n"+ + "WHERE tx_hash = ?", + )).WithArgs(0.01, 0.005, int64(7), "1000", "0xdead"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + markProcessedWithFuelSubmission(db, "0xdead", 0.01, 0.005, 7, "1000") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + func TestDecideBidCheckOutcome(t *testing.T) { tests := []struct { name string diff --git a/tools/fastswap-miles/miles.go b/tools/fastswap-miles/miles.go index 19651d98e..1080c0629 100644 --- a/tools/fastswap-miles/miles.go +++ b/tools/fastswap-miles/miles.go @@ -91,22 +91,24 @@ type serviceConfig struct { } type ethRow struct { - txHash string - user string - surplus string - gasCost sql.NullString - inputToken string - blockTS sql.NullTime + txHash string + user string + surplus string + gasCost sql.NullString + inputToken string + blockTS sql.NullTime + fuelSubmittedAt sql.NullTime } type erc20Row struct { - txHash string - user string - token string - surplus string - gasCost sql.NullString - inputToken string - blockTS sql.NullTime + txHash string + user string + token string + surplus string + gasCost sql.NullString + inputToken string + blockTS sql.NullTime + fuelSubmittedAt sql.NullTime } type tokenBatch struct { @@ -119,7 +121,7 @@ type tokenBatch struct { func processMiles(ctx context.Context, cfg *serviceConfig) (int, error) { rows, err := cfg.DB.QueryContext(ctx, ` -SELECT tx_hash, user_address, surplus, gas_cost, input_token, block_timestamp +SELECT tx_hash, user_address, surplus, gas_cost, input_token, block_timestamp, fuel_submitted_at FROM mevcommit_57173.fastswap_miles WHERE processed = false AND swap_type = 'eth_weth' @@ -133,7 +135,7 @@ WHERE processed = false var pending []ethRow for rows.Next() { var r ethRow - if err := rows.Scan(&r.txHash, &r.user, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS); err != nil { + if err := rows.Scan(&r.txHash, &r.user, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.fuelSubmittedAt); err != nil { return 0, err } pending = append(pending, r) @@ -243,6 +245,20 @@ WHERE processed = false continue } + // Idempotency guard: if this row was already submitted to Fuel on a + // prior run (even if `processed` got flipped back to false by a reset + // SQL), do NOT re-submit — just rebuild the derived columns. This is + // the durable guarantee against double-crediting users. + if r.fuelSubmittedAt.Valid { + cfg.Logger.Info("tx already submitted to Fuel previously, skipping re-submission", + slog.String("tx", r.txHash), slog.String("user", r.user), + slog.Time("fuel_submitted_at", r.fuelSubmittedAt.Time), + slog.Int64("miles", miles.Int64())) + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), bidCostWei.String()) + processed++ + continue + } + err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey, common.HexToAddress(r.user), common.HexToHash(r.txHash), @@ -253,7 +269,7 @@ WHERE processed = false continue } - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), bidCostWei.String()) + markProcessedWithFuelSubmission(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), bidCostWei.String()) processed++ cfg.Logger.Info("awarded miles", slog.Int64("miles", miles.Int64()), slog.String("user", r.user), slog.String("tx", r.txHash)) @@ -268,7 +284,7 @@ func processERC20Miles(ctx context.Context, cfg *serviceConfig) (int, error) { processed := 0 rows, err := cfg.DB.QueryContext(ctx, ` -SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, block_timestamp +SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, block_timestamp, fuel_submitted_at FROM mevcommit_57173.fastswap_miles WHERE processed = false AND swap_type = 'erc20' @@ -282,7 +298,7 @@ WHERE processed = false var pending []erc20Row for rows.Next() { var r erc20Row - if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS); err != nil { + if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.fuelSubmittedAt); err != nil { return processed, err } pending = append(pending, r) @@ -513,6 +529,18 @@ WHERE processed = false continue } + // Idempotency guard: if this row was already submitted to Fuel on + // a prior run, do NOT re-submit — just rebuild the derived columns. + if r.fuelSubmittedAt.Valid { + cfg.Logger.Info("erc20 tx already submitted to Fuel previously, skipping re-submission", + slog.String("tx", r.txHash), slog.String("user", r.user), + slog.Time("fuel_submitted_at", r.fuelSubmittedAt.Time), + slog.Int64("miles", miles.Int64())) + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), readyBidCosts[i].String()) + processed++ + continue + } + err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey, common.HexToAddress(r.user), common.HexToHash(r.txHash), @@ -523,7 +551,7 @@ WHERE processed = false slog.String("tx", r.txHash), slog.Any("error", err)) continue // don't mark processed — retry next cycle } - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), readyBidCosts[i].String()) + markProcessedWithFuelSubmission(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), readyBidCosts[i].String()) processed++ } } diff --git a/tools/fastswap-miles/sweep.go b/tools/fastswap-miles/sweep.go index be14a48a6..0a01f6135 100644 --- a/tools/fastswap-miles/sweep.go +++ b/tools/fastswap-miles/sweep.go @@ -131,6 +131,38 @@ func insertEvent( gasCost *big.Int, swapType string, ) error { + // CRITICAL: the fastswap_miles table uses StarRocks `PRIMARY KEY(tx_hash)` + // model, which means an unconditional INSERT UPSERTS the entire row and + // resets every column we don't specify (processed → false, miles → NULL, + // surplus_eth → NULL, bid_cost → NULL, fuel_submitted_at → NULL, …). + // + // If the indexer rescans a block (pod restart with last_block reset, an + // explicit -start-block flag going backward, meta row lost during a + // deploy, etc.) an unconditional INSERT would clobber already-processed + // rows back to the pending state — and the miles pipeline would then + // re-submit each one to Fuel, double-crediting users. + // + // The 2026-04-16 double-credit incident was caused by exactly this: the + // Docker-support deploy restarted the pod, the indexer re-walked historical + // blocks, and every re-inserted event wiped `processed` on the existing + // row. 78 events ended up re-submitted to Fuel for a single test user + // alone; the protocol-wide overcount was much larger. + // + // Fix: check for existence before inserting. The IntentExecuted events + // are immutable once L1-finalized, so a row already in the table is + // authoritative — we must never replace it. + var exists bool + err := db.QueryRow( + `SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)`, + txHash, + ).Scan(&exists) + if err != nil { + return fmt.Errorf("check existing row: %w", err) + } + if exists { + return nil + } + var tsVal interface{} = nil if blockTS != nil { tsVal = *blockTS @@ -140,7 +172,7 @@ func insertEvent( gcStr = gasCost.String() } - _, err := db.Exec(` + _, err = db.Exec(` INSERT INTO mevcommit_57173.fastswap_miles ( tx_hash, block_number, block_timestamp, user_address, input_token, output_token, input_amount, user_amt_out, diff --git a/tools/fastswap-miles/sweep_test.go b/tools/fastswap-miles/sweep_test.go new file mode 100644 index 000000000..8b5826d0e --- /dev/null +++ b/tools/fastswap-miles/sweep_test.go @@ -0,0 +1,127 @@ +package main + +import ( + "math/big" + "regexp" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + fastsettlement "github.com/primev/mev-commit/contracts-abi/clients/FastSettlementV3" +) + +func newTestEvent() *fastsettlement.Fastsettlementv3IntentExecuted { + return &fastsettlement.Fastsettlementv3IntentExecuted{ + User: common.HexToAddress("0xabc"), + InputToken: common.HexToAddress("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"), // USDC + OutputToken: common.HexToAddress("0x0000000000000000000000000000000000000000"), // ETH + InputAmt: big.NewInt(100_000_000), + UserAmtOut: big.NewInt(50_000_000_000_000_000), + Surplus: big.NewInt(500_000_000_000_000), + Raw: types.Log{ + TxHash: common.HexToHash("0xdead"), + BlockNumber: 12345, + }, + } +} + +// TestInsertEvent_SkipsWhenRowExists verifies the critical idempotency +// guarantee: when a row with this tx_hash already exists in fastswap_miles, +// insertEvent must NOT execute an INSERT statement. This prevents the +// StarRocks PRIMARY KEY upsert from wiping the existing row's `processed` +// (and other derived) columns — the exact mechanism that caused the +// 2026-04-16 double-credit incident. +func TestInsertEvent_SkipsWhenRowExists(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + txHash := "0xdead" + blockTS := time.Date(2026, 4, 17, 12, 0, 0, 0, time.UTC) + + // Expect the SELECT EXISTS check to fire and return true. + mock.ExpectQuery(regexp.QuoteMeta( + "SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)", + )).WithArgs(txHash).WillReturnRows( + sqlmock.NewRows([]string{"exists"}).AddRow(true), + ) + // Note: we do NOT call mock.ExpectExec for the INSERT — if insertEvent + // wrongly tries to INSERT, sqlmock will fail the test with an unexpected + // query error. + + if err := insertEvent(db, txHash, 12345, &blockTS, newTestEvent(), big.NewInt(1000), "eth_weth"); err != nil { + t.Fatalf("insertEvent returned error on existing row: %v", err) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// TestInsertEvent_InsertsWhenRowDoesNotExist verifies that insertEvent still +// inserts fresh rows. The idempotency check must not break the base case. +func TestInsertEvent_InsertsWhenRowDoesNotExist(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + txHash := "0xdead" + blockTS := time.Date(2026, 4, 17, 12, 0, 0, 0, time.UTC) + + mock.ExpectQuery(regexp.QuoteMeta( + "SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)", + )).WithArgs(txHash).WillReturnRows( + sqlmock.NewRows([]string{"exists"}).AddRow(false), + ) + mock.ExpectExec(regexp.QuoteMeta("INSERT INTO mevcommit_57173.fastswap_miles")). + WillReturnResult(sqlmock.NewResult(1, 1)) + + if err := insertEvent(db, txHash, 12345, &blockTS, newTestEvent(), big.NewInt(1000), "eth_weth"); err != nil { + t.Fatalf("insertEvent returned error on new row: %v", err) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// TestInsertEvent_PropagatesExistenceCheckError verifies that a DB error on +// the SELECT EXISTS returns an error rather than falling through to INSERT — +// failing closed preserves the idempotency guarantee under DB trouble. +func TestInsertEvent_PropagatesExistenceCheckError(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + txHash := "0xdead" + blockTS := time.Date(2026, 4, 17, 12, 0, 0, 0, time.UTC) + + mock.ExpectQuery(regexp.QuoteMeta( + "SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)", + )).WithArgs(txHash).WillReturnError(errForceTest) + // No INSERT expected. + + err = insertEvent(db, txHash, 12345, &blockTS, newTestEvent(), big.NewInt(1000), "eth_weth") + if err == nil { + t.Fatalf("expected error from existence check, got nil") + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// errForceTest is a sentinel used to force an error path in tests. +var errForceTest = sqlmockErr("forced test error") + +type sqlmockErr string + +func (e sqlmockErr) Error() string { return string(e) } From a3850158510f9a8339774966e53918680b6a47e1 Mon Sep 17 00:00:00 2001 From: owen-eth Date: Sat, 18 Apr 2026 15:47:42 -0400 Subject: [PATCH 2/3] exclude already-submitted rows from sweep batching and backfill (not replace) existing rows on rescan --- tools/fastswap-miles/main.go | 14 ++++++++++ tools/fastswap-miles/main_test.go | 25 +++++++++++++++++ tools/fastswap-miles/miles.go | 32 ++++++++++++++-------- tools/fastswap-miles/sweep.go | 44 ++++++++++++++++++++++-------- tools/fastswap-miles/sweep_test.go | 28 +++++++++++-------- 5 files changed, 109 insertions(+), 34 deletions(-) diff --git a/tools/fastswap-miles/main.go b/tools/fastswap-miles/main.go index 28b91d966..6788ef919 100644 --- a/tools/fastswap-miles/main.go +++ b/tools/fastswap-miles/main.go @@ -412,6 +412,20 @@ WHERE tx_hash = ?`, } } +// markProcessedFlagOnly flips only the `processed` column to true and touches +// nothing else. It is used when a row was previously submitted to Fuel (its +// fuel_submitted_at marker is set) but is currently back in the pending queue +// because something reset processed=false. We don't want to recompute miles +// (we can't — the ERC20 path's derived values come from a sweep that already +// happened), and we mustn't re-submit to Fuel. Just stop the row from +// being re-picked-up next cycle. +func markProcessedFlagOnly(db *sql.DB, txHash string) { + _, err := db.Exec(`UPDATE mevcommit_57173.fastswap_miles SET processed = true WHERE tx_hash = ?`, txHash) + if err != nil { + log.Printf("markProcessedFlagOnly %s: %v", txHash, err) + } +} + // -------------------- Barter API -------------------- type BarterResponse struct { diff --git a/tools/fastswap-miles/main_test.go b/tools/fastswap-miles/main_test.go index eab74f1b9..fc7b993a5 100644 --- a/tools/fastswap-miles/main_test.go +++ b/tools/fastswap-miles/main_test.go @@ -41,6 +41,31 @@ func TestMarkProcessed_DoesNotTouchFuelSubmittedAt(t *testing.T) { } } +// TestMarkProcessedFlagOnly_UpdatesOnlyProcessedColumn verifies that +// markProcessedFlagOnly (used in the ERC20 path for rows whose surplus +// tokens were already swept on a prior run) issues an UPDATE that only +// mentions `processed = true` and leaves every other column untouched. +// Touching other columns here would overwrite the row's existing +// surplus_eth/net_profit_eth/miles/fuel_submitted_at with stale or empty +// values from the current cycle's context. +func TestMarkProcessedFlagOnly_UpdatesOnlyProcessedColumn(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + mock.ExpectExec(regexp.QuoteMeta( + "UPDATE mevcommit_57173.fastswap_miles SET processed = true WHERE tx_hash = ?", + )).WithArgs("0xdead").WillReturnResult(sqlmock.NewResult(0, 1)) + + markProcessedFlagOnly(db, "0xdead") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + // TestSaveLastBlock_IsAtomicInsert verifies that saveLastBlock issues a // single INSERT (which the fastswap_miles_meta PRIMARY KEY table upserts // atomically) and NOT the old non-atomic DELETE-then-INSERT pattern. The diff --git a/tools/fastswap-miles/miles.go b/tools/fastswap-miles/miles.go index 1080c0629..e2fd11e1e 100644 --- a/tools/fastswap-miles/miles.go +++ b/tools/fastswap-miles/miles.go @@ -347,6 +347,24 @@ WHERE processed = false var readyBidCosts []*big.Int for _, r := range batch.Txs { + // Already-submitted guard runs BEFORE batch aggregation. If a row's + // surplus tokens were already swept on a prior run (fuel_submitted_at + // is set), those tokens are no longer in the executor wallet. + // Including the row in readyTxs / readyTotalSum would make the new + // sweep quote for an amount the wallet can't supply, failing the + // batch or skewing pro-rata allocation for every other row. Skip + // entirely: mark processed=true and move on. + if r.fuelSubmittedAt.Valid { + cfg.Logger.Info("erc20 tx already submitted to Fuel previously, excluding from sweep batch", + slog.String("tx", r.txHash), slog.String("user", r.user), + slog.Time("fuel_submitted_at", r.fuelSubmittedAt.Time)) + if !cfg.DryRun { + markProcessedFlagOnly(cfg.DB, r.txHash) + } + processed++ + continue + } + userPaysGas := strings.EqualFold(r.inputToken, zeroAddr.Hex()) bidCostWei := getBidCost(erc20BidMap, r.txHash) if bidCostWei.Sign() == 0 { @@ -529,17 +547,9 @@ WHERE processed = false continue } - // Idempotency guard: if this row was already submitted to Fuel on - // a prior run, do NOT re-submit — just rebuild the derived columns. - if r.fuelSubmittedAt.Valid { - cfg.Logger.Info("erc20 tx already submitted to Fuel previously, skipping re-submission", - slog.String("tx", r.txHash), slog.String("user", r.user), - slog.Time("fuel_submitted_at", r.fuelSubmittedAt.Time), - slog.Int64("miles", miles.Int64())) - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), readyBidCosts[i].String()) - processed++ - continue - } + // Note: the fuel_submitted_at idempotency check runs upstream, + // before batch aggregation — rows with fuel_submitted_at set are + // never in readyTxs here. err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey, common.HexToAddress(r.user), diff --git a/tools/fastswap-miles/sweep.go b/tools/fastswap-miles/sweep.go index 0a01f6135..32ec819e8 100644 --- a/tools/fastswap-miles/sweep.go +++ b/tools/fastswap-miles/sweep.go @@ -148,9 +148,25 @@ func insertEvent( // row. 78 events ended up re-submitted to Fuel for a single test user // alone; the protocol-wide overcount was much larger. // - // Fix: check for existence before inserting. The IntentExecuted events - // are immutable once L1-finalized, so a row already in the table is - // authoritative — we must never replace it. + var tsVal interface{} = nil + if blockTS != nil { + tsVal = *blockTS + } + var gcStr interface{} = nil + if gasCost != nil { + gcStr = gasCost.String() + } + + // Fix: check for existence before inserting. The IntentExecuted event + // args themselves are immutable once L1-finalized, so the row's core + // fields (user, tokens, amounts, surplus) must never be replaced. + // + // For rows that already exist: run a COALESCE-only UPDATE that fills in + // gas_cost or block_timestamp IF they were previously NULL (which happens + // when indexBatch caught a transient receipt/header RPC failure on the + // first pass). This preserves every derived column (processed, miles, + // surplus_eth, net_profit_eth, bid_cost, fuel_submitted_at) — so a rescan + // can heal partial metadata without destroying pipeline state. var exists bool err := db.QueryRow( `SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)`, @@ -160,18 +176,22 @@ func insertEvent( return fmt.Errorf("check existing row: %w", err) } if exists { + // Backfill only the two fields that can legitimately arrive NULL from + // transient RPC failures. COALESCE(col, newVal) keeps the existing + // value if it's non-NULL, and substitutes newVal otherwise. If newVal + // is also NULL (we still don't have fresh data), the column is + // unchanged — no-op. + _, err := db.Exec(` +UPDATE mevcommit_57173.fastswap_miles +SET gas_cost = COALESCE(gas_cost, ?), + block_timestamp = COALESCE(block_timestamp, ?) +WHERE tx_hash = ?`, gcStr, tsVal, txHash) + if err != nil { + return fmt.Errorf("backfill null metadata: %w", err) + } return nil } - var tsVal interface{} = nil - if blockTS != nil { - tsVal = *blockTS - } - var gcStr interface{} = nil - if gasCost != nil { - gcStr = gasCost.String() - } - _, err = db.Exec(` INSERT INTO mevcommit_57173.fastswap_miles ( tx_hash, block_number, block_timestamp, user_address, diff --git a/tools/fastswap-miles/sweep_test.go b/tools/fastswap-miles/sweep_test.go index 8b5826d0e..be6d9ae07 100644 --- a/tools/fastswap-miles/sweep_test.go +++ b/tools/fastswap-miles/sweep_test.go @@ -27,13 +27,15 @@ func newTestEvent() *fastsettlement.Fastsettlementv3IntentExecuted { } } -// TestInsertEvent_SkipsWhenRowExists verifies the critical idempotency -// guarantee: when a row with this tx_hash already exists in fastswap_miles, -// insertEvent must NOT execute an INSERT statement. This prevents the -// StarRocks PRIMARY KEY upsert from wiping the existing row's `processed` -// (and other derived) columns — the exact mechanism that caused the -// 2026-04-16 double-credit incident. -func TestInsertEvent_SkipsWhenRowExists(t *testing.T) { +// TestInsertEvent_BackfillsNullMetadataOnExistingRow verifies the critical +// idempotency guarantee: when a row with this tx_hash already exists in +// fastswap_miles, insertEvent must NOT issue an INSERT (which would UPSERT +// under StarRocks PK semantics, wiping derived columns and causing the +// 2026-04-16 double-credit incident). Instead it issues a COALESCE-only +// UPDATE that fills in NULL gas_cost or block_timestamp from a later rescan +// while preserving every derived column (processed, miles, surplus_eth, +// net_profit_eth, bid_cost, fuel_submitted_at). +func TestInsertEvent_BackfillsNullMetadataOnExistingRow(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("sqlmock.New: %v", err) @@ -43,15 +45,19 @@ func TestInsertEvent_SkipsWhenRowExists(t *testing.T) { txHash := "0xdead" blockTS := time.Date(2026, 4, 17, 12, 0, 0, 0, time.UTC) - // Expect the SELECT EXISTS check to fire and return true. mock.ExpectQuery(regexp.QuoteMeta( "SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)", )).WithArgs(txHash).WillReturnRows( sqlmock.NewRows([]string{"exists"}).AddRow(true), ) - // Note: we do NOT call mock.ExpectExec for the INSERT — if insertEvent - // wrongly tries to INSERT, sqlmock will fail the test with an unexpected - // query error. + // COALESCE UPDATE that only touches the two fields that can legitimately + // be NULL from a prior RPC-failed insert. No INSERT must fire. + mock.ExpectExec(regexp.QuoteMeta( + "UPDATE mevcommit_57173.fastswap_miles\n"+ + "SET gas_cost = COALESCE(gas_cost, ?),\n"+ + " block_timestamp = COALESCE(block_timestamp, ?)\n"+ + "WHERE tx_hash = ?", + )).WithArgs("1000", blockTS, txHash).WillReturnResult(sqlmock.NewResult(0, 1)) if err := insertEvent(db, txHash, 12345, &blockTS, newTestEvent(), big.NewInt(1000), "eth_weth"); err != nil { t.Fatalf("insertEvent returned error on existing row: %v", err) From 8d3662511d5956a9b273bb3fdf86ce71b289eefa Mon Sep 17 00:00:00 2001 From: owen-eth Date: Tue, 28 Apr 2026 17:50:59 -0400 Subject: [PATCH 3/3] guardrail changes --- tools/fastswap-miles/README.md | 21 +------- tools/fastswap-miles/main.go | 43 ++++++--------- tools/fastswap-miles/main_test.go | 60 +++++++-------------- tools/fastswap-miles/miles.go | 84 ++++++++++++++++-------------- tools/fastswap-miles/sweep.go | 6 +-- tools/fastswap-miles/sweep_test.go | 2 +- 6 files changed, 86 insertions(+), 130 deletions(-) diff --git a/tools/fastswap-miles/README.md b/tools/fastswap-miles/README.md index 116091765..df5d243d5 100644 --- a/tools/fastswap-miles/README.md +++ b/tools/fastswap-miles/README.md @@ -175,23 +175,6 @@ go run ./tools/fastswap-miles/ \ - **Graceful shutdown**: Catches SIGINT/SIGTERM, finishes current batch, then exits. - **Idempotent**: Re-running from the same start block is safe — inserts use `INSERT INTO` with primary key dedup. - **Fuel-submission idempotency** (three-layer defense built in response to the 2026-04-16 double-credit incident, in which an operator re-used the contract deployment block as `-start-block` on a pod restart, causing the indexer to re-walk all history): - 1. `insertEvent` checks tx_hash existence before INSERT, skipping re-inserts. The `fastswap_miles` table uses StarRocks `PRIMARY KEY(tx_hash)` — an unconditional INSERT upserts the row and wipes columns (including `processed`, `miles`, `fuel_submitted_at`) not specified in the INSERT. Without this check, any block rescan — whether from an explicit `-start-block` flag, a manual reset, or any other trigger — destroys already-processed rows and causes mass re-submission to Fuel. This is the primary fix: re-walking history is now idempotent. - 2. Each row carries a `fuel_submitted_at` timestamp set only when `submitToFuel` succeeds. The service skips re-submission for any row where it is non-null, even if `processed` is flipped back to false by any means. Backstop against future reset paths. + 1. `insertEvent` checks tx_hash existence before INSERT, skipping re-inserts. The `fastswap_miles` table uses StarRocks `PRIMARY KEY(tx_hash)` — an unconditional INSERT upserts the row and wipes every column not specified (including `processed` and `miles`). Without this check, any block rescan — whether from an explicit `-start-block` flag, a manual reset, or any other trigger — destroys already-processed rows and causes mass re-submission to Fuel. This is the primary fix: re-walking history is now idempotent. + 2. The miles processing loops (`processMiles`, `processERC20Miles`) read the `miles` column on every pending row and skip the `submitToFuel` call when it is non-null. `miles` is only ever written after a row's outcome is settled (Fuel submitted, or a terminal no-credit path), so a non-null value means we must not submit again — even if `processed` was flipped back to false by a manual SQL reset. No new column is required; the existing `miles` column doubles as the marker. 3. `saveLastBlock` issues a single atomic INSERT (fastswap_miles_meta has PRIMARY KEY(k) so INSERT upserts). The prior DELETE-then-INSERT pattern could vanish the `last_block` row if the pod died between the two statements. Hardening rather than incident root cause, but worth fixing. - -### Schema requirement - -Before deploying, ensure the DB has the `fuel_submitted_at` column. The service's SELECT queries require it: - -```sql -ALTER TABLE mevcommit_57173.fastswap_miles - ADD COLUMN fuel_submitted_at DATETIME NULL; - --- Backfill: mark every row that has already been submitted to Fuel so the --- service's idempotency check will skip re-submission. -UPDATE mevcommit_57173.fastswap_miles -SET fuel_submitted_at = CURRENT_TIMESTAMP -WHERE miles > 0 AND fuel_submitted_at IS NULL; -``` - -The UPDATE is idempotent — safe to run multiple times (e.g. right before and after code deploy to catch rows processed in the gap window). diff --git a/tools/fastswap-miles/main.go b/tools/fastswap-miles/main.go index 6788ef919..0a9d15e57 100644 --- a/tools/fastswap-miles/main.go +++ b/tools/fastswap-miles/main.go @@ -381,10 +381,10 @@ func saveLastBlock(db *sql.DB, block uint64) { } } -// markProcessed sets processed=true and populates the derived columns. It -// intentionally does NOT touch fuel_submitted_at so that re-runs of the -// pipeline (e.g. a row flipped back to processed=false by a reset SQL) can -// rebuild the derived state without appearing to re-credit Fuel. +// markProcessed sets processed=true and populates the derived columns. The +// `miles` column doubles as our submitted-to-Fuel marker — once it's non-null, +// the pipeline must never re-submit (idempotency check lives in processMiles / +// processERC20Miles). func markProcessed(db *sql.DB, txHash string, surplusEth, netProfitEth float64, miles int64, bidCost string) { _, err := db.Exec(` UPDATE mevcommit_57173.fastswap_miles @@ -396,29 +396,12 @@ WHERE tx_hash = ?`, } } -// markProcessedWithFuelSubmission is called only when submitToFuel has just -// succeeded for this row. It sets fuel_submitted_at so future pipeline runs -// (even after a reset of `processed`) skip the submitToFuel call and don't -// double-credit the user. -func markProcessedWithFuelSubmission(db *sql.DB, txHash string, surplusEth, netProfitEth float64, miles int64, bidCost string) { - _, err := db.Exec(` -UPDATE mevcommit_57173.fastswap_miles -SET surplus_eth = ?, net_profit_eth = ?, miles = ?, bid_cost = ?, processed = true, - fuel_submitted_at = CURRENT_TIMESTAMP -WHERE tx_hash = ?`, - surplusEth, netProfitEth, miles, bidCost, txHash) - if err != nil { - log.Printf("markProcessedWithFuelSubmission %s: %v", txHash, err) - } -} - // markProcessedFlagOnly flips only the `processed` column to true and touches -// nothing else. It is used when a row was previously submitted to Fuel (its -// fuel_submitted_at marker is set) but is currently back in the pending queue -// because something reset processed=false. We don't want to recompute miles -// (we can't — the ERC20 path's derived values come from a sweep that already -// happened), and we mustn't re-submit to Fuel. Just stop the row from -// being re-picked-up next cycle. +// nothing else. It's used by the ERC20 idempotency path: when miles is already +// recorded but processed got reset to false, we must NOT recompute the derived +// columns (surplus_eth/net_profit_eth/bid_cost depend on the sweep result, +// which we can't reproduce without re-sweeping). Just take the row out of the +// pending queue. func markProcessedFlagOnly(db *sql.DB, txHash string) { _, err := db.Exec(`UPDATE mevcommit_57173.fastswap_miles SET processed = true WHERE tx_hash = ?`, txHash) if err != nil { @@ -504,12 +487,18 @@ func submitToFuel( txHash common.Hash, miles *big.Int, ) error { + // dedup_id makes the submission idempotent on Fuul's side: if we ever send + // the same transaction twice (e.g. our service crashes between this call + // succeeding and markProcessed running), Fuul drops the duplicate instead + // of re-crediting the user. Belt-and-suspenders alongside our own + // miles-non-null idempotency check. body := map[string]any{ "user": map[string]any{ "identifier_type": "evm_address", "identifier": user.Hex(), }, - "name": "fast-swap-surplus", + "name": "fast-swap-surplus", + "dedup_id": txHash.Hex(), "args": map[string]any{ "value": map[string]any{ "amount": miles.String(), diff --git a/tools/fastswap-miles/main_test.go b/tools/fastswap-miles/main_test.go index fc7b993a5..98b06160d 100644 --- a/tools/fastswap-miles/main_test.go +++ b/tools/fastswap-miles/main_test.go @@ -14,27 +14,26 @@ import ( "github.com/ethereum/go-ethereum/common" ) -// TestMarkProcessed_DoesNotTouchFuelSubmittedAt is the core idempotency -// contract: the plain markProcessed function (used by orphan, no-profit, -// sub-threshold, and already-submitted-retry paths) must never write to -// fuel_submitted_at. That column is what tells a future run "Fuel already -// has this row — don't re-submit." -func TestMarkProcessed_DoesNotTouchFuelSubmittedAt(t *testing.T) { +// TestMarkProcessed_WritesAllDerivedColumns verifies that markProcessed +// updates every derived column (surplus_eth, net_profit_eth, miles, bid_cost) +// and flips processed=true. Writing miles is what arms the idempotency check: +// once miles is non-null, processMiles / processERC20Miles will skip the +// submitToFuel call on any subsequent run (even if processed gets reset). +func TestMarkProcessed_WritesAllDerivedColumns(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("sqlmock.New: %v", err) } defer func() { _ = db.Close() }() - // Expect the UPDATE to omit fuel_submitted_at from its SET clause. mock.ExpectExec(regexp.QuoteMeta( "UPDATE mevcommit_57173.fastswap_miles\n"+ "SET surplus_eth = ?, net_profit_eth = ?, miles = ?, bid_cost = ?, processed = true\n"+ "WHERE tx_hash = ?", - )).WithArgs(0.01, 0.005, int64(0), "0", "0xdead"). + )).WithArgs(0.01, 0.005, int64(7), "1000", "0xdead"). WillReturnResult(sqlmock.NewResult(0, 1)) - markProcessed(db, "0xdead", 0.01, 0.005, 0, "0") + markProcessed(db, "0xdead", 0.01, 0.005, 7, "1000") if err := mock.ExpectationsWereMet(); err != nil { t.Fatalf("sqlmock expectations: %v", err) @@ -42,12 +41,12 @@ func TestMarkProcessed_DoesNotTouchFuelSubmittedAt(t *testing.T) { } // TestMarkProcessedFlagOnly_UpdatesOnlyProcessedColumn verifies that -// markProcessedFlagOnly (used in the ERC20 path for rows whose surplus -// tokens were already swept on a prior run) issues an UPDATE that only -// mentions `processed = true` and leaves every other column untouched. -// Touching other columns here would overwrite the row's existing -// surplus_eth/net_profit_eth/miles/fuel_submitted_at with stale or empty -// values from the current cycle's context. +// markProcessedFlagOnly (used in the ERC20 idempotency path when a row's +// miles are already recorded but processed got reset to false) issues an +// UPDATE that only mentions `processed = true` and leaves every other column +// untouched. Touching surplus_eth/net_profit_eth/bid_cost here would overwrite +// the values that were derived from the original sweep — values we cannot +// reproduce without re-sweeping. func TestMarkProcessedFlagOnly_UpdatesOnlyProcessedColumn(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { @@ -92,31 +91,6 @@ func TestSaveLastBlock_IsAtomicInsert(t *testing.T) { } } -// TestMarkProcessedWithFuelSubmission_SetsFuelSubmittedAt verifies that the -// success-path mark function sets fuel_submitted_at = CURRENT_TIMESTAMP so -// future runs skip re-submission to Fuel. -func TestMarkProcessedWithFuelSubmission_SetsFuelSubmittedAt(t *testing.T) { - db, mock, err := sqlmock.New() - if err != nil { - t.Fatalf("sqlmock.New: %v", err) - } - defer func() { _ = db.Close() }() - - mock.ExpectExec(regexp.QuoteMeta( - "UPDATE mevcommit_57173.fastswap_miles\n"+ - "SET surplus_eth = ?, net_profit_eth = ?, miles = ?, bid_cost = ?, processed = true,\n"+ - " fuel_submitted_at = CURRENT_TIMESTAMP\n"+ - "WHERE tx_hash = ?", - )).WithArgs(0.01, 0.005, int64(7), "1000", "0xdead"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - markProcessedWithFuelSubmission(db, "0xdead", 0.01, 0.005, 7, "1000") - - if err := mock.ExpectationsWereMet(); err != nil { - t.Fatalf("sqlmock expectations: %v", err) - } -} - func TestDecideBidCheckOutcome(t *testing.T) { tests := []struct { name string @@ -345,6 +319,12 @@ func TestSubmitToFuel(t *testing.T) { t.Errorf("expected name=fast-swap-surplus, got %v", req["name"]) } + // dedup_id must equal the tx hash so Fuul can drop duplicate submits. + txHashHex := common.HexToHash("0xabc").Hex() + if req["dedup_id"] != txHashHex { + t.Errorf("expected dedup_id=%s, got %v", txHashHex, req["dedup_id"]) + } + args := req["args"].(map[string]any) val := args["value"].(map[string]any) if val["amount"] != "150" { diff --git a/tools/fastswap-miles/miles.go b/tools/fastswap-miles/miles.go index e2fd11e1e..d63be0af1 100644 --- a/tools/fastswap-miles/miles.go +++ b/tools/fastswap-miles/miles.go @@ -91,24 +91,24 @@ type serviceConfig struct { } type ethRow struct { - txHash string - user string - surplus string - gasCost sql.NullString - inputToken string - blockTS sql.NullTime - fuelSubmittedAt sql.NullTime + txHash string + user string + surplus string + gasCost sql.NullString + inputToken string + blockTS sql.NullTime + miles sql.NullInt64 } type erc20Row struct { - txHash string - user string - token string - surplus string - gasCost sql.NullString - inputToken string - blockTS sql.NullTime - fuelSubmittedAt sql.NullTime + txHash string + user string + token string + surplus string + gasCost sql.NullString + inputToken string + blockTS sql.NullTime + miles sql.NullInt64 } type tokenBatch struct { @@ -121,7 +121,7 @@ type tokenBatch struct { func processMiles(ctx context.Context, cfg *serviceConfig) (int, error) { rows, err := cfg.DB.QueryContext(ctx, ` -SELECT tx_hash, user_address, surplus, gas_cost, input_token, block_timestamp, fuel_submitted_at +SELECT tx_hash, user_address, surplus, gas_cost, input_token, block_timestamp, miles FROM mevcommit_57173.fastswap_miles WHERE processed = false AND swap_type = 'eth_weth' @@ -135,7 +135,7 @@ WHERE processed = false var pending []ethRow for rows.Next() { var r ethRow - if err := rows.Scan(&r.txHash, &r.user, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.fuelSubmittedAt); err != nil { + if err := rows.Scan(&r.txHash, &r.user, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.miles); err != nil { return 0, err } pending = append(pending, r) @@ -245,16 +245,18 @@ WHERE processed = false continue } - // Idempotency guard: if this row was already submitted to Fuel on a - // prior run (even if `processed` got flipped back to false by a reset - // SQL), do NOT re-submit — just rebuild the derived columns. This is - // the durable guarantee against double-crediting users. - if r.fuelSubmittedAt.Valid { - cfg.Logger.Info("tx already submitted to Fuel previously, skipping re-submission", + // Idempotency guard: if this row already has miles set (even if + // `processed` got flipped back to false by a reset SQL), do NOT + // re-submit to Fuel — just refresh the derived columns. miles is only + // ever written after a successful Fuel submission (or as 0 for the + // no-credit terminal paths), so a non-null value means we already + // settled this row's outcome. + if r.miles.Valid { + cfg.Logger.Info("tx already has miles recorded, skipping re-submission", slog.String("tx", r.txHash), slog.String("user", r.user), - slog.Time("fuel_submitted_at", r.fuelSubmittedAt.Time), - slog.Int64("miles", miles.Int64())) - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), bidCostWei.String()) + slog.Int64("recorded_miles", r.miles.Int64), + slog.Int64("recomputed_miles", miles.Int64())) + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, r.miles.Int64, bidCostWei.String()) processed++ continue } @@ -269,7 +271,7 @@ WHERE processed = false continue } - markProcessedWithFuelSubmission(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), bidCostWei.String()) + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), bidCostWei.String()) processed++ cfg.Logger.Info("awarded miles", slog.Int64("miles", miles.Int64()), slog.String("user", r.user), slog.String("tx", r.txHash)) @@ -284,7 +286,7 @@ func processERC20Miles(ctx context.Context, cfg *serviceConfig) (int, error) { processed := 0 rows, err := cfg.DB.QueryContext(ctx, ` -SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, block_timestamp, fuel_submitted_at +SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, block_timestamp, miles FROM mevcommit_57173.fastswap_miles WHERE processed = false AND swap_type = 'erc20' @@ -298,7 +300,7 @@ WHERE processed = false var pending []erc20Row for rows.Next() { var r erc20Row - if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.fuelSubmittedAt); err != nil { + if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.miles); err != nil { return processed, err } pending = append(pending, r) @@ -347,17 +349,19 @@ WHERE processed = false var readyBidCosts []*big.Int for _, r := range batch.Txs { - // Already-submitted guard runs BEFORE batch aggregation. If a row's - // surplus tokens were already swept on a prior run (fuel_submitted_at - // is set), those tokens are no longer in the executor wallet. + // Already-settled guard runs BEFORE batch aggregation. If a row's + // surplus tokens were already swept on a prior run (miles is + // non-null), those tokens are no longer in the executor wallet. // Including the row in readyTxs / readyTotalSum would make the new // sweep quote for an amount the wallet can't supply, failing the // batch or skewing pro-rata allocation for every other row. Skip - // entirely: mark processed=true and move on. - if r.fuelSubmittedAt.Valid { - cfg.Logger.Info("erc20 tx already submitted to Fuel previously, excluding from sweep batch", + // entirely: just re-flip processed=true and preserve every other + // column (surplus_eth/net_profit_eth/bid_cost depend on the actual + // sweep result, which we can't recompute without re-sweeping). + if r.miles.Valid { + cfg.Logger.Info("erc20 tx already has miles recorded, excluding from sweep batch", slog.String("tx", r.txHash), slog.String("user", r.user), - slog.Time("fuel_submitted_at", r.fuelSubmittedAt.Time)) + slog.Int64("recorded_miles", r.miles.Int64)) if !cfg.DryRun { markProcessedFlagOnly(cfg.DB, r.txHash) } @@ -547,9 +551,9 @@ WHERE processed = false continue } - // Note: the fuel_submitted_at idempotency check runs upstream, - // before batch aggregation — rows with fuel_submitted_at set are - // never in readyTxs here. + // Note: the miles-non-null idempotency check runs upstream, before + // batch aggregation — rows with miles already recorded are never + // in readyTxs here. err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey, common.HexToAddress(r.user), @@ -561,7 +565,7 @@ WHERE processed = false slog.String("tx", r.txHash), slog.Any("error", err)) continue // don't mark processed — retry next cycle } - markProcessedWithFuelSubmission(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), readyBidCosts[i].String()) + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), readyBidCosts[i].String()) processed++ } } diff --git a/tools/fastswap-miles/sweep.go b/tools/fastswap-miles/sweep.go index 32ec819e8..6b126d316 100644 --- a/tools/fastswap-miles/sweep.go +++ b/tools/fastswap-miles/sweep.go @@ -134,7 +134,7 @@ func insertEvent( // CRITICAL: the fastswap_miles table uses StarRocks `PRIMARY KEY(tx_hash)` // model, which means an unconditional INSERT UPSERTS the entire row and // resets every column we don't specify (processed → false, miles → NULL, - // surplus_eth → NULL, bid_cost → NULL, fuel_submitted_at → NULL, …). + // surplus_eth → NULL, bid_cost → NULL, …). // // If the indexer rescans a block (pod restart with last_block reset, an // explicit -start-block flag going backward, meta row lost during a @@ -165,8 +165,8 @@ func insertEvent( // gas_cost or block_timestamp IF they were previously NULL (which happens // when indexBatch caught a transient receipt/header RPC failure on the // first pass). This preserves every derived column (processed, miles, - // surplus_eth, net_profit_eth, bid_cost, fuel_submitted_at) — so a rescan - // can heal partial metadata without destroying pipeline state. + // surplus_eth, net_profit_eth, bid_cost) — so a rescan can heal partial + // metadata without destroying pipeline state. var exists bool err := db.QueryRow( `SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)`, diff --git a/tools/fastswap-miles/sweep_test.go b/tools/fastswap-miles/sweep_test.go index be6d9ae07..a07eb3bf6 100644 --- a/tools/fastswap-miles/sweep_test.go +++ b/tools/fastswap-miles/sweep_test.go @@ -34,7 +34,7 @@ func newTestEvent() *fastsettlement.Fastsettlementv3IntentExecuted { // 2026-04-16 double-credit incident). Instead it issues a COALESCE-only // UPDATE that fills in NULL gas_cost or block_timestamp from a later rescan // while preserving every derived column (processed, miles, surplus_eth, -// net_profit_eth, bid_cost, fuel_submitted_at). +// net_profit_eth, bid_cost). func TestInsertEvent_BackfillsNullMetadataOnExistingRow(t *testing.T) { db, mock, err := sqlmock.New() if err != nil {