Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ struct IndexerConfig {
indexer_analytics_flush_secs: u64,
#[serde(default = "default_stats_refresh_secs")]
indexer_stats_refresh_secs: u64,
#[serde(default = "default_contract_detect_interval_secs")]
indexer_contract_detect_interval_secs: u64,
#[serde(default = "default_contract_detect_batch")]
indexer_contract_detect_batch: i64,
}

fn default_network() -> String {
Expand All @@ -72,6 +76,12 @@ fn default_analytics_flush_secs() -> u64 {
/// Serde fallback for `indexer_stats_refresh_secs`, used when the config
/// does not provide a value.
fn default_stats_refresh_secs() -> u64 {
    const FALLBACK_SECS: u64 = 300;
    FALLBACK_SECS
}
/// Serde fallback for `indexer_contract_detect_interval_secs`. The value is
/// turned into the contract detector's interval via `Duration::from_secs`.
fn default_contract_detect_interval_secs() -> u64 {
    const FALLBACK_SECS: u64 = 4;
    FALLBACK_SECS
}
/// Serde fallback for `indexer_contract_detect_batch`, the batch argument
/// handed to the contract detector. Kept as `i64` to match that field.
fn default_contract_detect_batch() -> i64 {
    const FALLBACK_BATCH: i64 = 10;
    FALLBACK_BATCH
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -96,13 +106,20 @@ async fn main() -> anyhow::Result<()> {
let pool = connect(&pool_cfg).await?;
migrate(&pool).await?;

// One-time contract-history backfill (no-op once `contracts` is populated).
// Runs in the background so it never blocks the sync loops on a large chain.
// One-time address-history backfill: seed `addresses` from every from/to
// address already in `transactions` so the contract detector can classify
// historical addresses too. No-op once `addresses` is populated. Runs in the
// background — the GROUP BY over all txs is heavy on a large chain.
{
let pool = pool.clone();
tokio::spawn(async move {
if let Err(e) = indexer_sync::block_writer::backfill_contracts(&pool).await {
tracing::warn!(error = %e, "contracts history backfill failed");
match indexer_db::addresses::count(&pool).await {
Ok(0) => match indexer_db::addresses::backfill_from_transactions(&pool).await {
Ok(n) => tracing::info!(inserted = n, "addresses: history backfill complete"),
Err(e) => tracing::warn!(error = %e, "addresses history backfill failed"),
},
Ok(_) => {}
Err(e) => tracing::warn!(error = %e, "addresses backfill: count failed"),
}
});
}
Expand Down Expand Up @@ -271,11 +288,27 @@ async fn main() -> anyhow::Result<()> {
})
};

// Contract detector: lazily classify `addresses` (is_contract + code_hash)
// via eth_getCode, rate-limited, so /contracts/* fills over time.
let detector_handle = {
let pool = pool.clone();
let provider = provider.clone();
let cancel = cancel.clone();
let interval = Duration::from_secs(cfg.indexer_contract_detect_interval_secs);
let batch = cfg.indexer_contract_detect_batch;
tokio::spawn(async move {
indexer_sync::run_contract_detector(&pool, &provider, interval, batch, cancel)
.await
.map_err(anyhow::Error::from)
})
};

shutdown_signal().await;
tracing::info!("indexer: shutdown signal received; cancelling workers");
cancel.cancel();

let _ = stats_refresh_handle.await?;
let _ = detector_handle.await?;
let _ = backfill_handle.await?;
let _ = coinblast_handle.await?;
if let Some(t) = tail_handle {
Expand Down
6 changes: 3 additions & 3 deletions crates/api/src/routes/contracts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! `/contracts/recent|pioneers|stats` — contract leaderboards from the
//! `contracts` table (migration 0004). Response shape
//! `addresses` table (migration 0005), `WHERE is_contract = true`. Response shape
//! `{"contracts":[{rank, address, first_seen_block, last_seen_block, code_hash}]}`,
//! matching the legacy indexer / the explorer's expected contract.

Expand All @@ -9,7 +9,7 @@ use crate::{CacheTier, SharedState, cached};
use axum::extract::{Query, State};
use axum::routing::get;
use axum::{Json, Router};
use indexer_db::contracts;
use indexer_db::addresses;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -39,7 +39,7 @@ async fn load(
key: &str,
) -> ApiResult<Json<ContractsResponse>> {
let resp: ContractsResponse = cached::get_or_load(state, key, CacheTier::Chain, || async {
let rows = contracts::list(&state.pool, limit, ascending).await?;
let rows = addresses::list_contracts(&state.pool, limit, ascending).await?;
Ok::<_, ApiError>(ContractsResponse {
contracts: rows
.into_iter()
Expand Down
7 changes: 7 additions & 0 deletions crates/chain/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ impl ChainProvider {
self.next().get_logs(&filter).await.map_err(rpc_err)
}

/// `eth_getCode(address, latest)` — fetches the bytecode deployed at
/// `address`.
///
/// An EOA yields empty bytes (`0x`); a contract yields non-empty code. The
/// contract detector uses the result to fill `addresses.is_contract` and
/// `code_hash`. RPC failures are converted through `rpc_err`.
pub async fn get_code(&self, address: Address) -> ChainResult<Bytes> {
    let fetched = self.next().get_code_at(address).await;
    fetched.map_err(rpc_err)
}

/// `eth_call` against `to` with abi-encoded `data`. Returns the raw
/// return bytes; caller decodes via `alloy_sol_types`. Used by the
/// CoinBlast worker to validate orphan curves (probe `token()` etc).
Expand Down
30 changes: 30 additions & 0 deletions crates/db/migrations/0005_addresses.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- 0005_addresses.sql — address registry powering /contracts leaderboards.
--
-- Every `from`/`to` address seen in a tx is upserted here by the block writer
-- (is_contract=false, code_hash NULL = "not yet classified"). A background
-- detector lazily runs eth_getCode on unclassified rows and sets is_contract +
-- code_hash ("0x" for EOAs, keccak(code) for contracts). `/contracts/*` then
-- serves `WHERE is_contract = true ORDER BY first_seen_block`. Mirrors the
-- legacy TS indexer's `addresses` table + contract-detector worker.
--
-- Supersedes migration 0004's `contracts` table (which detected creations via
-- `to_addr IS NULL` — wrong for Sentrix, which records to_addr = the contract
-- address). The 0004 table is left in place (append-only migrations) but
-- unused; the API now reads `addresses`.

CREATE TABLE IF NOT EXISTS addresses (
address varchar(42) PRIMARY KEY,
first_seen_block bigint NOT NULL,
last_seen_block bigint NOT NULL,
is_contract boolean NOT NULL DEFAULT false,
code_hash varchar(66)
);

-- /contracts/recent (DESC) + /contracts/pioneers (ASC): is_contract narrows,
-- first_seen_block sorts within the narrowed slice.
CREATE INDEX IF NOT EXISTS addresses_contract_recent_idx
ON addresses (is_contract, first_seen_block);

-- Detector candidate scan stays cheap: only unclassified rows.
CREATE INDEX IF NOT EXISTS addresses_unclassified_idx
ON addresses (address) WHERE code_hash IS NULL;
135 changes: 135 additions & 0 deletions crates/db/src/addresses.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//! `addresses` registry (migration 0005) — every from/to address seen in a tx,
//! lazily classified as contract/EOA by the detector worker. Powers
//! `/contracts/recent|pioneers|stats` (`WHERE is_contract = true`).

use crate::{DbResult, PgPool};
use sqlx::Row;

/// A single contract entry as served by the `/contracts` leaderboards.
#[derive(Debug, Clone)]
pub struct ContractRow {
    /// Address of the contract, as lowercase 0x-prefixed hex.
    pub address: String,
    /// Height of the block in which the address was first seen.
    pub first_seen_block: i64,
    /// Height of the block in which the address was most recently seen.
    pub last_seen_block: i64,
    /// `keccak(code)` of the contract. The column itself is nullable (hence
    /// the `Option`), but it is never NULL once the row has been classified.
    pub code_hash: Option<String>,
}

/// Upsert a batch of `(address, block)` sightings.
///
/// For every address the row ends up with the earliest `first_seen_block`
/// and the latest `last_seen_block` across both the stored row and this
/// batch, so blocks may be ingested in any order (e.g. a historical backfill
/// running alongside the chain tail). New rows keep the column defaults
/// `is_contract=false, code_hash=NULL` (unclassified) so the detector picks
/// them up; classification columns of existing rows are never touched.
/// Idempotent — safe on reorg replay.
///
/// * `executor` — any Postgres executor (pool, connection or transaction).
/// * `seen` — sightings; the same address may appear more than once.
///
/// # Errors
/// Returns the database error if the statement fails.
pub async fn upsert_batch<'e, E>(executor: E, seen: &[(String, i64)]) -> DbResult<()>
where
    E: sqlx::PgExecutor<'e>,
{
    if seen.is_empty() {
        return Ok(());
    }

    // Collapse repeated addresses into one (min, max) pair each. Postgres
    // rejects an `ON CONFLICT DO UPDATE` statement that would touch the same
    // row twice, so a batch with a repeated address would otherwise fail as
    // a whole. The ordered map also makes the row order deterministic.
    let mut merged: std::collections::BTreeMap<&str, (i64, i64)> =
        std::collections::BTreeMap::new();
    for (addr, block) in seen {
        merged
            .entry(addr.as_str())
            .and_modify(|(first, last)| {
                *first = (*first).min(*block);
                *last = (*last).max(*block);
            })
            .or_insert((*block, *block));
    }

    let mut qb = sqlx::QueryBuilder::new(
        "INSERT INTO addresses (address, first_seen_block, last_seen_block) ",
    );
    qb.push_values(merged.iter(), |mut row, (addr, (first, last))| {
        row.push_bind(*addr).push_bind(*first).push_bind(*last);
    });
    // LEAST/GREATEST widen the stored range in both directions: a sighting
    // from an older block that arrives late must still lower
    // `first_seen_block`.
    qb.push(
        " ON CONFLICT (address) DO UPDATE SET \
         first_seen_block = LEAST(addresses.first_seen_block, EXCLUDED.first_seen_block), \
         last_seen_block = GREATEST(addresses.last_seen_block, EXCLUDED.last_seen_block)",
    );
    qb.build().execute(executor).await?;
    Ok(())
}

/// Fetch at most `limit` addresses that have not been classified yet
/// (`code_hash IS NULL`), i.e. the detector's next candidates for
/// `eth_getCode`. The query carries no `ORDER BY`, so which rows come back
/// is up to the database. Intended to be served by the partial
/// unclassified index.
pub async fn unclassified_batch(pool: &PgPool, limit: i64) -> DbResult<Vec<String>> {
    let fetched = sqlx::query("SELECT address FROM addresses WHERE code_hash IS NULL LIMIT $1")
        .bind(limit)
        .fetch_all(pool)
        .await?;

    let mut pending = Vec::with_capacity(fetched.len());
    for record in &fetched {
        let address: String = record.try_get("address")?;
        pending.push(address);
    }
    Ok(pending)
}

/// Store the detector's verdict for `address`.
///
/// Callers always pass a `code_hash` after a probe ("0x" for an EOA,
/// keccak(code) for a contract), which is what moves the row out of the
/// unclassified (`code_hash IS NULL`) set. An address with no row is
/// silently left alone: the `UPDATE` simply matches nothing.
pub async fn classify<'e, E>(
    executor: E,
    address: &str,
    is_contract: bool,
    code_hash: &str,
) -> DbResult<()>
where
    E: sqlx::PgExecutor<'e>,
{
    let statement =
        sqlx::query("UPDATE addresses SET is_contract = $2, code_hash = $3 WHERE address = $1")
            .bind(address)
            .bind(is_contract)
            .bind(code_hash);
    statement.execute(executor).await?;
    Ok(())
}

/// Return up to `limit` classified contracts ordered by first-seen height.
///
/// With `ascending` set the oldest contracts come first (pioneers);
/// otherwise the newest come first (recent). Ties on `first_seen_block` are
/// always broken by ascending address.
pub async fn list_contracts(
    pool: &PgPool,
    limit: i64,
    ascending: bool,
) -> DbResult<Vec<ContractRow>> {
    const PIONEERS_SQL: &str = "SELECT address, first_seen_block, last_seen_block, code_hash \
        FROM addresses WHERE is_contract = true \
        ORDER BY first_seen_block ASC, address ASC LIMIT $1";
    const RECENT_SQL: &str = "SELECT address, first_seen_block, last_seen_block, code_hash \
        FROM addresses WHERE is_contract = true \
        ORDER BY first_seen_block DESC, address ASC LIMIT $1";

    let statement = if ascending { PIONEERS_SQL } else { RECENT_SQL };
    let fetched = sqlx::query(statement).bind(limit).fetch_all(pool).await?;

    let mut contracts = Vec::with_capacity(fetched.len());
    for record in &fetched {
        contracts.push(ContractRow {
            address: record.try_get("address")?,
            first_seen_block: record.try_get("first_seen_block")?,
            last_seen_block: record.try_get("last_seen_block")?,
            code_hash: record.try_get("code_hash")?,
        });
    }
    Ok(contracts)
}

/// Number of rows currently in `addresses`. The indexer uses a zero result
/// as the gate for the one-time history backfill.
pub async fn count(pool: &PgPool) -> DbResult<i64> {
    let tally = sqlx::query("SELECT COUNT(*) AS n FROM addresses")
        .fetch_one(pool)
        .await?;
    let total: i64 = tally.try_get("n")?;
    Ok(total)
}

/// One-time history backfill: seed `addresses` from every from/to address
/// already in `transactions`, with min/max block as first/last seen.
///
/// Freshly inserted rows land unclassified (code_hash NULL) for the detector
/// to process. If a row already exists — for instance because the live
/// block writer recorded the address while this statement was running — its
/// seen-range is widened to cover the historical min/max rather than being
/// skipped, so `first_seen_block` still reflects the earliest transaction.
/// Classification columns of existing rows are left untouched.
///
/// Returns the number of rows inserted or widened. Re-running is safe: rows
/// whose range already covers the history are filtered out by the `WHERE`
/// on the conflict action and are neither rewritten nor counted.
///
/// # Errors
/// Returns the database error if the statement fails.
pub async fn backfill_from_transactions(pool: &PgPool) -> DbResult<u64> {
    let res = sqlx::query(
        "INSERT INTO addresses (address, first_seen_block, last_seen_block) \
         SELECT addr, MIN(block_height), MAX(block_height) FROM ( \
         SELECT from_addr AS addr, block_height FROM transactions WHERE from_addr IS NOT NULL \
         UNION ALL \
         SELECT to_addr AS addr, block_height FROM transactions WHERE to_addr IS NOT NULL \
         ) u GROUP BY addr \
         ON CONFLICT (address) DO UPDATE SET \
         first_seen_block = LEAST(addresses.first_seen_block, EXCLUDED.first_seen_block), \
         last_seen_block = GREATEST(addresses.last_seen_block, EXCLUDED.last_seen_block) \
         WHERE addresses.first_seen_block > EXCLUDED.first_seen_block \
         OR addresses.last_seen_block < EXCLUDED.last_seen_block",
    )
    .execute(pool)
    .await?;
    Ok(res.rows_affected())
}
115 changes: 0 additions & 115 deletions crates/db/src/contracts.rs

This file was deleted.

Loading
Loading