Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions tools/l1-tx-volume-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,23 @@ On-chain enrichment:
- Covalent `transaction_v2` endpoint (ETH mainnet).

## Tombstone behavior for missing transactions (Covalent 404)
If Covalent returns HTTP 404 with a "Transaction hash ... not found" message:
- The tool can insert a placeholder row into `processed_l1_txns_v2` with:
- `l1_tx_hash = <hash_norm>` (no 0x)
- `primary_class = 'not_found'`
- other computed fields left NULL
- This prevents repeated retries of transactions that never land on-chain.

Age gate (anti-false-tombstone):
Tombstoning uses a two-phase lifecycle to distinguish transient Covalent outages
from transactions that genuinely never landed on-chain:

1. **First 404** → `not_found_retry` (retryable; rediscovered on next run)
2. **Second 404** (on a subsequent run) → `not_found` (permanent tombstone)

This ensures a tx must fail across at least two separate runs before being
permanently excluded.

### Rate guard
Only *new* 404s (first-time failures) count toward the error rate. If the new-404
rate exceeds 10% of attempted transactions in a run, all new tombstones are
suppressed — this signals a Covalent outage rather than genuinely missing txns.
Repeat 404s (already `not_found_retry`) are upgraded to `not_found`
unconditionally since they've failed across 2+ runs.

### Age gate
- Tombstone insertion is gated on block age using `mctransactions_sr.block_number`.
- The tool queries StarRocks to get:
- `head_block = MAX(block_number)` over `mctransactions_sr` for `confirmed/pre-confirmed`
Expand Down Expand Up @@ -80,6 +89,12 @@ Only updates (skip discovering/inserting missing txs):
- `-only-old-lending`: restrict updates to rows where existing `is_lending=1`
- `-compare-only-old-swapvol-gt0`: dry-run comparison filter for swap volume discrepancies

## Tombstone rate guard
If the Covalent 404 rate exceeds 10% of attempted transactions in a single run,
**all tombstones are suppressed** for that run. This prevents mass false-tombstoning
during Covalent outages where 404s are returned for valid on-chain transactions.
Suppressed transactions are retried naturally on the next scheduled run.

## Notes
- `processed_l1_txns_v2.l1_tx_hash` is stored as `hash_norm` (no `0x` prefix).
- `loadExistingNeedingUpdate` excludes rows where `primary_class='not_found'` to avoid reprocessing tombstones.
219 changes: 170 additions & 49 deletions tools/l1-tx-volume-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
)

var httpClient = &http.Client{Timeout: 180 * time.Second}
var covalentFetchTimeout = 10 * time.Second

// In-memory cache: date string "YYYY-MM-DD" -> ETH/USD price.
// Avoids repeat Covalent API calls for transactions on the same day.
Expand Down Expand Up @@ -240,9 +241,12 @@ func main() {
false,
"only update/compare rows where the existing DB row has is_lending=1 (ignores incompleteness filter)",
)
fetchTimeout = flag.Int("fetch-timeout", 10, "Covalent API timeout in seconds per transaction")
)
flag.Parse()

covalentFetchTimeout = time.Duration(*fetchTimeout) * time.Second

apiKey := os.Getenv("COVALENT_KEY")
if apiKey == "" {
log.Fatal("COVALENT_KEY is required")
Expand Down Expand Up @@ -339,7 +343,36 @@ func main() {
}

// Real run: insert missing, update existing (fill-only unless recompute-all)
//
// Tombstone lifecycle (two-phase):
// 1st 404 → not_found_retry (retryable; rediscovered on next run)
// 2nd 404 → not_found (permanent tombstone; excluded from future runs)
//
// Rate guard: only NEW 404s (not repeats) count toward the rate. If the
// new-404 rate exceeds 10%, all new tombstones are suppressed for that run
// (likely a Covalent outage). Repeat 404s are upgraded unconditionally
// since they've already failed across 2+ runs.
const tombstoneMaxRate = 0.10

type pendingTombstone struct {
candidate Candidate
}

// Load hashes that are already not_found_retry from a prior run.
notFoundPending, err := loadNotFoundPendingHashes(db)
if err != nil {
log.Fatalf("loadNotFoundPendingHashes: %v", err)
}
if len(notFoundPending) > 0 {
log.Printf("loaded %d not_found_retry hashes from prior runs", len(notFoundPending))
}

var inserted, updated, computeErr int
var new404Count int // new 404s (first time, counts toward rate)
var repeat404Count int // repeat 404s (already pending, upgraded directly)
var processedCount int
var pendingTombstones []pendingTombstone

for idx, k := range keys {
if idx%100 == 0 {
log.Printf("progress %d/%d", idx, len(keys))
Expand All @@ -351,31 +384,55 @@ func main() {
computeErr++
log.Printf("compute error %s: %v", w.Hash0x, err)

// If we get a"tx not found" (Covalent 404), insert a tombstone (not_found row)so we don't retry forever.
if w.InsertCandidate != nil && !*onlyUpdates && isCovalentTxNotFound(err) {
hashNorm := strings.ToLower(strings.TrimSpace(strip0x(w.InsertCandidate.HashNorm)))

// Repeat 404: already failed on a prior run. Upgrade to
// permanent tombstone unconditionally.
if _, isPending := notFoundPending[hashNorm]; isPending {
repeat404Count++
if upErr := upgradeToNotFound(db, hashNorm); upErr != nil {
log.Printf("upgrade not_found error %s: %v", w.Hash0x, upErr)
} else {
log.Printf("upgraded to not_found (repeat 404) %s", w.Hash0x)
}
continue
}

// New 404: first time seeing this hash fail.
new404Count++
ok, why := shouldTombstoneNotFound(db, w.InsertCandidate.HashNorm, w.InsertCandidate.Source, w.InsertCandidate.CommitmentIndex, 15)
if !ok {
log.Printf("skip tombstone %s (%s)", w.Hash0x, why)
continue
}

if insErr := insertV2NotFoundRow(db, *w.InsertCandidate); insErr != nil {
log.Printf("insert not_found error %s: %v", w.Hash0x, insErr)
} else {
inserted++
}
pendingTombstones = append(pendingTombstones, pendingTombstone{candidate: *w.InsertCandidate})
}
continue
}

processedCount++

// Insert missing row (if applicable)
if w.InsertCandidate != nil && !*onlyUpdates {
err := insertV2Row(db, *w.InsertCandidate, comp)
if err != nil {
// if already exists due to race, fall through to update
log.Printf("insert error %s: %v", w.Hash0x, err)
hashNorm := strings.ToLower(strings.TrimSpace(strip0x(w.InsertCandidate.HashNorm)))
// If this hash was not_found_retry, overwrite it with full data
// via UPDATE instead of INSERT (avoids needing DELETE privileges).
if _, wasPending := notFoundPending[hashNorm]; wasPending {
fakeExisting := ExistingRow{HashNorm: hashNorm, Hash0x: w.Hash0x}
if upErr := updateV2Row(db, fakeExisting, comp, true); upErr != nil {
log.Printf("update not_found_retry error %s: %v", w.Hash0x, upErr)
} else {
inserted++
}
} else {
inserted++
err := insertV2Row(db, *w.InsertCandidate, comp)
if err != nil {
// if already exists due to race, fall through to update
log.Printf("insert error %s: %v", w.Hash0x, err)
} else {
inserted++
}
}
}

Expand All @@ -390,7 +447,28 @@ func main() {
}
}

log.Printf("done: inserted=%d updated=%d compute_errors=%d", inserted, updated, computeErr)
// Flush or suppress pending tombstones based on the NEW 404 rate.
tombstonesWritten := 0
totalAttempted := processedCount + computeErr
if totalAttempted > 0 && len(pendingTombstones) > 0 {
rate := float64(new404Count) / float64(totalAttempted)
if rate > tombstoneMaxRate {
log.Printf("suppressing %d tombstones: new 404 rate %.1f%% (%d/%d) exceeds %.0f%% threshold — likely Covalent outage",
len(pendingTombstones), rate*100, new404Count, totalAttempted, tombstoneMaxRate*100)
} else {
for _, pt := range pendingTombstones {
if insErr := insertV2NotFoundRow(db, pt.candidate); insErr != nil {
log.Printf("insert not_found_retry error %s: %v", pt.candidate.Hash0x, insErr)
} else {
tombstonesWritten++
inserted++
}
}
}
}

log.Printf("done: inserted=%d updated=%d compute_errors=%d new_404s=%d repeat_404s=%d tombstones_pending=%d tombstones_suppressed=%d",
inserted, updated, computeErr, new404Count, repeat404Count, tombstonesWritten, len(pendingTombstones)-tombstonesWritten)
}

// -------------------- DB connection --------------------
Expand Down Expand Up @@ -497,6 +575,7 @@ v2 AS (
FROM mevcommit_57173.processed_l1_txns_v2
WHERE l1_tx_hash IS NOT NULL
AND CAST(l1_tx_hash AS VARCHAR) <> ''
AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) <> 'not_found_retry')
)
SELECT
o.l1_tx_hash_0x,
Expand Down Expand Up @@ -570,6 +649,7 @@ v2_raw AS (
FROM mevcommit_57173.processed_l1_txns_v2
WHERE l1_tx_hash IS NOT NULL
AND CAST(l1_tx_hash AS VARCHAR) <> ''
AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) <> 'not_found_retry')
),
v2 AS (
SELECT
Expand Down Expand Up @@ -779,7 +859,7 @@ SELECT
FROM mevcommit_57173.processed_l1_txns_v2
WHERE l1_tx_hash IS NOT NULL
AND CAST(l1_tx_hash AS VARCHAR) <> ''
AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) <> 'not_found')
AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) NOT IN ('not_found', 'not_found_retry'))
AND (
is_swap IS NULL
OR is_lending IS NULL
Expand Down Expand Up @@ -999,6 +1079,42 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 (
return err
}

// loadNotFoundPendingHashes returns a set of hash_norm values that have
// primary_class = 'not_found_retry'. These are txs that got a Covalent 404
// on a previous run but were not yet confirmed as permanently missing.
func loadNotFoundPendingHashes(db *sql.DB) (map[string]struct{}, error) {
q := `
SELECT LOWER(CAST(l1_tx_hash AS VARCHAR)) AS hash_norm
FROM mevcommit_57173.processed_l1_txns_v2
WHERE LOWER(CAST(primary_class AS VARCHAR)) = 'not_found_retry';
`
rows, err := db.Query(q)
if err != nil {
return nil, err
}
defer func() { _ = rows.Close() }()
out := map[string]struct{}{}
for rows.Next() {
var h string
if err := rows.Scan(&h); err != nil {
return nil, err
}
out[strings.ToLower(strings.TrimSpace(h))] = struct{}{}
}
return out, rows.Err()
}

// upgradeToNotFound promotes a not_found_retry row to not_found (permanent tombstone).
func upgradeToNotFound(db *sql.DB, hashNorm string) error {
q := `
UPDATE mevcommit_57173.processed_l1_txns_v2
SET primary_class = 'not_found'
WHERE l1_tx_hash = ? AND LOWER(CAST(primary_class AS VARCHAR)) = 'not_found_retry';
`
_, err := db.Exec(q, hashNorm)
return err
}

func insertV2NotFoundRow(db *sql.DB, c Candidate) error {
q := `
INSERT INTO mevcommit_57173.processed_l1_txns_v2 (
Expand All @@ -1009,7 +1125,7 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 (
primary_class
) VALUES (?, ?, ?, ?, ?);
`
primary := "not_found"
primary := "not_found_retry"

_, err := db.Exec(q,
c.HashNorm,
Expand Down Expand Up @@ -1455,47 +1571,52 @@ func fetchTransaction(txHash0x, apiKey string) (*TxResponse, error) {

url := fmt.Sprintf("%s/%s/transaction_v2/%s/?no-logs=false", covalentBaseURL, chainName, txHash0x)

ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()
const maxRetries = 2
for attempt := 0; ; attempt++ {
ctx, cancel := context.WithTimeout(context.Background(), covalentFetchTimeout)

req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
cancel()
return nil, err
}

// Covalent v1 API auth: HTTP Basic (key as username, empty password)
req.SetBasicAuth(apiKey, "")
req.Header.Set("Accept", "application/json")
// Covalent v1 API auth: HTTP Basic (key as username, empty password)
req.SetBasicAuth(apiKey, "")
req.Header.Set("Accept", "application/json")

start := time.Now()
resp, err := httpClient.Do(req)
dur := time.Since(start)
if err != nil {
return nil, fmt.Errorf("tx request error after %s: %w", dur, err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.Printf("resp.Body.Close: %v", err)
start := time.Now()
resp, err := httpClient.Do(req)
dur := time.Since(start)
if err != nil {
cancel()
if attempt < maxRetries {
log.Printf("fetchTransaction %s: timeout on attempt %d/%d, retrying", txHash0x, attempt+1, maxRetries+1)
continue
}
return nil, fmt.Errorf("tx request error after %s: %w", dur, err)
}
}()

body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
return nil, fmt.Errorf("read tx body: %w", readErr)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("covalent tx HTTP %d: %s", resp.StatusCode, truncateBody(body))
}
body, readErr := io.ReadAll(resp.Body)
_ = resp.Body.Close()
cancel()
if readErr != nil {
return nil, fmt.Errorf("read tx body: %w", readErr)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("covalent tx HTTP %d: %s", resp.StatusCode, truncateBody(body))
}

var txResp TxResponse
if err := json.Unmarshal(body, &txResp); err != nil {
return nil, fmt.Errorf("covalent tx JSON decode: %w; body: %s", err, truncateBody(body))
}
if txResp.Error {
log.Printf("fetchTransaction %s status=%d", txHash0x, resp.StatusCode)
return nil, fmt.Errorf("covalent tx error: %s", txResp.ErrorMessage)
var txResp TxResponse
if err := json.Unmarshal(body, &txResp); err != nil {
return nil, fmt.Errorf("covalent tx JSON decode: %w; body: %s", err, truncateBody(body))
}
if txResp.Error {
log.Printf("fetchTransaction %s status=%d", txHash0x, resp.StatusCode)
return nil, fmt.Errorf("covalent tx error: %s", txResp.ErrorMessage)
}
return &txResp, nil
}
return &txResp, nil
}

func isCovalentTxNotFound(err error) bool {
Expand Down
Loading