diff --git a/tools/l1-tx-volume-indexer/README.md b/tools/l1-tx-volume-indexer/README.md index 9c846aba8..ad8ead77d 100644 --- a/tools/l1-tx-volume-indexer/README.md +++ b/tools/l1-tx-volume-indexer/README.md @@ -29,14 +29,23 @@ On-chain enrichment: - Covalent `transaction_v2` endpoint (ETH mainnet). ## Tombstone behavior for missing transactions (Covalent 404) -If Covalent returns HTTP 404 with a "Transaction hash ... not found" message: -- The tool can insert a placeholder row into `processed_l1_txns_v2` with: - - `l1_tx_hash = ` (no 0x) - - `primary_class = 'not_found'` - - other computed fields left NULL -- This prevents repeated retries of transactions that never land on-chain. - -Age gate (anti-false-tombstone): +Tombstoning uses a two-phase lifecycle to distinguish transient Covalent outages +from transactions that genuinely never landed on-chain: + +1. **First 404** → `not_found_retry` (retryable; rediscovered on next run) +2. **Second 404** (on a subsequent run) → `not_found` (permanent tombstone) + +This ensures a tx must fail across at least two separate runs before being +permanently excluded. + +### Rate guard +Only *new* 404s (first-time failures) count toward the error rate. If the new-404 +rate exceeds 10% of attempted transactions in a run, all new tombstones are +suppressed — this signals a Covalent outage rather than genuinely missing txns. +Repeat 404s (already `not_found_retry`) are upgraded to `not_found` +unconditionally since they've failed across 2+ runs. + +### Age gate - Tombstone insertion is gated on block age using `mctransactions_sr.block_number`. - The tool queries StarRocks to get: - `head_block = MAX(block_number)` over `mctransactions_sr` for `confirmed/pre-confirmed` @@ -80,6 +89,12 @@ Only updates (skip discovering/inserting missing txs): - `-only-old-lending`: restrict updates to rows where existing `is_lending=1` - `-compare-only-old-swapvol-gt0`: dry-run comparison filter for swap volume discrepancies +## Tombstone rate guard +If the Covalent 404 rate exceeds 10% of attempted transactions in a single run, +**all tombstones are suppressed** for that run. This prevents mass false-tombstoning +during Covalent outages where 404s are returned for valid on-chain transactions. +Suppressed transactions are retried naturally on the next scheduled run. + ## Notes - `processed_l1_txns_v2.l1_tx_hash` is stored as `hash_norm` (no `0x` prefix). - `loadExistingNeedingUpdate` excludes rows where `primary_class='not_found'` to avoid reprocessing tombstones. diff --git a/tools/l1-tx-volume-indexer/main.go b/tools/l1-tx-volume-indexer/main.go index 59cf2a66a..03357f45c 100644 --- a/tools/l1-tx-volume-indexer/main.go +++ b/tools/l1-tx-volume-indexer/main.go @@ -63,6 +63,7 @@ const ( ) var httpClient = &http.Client{Timeout: 180 * time.Second} +var covalentFetchTimeout = 10 * time.Second // In-memory cache: date string "YYYY-MM-DD" -> ETH/USD price. // Avoids repeat Covalent API calls for transactions on the same day. @@ -240,9 +241,12 @@ func main() { false, "only update/compare rows where the existing DB row has is_lending=1 (ignores incompleteness filter)", ) + fetchTimeout = flag.Int("fetch-timeout", 10, "Covalent API timeout in seconds per transaction") ) flag.Parse() + covalentFetchTimeout = time.Duration(*fetchTimeout) * time.Second + apiKey := os.Getenv("COVALENT_KEY") if apiKey == "" { log.Fatal("COVALENT_KEY is required") @@ -339,7 +343,36 @@ func main() { } // Real run: insert missing, update existing (fill-only unless recompute-all) + // + // Tombstone lifecycle (two-phase): + // 1st 404 → not_found_retry (retryable; rediscovered on next run) + // 2nd 404 → not_found (permanent tombstone; excluded from future runs) + // + // Rate guard: only NEW 404s (not repeats) count toward the rate. If the + // new-404 rate exceeds 10%, all new tombstones are suppressed for that run + // (likely a Covalent outage). Repeat 404s are upgraded unconditionally + // since they've already failed across 2+ runs. + const tombstoneMaxRate = 0.10 + + type pendingTombstone struct { + candidate Candidate + } + + // Load hashes that are already not_found_retry from a prior run. + notFoundPending, err := loadNotFoundPendingHashes(db) + if err != nil { + log.Fatalf("loadNotFoundPendingHashes: %v", err) + } + if len(notFoundPending) > 0 { + log.Printf("loaded %d not_found_retry hashes from prior runs", len(notFoundPending)) + } + var inserted, updated, computeErr int + var new404Count int // new 404s (first time, counts toward rate) + var repeat404Count int // repeat 404s (already pending, upgraded directly) + var processedCount int + var pendingTombstones []pendingTombstone + for idx, k := range keys { if idx%100 == 0 { log.Printf("progress %d/%d", idx, len(keys)) @@ -351,31 +384,55 @@ func main() { computeErr++ log.Printf("compute error %s: %v", w.Hash0x, err) - // If we get a"tx not found" (Covalent 404), insert a tombstone (not_found row)so we don't retry forever. if w.InsertCandidate != nil && !*onlyUpdates && isCovalentTxNotFound(err) { + hashNorm := strings.ToLower(strings.TrimSpace(strip0x(w.InsertCandidate.HashNorm))) + + // Repeat 404: already failed on a prior run. Upgrade to + // permanent tombstone unconditionally. + if _, isPending := notFoundPending[hashNorm]; isPending { + repeat404Count++ + if upErr := upgradeToNotFound(db, hashNorm); upErr != nil { + log.Printf("upgrade not_found error %s: %v", w.Hash0x, upErr) + } else { + log.Printf("upgraded to not_found (repeat 404) %s", w.Hash0x) + } + continue + } + + // New 404: first time seeing this hash fail. + new404Count++ ok, why := shouldTombstoneNotFound(db, w.InsertCandidate.HashNorm, w.InsertCandidate.Source, w.InsertCandidate.CommitmentIndex, 15) if !ok { log.Printf("skip tombstone %s (%s)", w.Hash0x, why) continue } - - if insErr := insertV2NotFoundRow(db, *w.InsertCandidate); insErr != nil { - log.Printf("insert not_found error %s: %v", w.Hash0x, insErr) - } else { - inserted++ - } + pendingTombstones = append(pendingTombstones, pendingTombstone{candidate: *w.InsertCandidate}) } continue } + processedCount++ + // Insert missing row (if applicable) if w.InsertCandidate != nil && !*onlyUpdates { - err := insertV2Row(db, *w.InsertCandidate, comp) - if err != nil { - // if already exists due to race, fall through to update - log.Printf("insert error %s: %v", w.Hash0x, err) + hashNorm := strings.ToLower(strings.TrimSpace(strip0x(w.InsertCandidate.HashNorm))) + // If this hash was not_found_retry, overwrite it with full data + // via UPDATE instead of INSERT (avoids needing DELETE privileges). + if _, wasPending := notFoundPending[hashNorm]; wasPending { + fakeExisting := ExistingRow{HashNorm: hashNorm, Hash0x: w.Hash0x} + if upErr := updateV2Row(db, fakeExisting, comp, true); upErr != nil { + log.Printf("update not_found_retry error %s: %v", w.Hash0x, upErr) + } else { + inserted++ + } } else { - inserted++ + err := insertV2Row(db, *w.InsertCandidate, comp) + if err != nil { + // if already exists due to race, fall through to update + log.Printf("insert error %s: %v", w.Hash0x, err) + } else { + inserted++ + } } } @@ -390,7 +447,28 @@ func main() { } } - log.Printf("done: inserted=%d updated=%d compute_errors=%d", inserted, updated, computeErr) + // Flush or suppress pending tombstones based on the NEW 404 rate. + tombstonesWritten := 0 + totalAttempted := processedCount + computeErr + if totalAttempted > 0 && len(pendingTombstones) > 0 { + rate := float64(new404Count) / float64(totalAttempted) + if rate > tombstoneMaxRate { + log.Printf("suppressing %d tombstones: new 404 rate %.1f%% (%d/%d) exceeds %.0f%% threshold — likely Covalent outage", + len(pendingTombstones), rate*100, new404Count, totalAttempted, tombstoneMaxRate*100) + } else { + for _, pt := range pendingTombstones { + if insErr := insertV2NotFoundRow(db, pt.candidate); insErr != nil { + log.Printf("insert not_found_retry error %s: %v", pt.candidate.Hash0x, insErr) + } else { + tombstonesWritten++ + inserted++ + } + } + } + } + + log.Printf("done: inserted=%d updated=%d compute_errors=%d new_404s=%d repeat_404s=%d tombstones_pending=%d tombstones_suppressed=%d", + inserted, updated, computeErr, new404Count, repeat404Count, tombstonesWritten, len(pendingTombstones)-tombstonesWritten) } // -------------------- DB connection -------------------- @@ -497,6 +575,7 @@ v2 AS ( FROM mevcommit_57173.processed_l1_txns_v2 WHERE l1_tx_hash IS NOT NULL AND CAST(l1_tx_hash AS VARCHAR) <> '' + AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) <> 'not_found_retry') ) SELECT o.l1_tx_hash_0x, @@ -570,6 +649,7 @@ v2_raw AS ( FROM mevcommit_57173.processed_l1_txns_v2 WHERE l1_tx_hash IS NOT NULL AND CAST(l1_tx_hash AS VARCHAR) <> '' + AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) <> 'not_found_retry') ), v2 AS ( SELECT @@ -779,7 +859,7 @@ SELECT FROM mevcommit_57173.processed_l1_txns_v2 WHERE l1_tx_hash IS NOT NULL AND CAST(l1_tx_hash AS VARCHAR) <> '' - AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) <> 'not_found') + AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) NOT IN ('not_found', 'not_found_retry')) AND ( is_swap IS NULL OR is_lending IS NULL @@ -999,6 +1079,42 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 ( return err } +// loadNotFoundPendingHashes returns a set of hash_norm values that have +// primary_class = 'not_found_retry'. These are txs that got a Covalent 404 +// on a previous run but were not yet confirmed as permanently missing. +func loadNotFoundPendingHashes(db *sql.DB) (map[string]struct{}, error) { + q := ` +SELECT LOWER(CAST(l1_tx_hash AS VARCHAR)) AS hash_norm +FROM mevcommit_57173.processed_l1_txns_v2 +WHERE LOWER(CAST(primary_class AS VARCHAR)) = 'not_found_retry'; +` + rows, err := db.Query(q) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + out := map[string]struct{}{} + for rows.Next() { + var h string + if err := rows.Scan(&h); err != nil { + return nil, err + } + out[strings.ToLower(strings.TrimSpace(h))] = struct{}{} + } + return out, rows.Err() +} + +// upgradeToNotFound promotes a not_found_retry row to not_found (permanent tombstone). +func upgradeToNotFound(db *sql.DB, hashNorm string) error { + q := ` +UPDATE mevcommit_57173.processed_l1_txns_v2 +SET primary_class = 'not_found' +WHERE l1_tx_hash = ? AND LOWER(CAST(primary_class AS VARCHAR)) = 'not_found_retry'; +` + _, err := db.Exec(q, hashNorm) + return err +} + func insertV2NotFoundRow(db *sql.DB, c Candidate) error { q := ` INSERT INTO mevcommit_57173.processed_l1_txns_v2 ( @@ -1009,7 +1125,7 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 ( primary_class ) VALUES (?, ?, ?, ?, ?); ` - primary := "not_found" + primary := "not_found_retry" _, err := db.Exec(q, c.HashNorm, @@ -1455,47 +1571,52 @@ func fetchTransaction(txHash0x, apiKey string) (*TxResponse, error) { url := fmt.Sprintf("%s/%s/transaction_v2/%s/?no-logs=false", covalentBaseURL, chainName, txHash0x) - ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second) - defer cancel() + const maxRetries = 2 + for attempt := 0; ; attempt++ { + ctx, cancel := context.WithTimeout(context.Background(), covalentFetchTimeout) - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, err - } + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + cancel() + return nil, err + } - // Covalent v1 API auth: HTTP Basic (key as username, empty password) - req.SetBasicAuth(apiKey, "") - req.Header.Set("Accept", "application/json") + // Covalent v1 API auth: HTTP Basic (key as username, empty password) + req.SetBasicAuth(apiKey, "") + req.Header.Set("Accept", "application/json") - start := time.Now() - resp, err := httpClient.Do(req) - dur := time.Since(start) - if err != nil { - return nil, fmt.Errorf("tx request error after %s: %w", dur, err) - } - defer func() { - if err := resp.Body.Close(); err != nil { - log.Printf("resp.Body.Close: %v", err) + start := time.Now() + resp, err := httpClient.Do(req) + dur := time.Since(start) + if err != nil { + cancel() + if attempt < maxRetries { + log.Printf("fetchTransaction %s: timeout on attempt %d/%d, retrying", txHash0x, attempt+1, maxRetries+1) + continue + } + return nil, fmt.Errorf("tx request error after %s: %w", dur, err) } - }() - body, readErr := io.ReadAll(resp.Body) - if readErr != nil { - return nil, fmt.Errorf("read tx body: %w", readErr) - } - if resp.StatusCode != 200 { - return nil, fmt.Errorf("covalent tx HTTP %d: %s", resp.StatusCode, truncateBody(body)) - } + body, readErr := io.ReadAll(resp.Body) + _ = resp.Body.Close() + cancel() + if readErr != nil { + return nil, fmt.Errorf("read tx body: %w", readErr) + } + if resp.StatusCode != 200 { + return nil, fmt.Errorf("covalent tx HTTP %d: %s", resp.StatusCode, truncateBody(body)) + } - var txResp TxResponse - if err := json.Unmarshal(body, &txResp); err != nil { - return nil, fmt.Errorf("covalent tx JSON decode: %w; body: %s", err, truncateBody(body)) - } - if txResp.Error { - log.Printf("fetchTransaction %s status=%d", txHash0x, resp.StatusCode) - return nil, fmt.Errorf("covalent tx error: %s", txResp.ErrorMessage) + var txResp TxResponse + if err := json.Unmarshal(body, &txResp); err != nil { + return nil, fmt.Errorf("covalent tx JSON decode: %w; body: %s", err, truncateBody(body)) + } + if txResp.Error { + log.Printf("fetchTransaction %s status=%d", txHash0x, resp.StatusCode) + return nil, fmt.Errorf("covalent tx error: %s", txResp.ErrorMessage) + } + return &txResp, nil } - return &txResp, nil } func isCovalentTxNotFound(err error) bool {