Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ struct IndexerConfig {
indexer_backfill_loop_secs: u64,
#[serde(default = "default_analytics_flush_secs")]
indexer_analytics_flush_secs: u64,
#[serde(default = "default_stats_refresh_secs")]
indexer_stats_refresh_secs: u64,
Comment on lines +53 to +54
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Does tokio::time::interval allow a zero Duration, or does it panic/error?

💡 Result:

The tokio::time::interval function panics if provided with a zero Duration [1][2][3]. This behavior is explicitly documented in the official Tokio API documentation [1][4]. The implementation enforces this requirement using an assertion, ensuring that the period must be greater than zero [5][6][7]. This also applies to tokio::time::interval_at [4][5].

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="bin/indexer.rs"

echo "== File exists =="
ls -l "$FILE"

echo
echo "== Lines 1-120 =="
nl -ba "$FILE" | sed -n '1,120p'

echo
echo "== Lines 200-310 =="
nl -ba "$FILE" | sed -n '200,310p'

echo
echo "== Find refresh/statistics interval creation =="
rg -n "interval(_at)?|indexer_stats_refresh_secs|stats_refresh_handle|refresh.*db|cancel|shutdown" "$FILE" -S

Repository: Sentriscloud/indexer-rs

Length of output: 231


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="bin/indexer.rs"
[ -f "$FILE" ] || { echo "Missing $FILE"; exit 1; }

echo "== File exists =="
ls -l "$FILE"

echo
echo "== Lines 1-140 =="
awk 'NR>=1 && NR<=140 {printf "%d:%s\n", NR, $0}' "$FILE"

echo
echo "== Lines 200-320 =="
awk 'NR>=200 && NR<=320 {printf "%d:%s\n", NR, $0}' "$FILE"

echo
echo "== Find refresh/statistics interval creation (and DB refresh calls) =="
rg -n "interval(_at)?|indexer_stats_refresh_secs|stats_refresh_handle|refresh.*db|cancel|shutdown|SELECT|UPDATE|INSERT" "$FILE" -S

Repository: Sentriscloud/indexer-rs

Length of output: 11879


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find where indexer_db::stats::refresh and refresh_full live
rg -n "mod stats|pub mod stats|fn refresh_full|fn refresh\\b|refresh_full\\(" -S . -g'*.rs' --hidden || true

echo
echo "== Candidate refresh implementations =="
rg -n "refresh_full\\s*\\(|refresh\\s*\\(" -S . -g'*.rs' --hidden | head -n 80

echo
echo "== Show indexer_db stats module if present =="
fd -t f "stats.rs" . || true
fd -t f "stats*" . || true

# Print likely files
for f in $(fd -t f "stats.rs" . 2>/dev/null || true); do
  echo
  echo "----- $f (first 220 lines) -----"
  awk 'NR>=1 && NR<=220 {print NR ":" $0}' "$f"
done

Repository: Sentriscloud/indexer-rs

Length of output: 6702


Guard INDEXER_STATS_REFRESH_SECS > 0 and make stats refresh shutdown-cancel responsive

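  • If cfg.indexer_stats_refresh_secs == 0, constructing tokio::time::interval(Duration::from_secs(cfg.indexer_stats_refresh_secs)) panics inside the spawned refresh task; validate the value right after the config is loaded and fail fast at startup with a clear error instead.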
Proposed fix
     let cfg: IndexerConfig = Figment::new().merge(Env::raw()).extract()?;
+    if cfg.indexer_stats_refresh_secs == 0 {
+        anyhow::bail!("INDEXER_STATS_REFRESH_SECS must be >= 1");
+    }
  • Shutdown can block: after a tick fires, the refresh task awaits indexer_db::stats::refresh(_full) directly (no timeout) and doesn’t observe cancel.cancelled() until that DB await completes; main then awaits stats_refresh_handle after cancel.cancel().
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@bin/indexer.rs` around lines 53 - 54, Validate that
IndexerConfig.indexer_stats_refresh_secs is > 0 at startup (fail fast with a
clear error if it is 0) instead of passing zero into tokio::time::interval; use
the existing default_stats_refresh_secs for the default. In the stats refresh
task (the loop that creates tick =
tokio::time::interval(Duration::from_secs(cfg.indexer_stats_refresh_secs))) make
the DB refresh calls (indexer_db::stats::refresh and refresh_full)
cancellation-aware by spawning or converting the DB future into a selectable
future and using tokio::select! to race it with cancel.cancelled() (and
optionally a timeout via tokio::time::timeout) so the task returns promptly when
the CancellationToken is triggered; ensure main still awaits
stats_refresh_handle but that the handle will complete quickly on cancel.

}

fn default_network() -> String {
Expand All @@ -67,6 +69,9 @@ fn default_clickhouse_table() -> String {
/// Serde default for `indexer_analytics_flush_secs`, used when the setting is
/// absent from the environment.
fn default_analytics_flush_secs() -> u64 {
    const DEFAULT_FLUSH_SECS: u64 = 15;
    DEFAULT_FLUSH_SECS
}
/// Serde default for `indexer_stats_refresh_secs`: five minutes, expressed in
/// seconds, used when the setting is absent from the environment.
fn default_stats_refresh_secs() -> u64 {
    const SECS_PER_MINUTE: u64 = 60;
    5 * SECS_PER_MINUTE
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -221,10 +226,45 @@ async fn main() -> anyhow::Result<()> {
})
};

// Stats MV refresh loop. `stats_daily_mv` (migration 0002) has no
// auto-refresh; without this it stays empty and `/stats/daily` returns
// nothing. The first tick fires immediately and does a plain (blocking)
// refresh — Postgres rejects `REFRESH ... CONCURRENTLY` on a
// never-populated MV — then every subsequent tick uses CONCURRENTLY so
// reads are never locked out.
let stats_refresh_handle = {
let pool = pool.clone();
let cancel = cancel.clone();
let interval = Duration::from_secs(cfg.indexer_stats_refresh_secs);
tokio::spawn(async move {
let mut populated = false;
let mut tick = tokio::time::interval(interval);
loop {
tokio::select! {
_ = cancel.cancelled() => return Ok::<(), anyhow::Error>(()),
_ = tick.tick() => {
let res = if populated {
indexer_db::stats::refresh(&pool).await
} else {
indexer_db::stats::refresh_full(&pool).await
};
match res {
Ok(()) => populated = true,
Err(e) => {
tracing::warn!(error = %e, "stats_daily_mv refresh failed");
}
Comment on lines +246 to +255
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Add a timeout around each MV refresh call to keep shutdown responsive.

The refresh future is awaited directly; if the DB call hangs, this task cannot observe cancellation and stats_refresh_handle.await can block shutdown indefinitely.

Proposed fix
                     _ = tick.tick() => {
-                        let res = if populated {
-                            indexer_db::stats::refresh(&pool).await
-                        } else {
-                            indexer_db::stats::refresh_full(&pool).await
-                        };
+                        let res = tokio::time::timeout(Duration::from_secs(30), async {
+                            if populated {
+                                indexer_db::stats::refresh(&pool).await
+                            } else {
+                                indexer_db::stats::refresh_full(&pool).await
+                            }
+                        }).await;
                         match res {
-                            Ok(()) => populated = true,
-                            Err(e) => {
+                            Ok(Ok(())) => populated = true,
+                            Ok(Err(e)) => {
                                 tracing::warn!(error = %e, "stats_daily_mv refresh failed");
                             }
+                            Err(_) => {
+                                tracing::warn!("stats_daily_mv refresh timed out");
+                            }
                         }
                     }

Also applies to: 267-267

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@bin/indexer.rs` around lines 246 - 255, Wrap each call to
indexer_db::stats::refresh and indexer_db::stats::refresh_full with a tokio
timeout (e.g., tokio::time::timeout(Duration::from_secs(...), ...)) so the task
doesn’t await a hanging DB call; on timeout treat it like an Err path (log a
warning including that it timed out) and continue so stats_refresh_handle.await
can remain responsive during shutdown; apply the same change for the other
occurrence at the second refresh call (the refresh_full site) and pick a
sensible timeout constant used by this task.

}
}
}
}
})
};

shutdown_signal().await;
tracing::info!("indexer: shutdown signal received; cancelling workers");
cancel.cancel();

let _ = stats_refresh_handle.await?;
let _ = backfill_handle.await?;
let _ = coinblast_handle.await?;
if let Some(t) = tail_handle {
Expand Down
5 changes: 5 additions & 0 deletions crates/api/src/routes/leaderboards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,13 @@ async fn whale_transfers(
}

/// Builds the leaderboard router: `/accounts/active`, `/whale/transfers`, and
/// the `/whale/tx` alias.
///
/// `/whale/tx` is served by the same handler as `/whale/transfers`. The legacy
/// TS indexer (and the explorer frontend wired to it) uses the `/whale/tx`
/// name, so both paths are registered to keep the frontend indexer-agnostic.
pub fn router() -> Router<SharedState> {
    let routes = Router::new().route("/accounts/active", get(accounts_active));
    // Canonical path first, then the legacy alias bound to the same handler.
    let routes = routes.route("/whale/transfers", get(whale_transfers));
    routes.route("/whale/tx", get(whale_transfers))
}
86 changes: 54 additions & 32 deletions crates/api/src/routes/stats.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! `/stats/daily` — chain-wide aggregates per day_bucket from the
//! `stats_daily_mv` materialised view (db migration 0002).
//! `/stats/daily` — chain-wide aggregates per day from the `stats_daily_mv`
//! materialised view (db migration 0002). The response is a bare array
//! `[{date, blocks, transactions}]`, matching the legacy TS indexer so the
//! explorer frontend can consume either indexer interchangeably.

use crate::error::{ApiError, ApiResult};
use crate::routes::clamp_limit;
Expand All @@ -17,54 +19,74 @@ struct ListQuery {

#[derive(Debug, Serialize, Deserialize)]
struct DailyRow {
/// Decimal-string `floor(timestamp / 86400)`.
day_bucket: String,
/// Decimal-string block count for the bucket.
block_count: String,
/// Decimal-string sum of tx_count.
tx_count: String,
/// Decimal-string sum of gas_used.
gas_used: String,
/// Decimal-string lowest height in the bucket.
first_height: String,
/// Decimal-string highest height in the bucket.
last_height: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct DailyResponse {
daily: Vec<DailyRow>,
/// `YYYY-MM-DD` (UTC).
date: String,
/// Block count for the day.
blocks: i64,
/// Transaction count for the day.
transactions: i64,
}

async fn daily(
State(state): State<SharedState>,
Query(q): Query<ListQuery>,
) -> ApiResult<Json<DailyResponse>> {
) -> ApiResult<Json<Vec<DailyRow>>> {
let limit = clamp_limit(q.limit.as_deref());
// Cache-aside: chain tier (60s TTL). MV refresh cadence is 5 min;
// even short cache TTL collapses 60s of bursts into 1 PG read.
let key = format!("stats:daily:{limit}");
let response: DailyResponse = cached::get_or_load(&state, &key, CacheTier::Chain, || async {
let rows: Vec<DailyRow> = cached::get_or_load(&state, &key, CacheTier::Chain, || async {
let rows = stats::daily(&state.pool, limit).await?;
Ok::<_, ApiError>(DailyResponse {
daily: rows
.into_iter()
Ok::<_, ApiError>(
rows.into_iter()
.map(|r| DailyRow {
day_bucket: r.day_bucket.to_string(),
block_count: r.block_count.to_string(),
tx_count: r.tx_count.to_string(),
gas_used: r.gas_used.to_string(),
first_height: r.first_height.to_string(),
last_height: r.last_height.to_string(),
date: r.date,
blocks: r.blocks,
transactions: r.transactions,
})
.collect(),
})
)
})
.await?;
Ok(Json(response))
Ok(Json(rows))
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// Builds the router that serves `GET /stats/daily` via the `daily` handler.
pub fn router() -> Router<SharedState> {
    let daily_route = get(daily);
    Router::new().route("/stats/daily", daily_route)
}

#[cfg(test)]
mod tests {
    use super::DailyRow;

    /// Each element must be a flat `{date, blocks, transactions}` object with
    /// numeric counts — the legacy TS indexer shape the explorer consumes.
    /// No `daily` wrapper, no `day_bucket`/`block_count` field names.
    #[test]
    fn daily_row_is_flat_legacy_shape() {
        let row = DailyRow {
            date: "2026-04-24".into(),
            blocks: 102108,
            transactions: 102110,
        };
        let v = serde_json::to_value(&row).unwrap();
        assert_eq!(v["date"], "2026-04-24");
        assert_eq!(v["blocks"], 102108);
        assert_eq!(v["transactions"], 102110);
        assert!(v.get("day_bucket").is_none());
        assert!(v.get("block_count").is_none());

        // Pin the exact field set so that any extra field (not only the two
        // legacy names checked above) is caught as a shape regression.
        let obj = v.as_object().expect("row must serialise as a JSON object");
        assert_eq!(obj.len(), 3, "row must expose exactly date/blocks/transactions");
    }

    /// The endpoint body is the `Vec<DailyRow>` itself, so it must serialise as
    /// a bare JSON array whose elements keep the flat row shape.
    #[test]
    fn daily_response_serialises_as_bare_array() {
        let rows = vec![DailyRow {
            date: "2026-04-24".into(),
            blocks: 1,
            transactions: 2,
        }];
        let v = serde_json::to_value(&rows).unwrap();

        // `Value::get("daily")` returns `None` for every non-object value, so
        // it cannot distinguish shapes once `v` is an array. Assert on the
        // array and its element instead.
        let items = v
            .as_array()
            .expect("response must be a bare array, not an object");
        assert_eq!(items.len(), 1);
        assert_eq!(items[0]["date"], "2026-04-24");
        assert_eq!(items[0]["blocks"], 1);
        assert_eq!(items[0]["transactions"], 2);
    }
}
46 changes: 26 additions & 20 deletions crates/db/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@
use crate::{DbResult, PgPool};
use sqlx::Row;

/// One row of `/stats/daily` — pre-aggregated per day_bucket.
/// One row of `/stats/daily`. Field names + types mirror the legacy TS
/// indexer's response (`date` ISO-8601 day, numeric `blocks`/`transactions`)
/// so the explorer frontend consumes either indexer interchangeably. The
/// calendar date is derived from the `day_bucket` (epoch-day) in SQL via
/// `to_timestamp(day_bucket * 86400)` at UTC.
#[derive(Debug, Clone)]
pub struct StatsDailyRow {
/// `floor(timestamp / 86400)` — chain-day bucket.
pub day_bucket: i64,
/// `YYYY-MM-DD` (UTC) for the bucket.
pub date: String,
/// Blocks that landed in this bucket.
pub block_count: i64,
pub blocks: i64,
/// Sum of `blocks.tx_count` over the bucket.
pub tx_count: i64,
/// Sum of `blocks.gas_used` over the bucket.
pub gas_used: i64,
/// First (lowest) block height in the bucket.
pub first_height: i64,
/// Last (highest) block height in the bucket.
pub last_height: i64,
pub transactions: i64,
}

/// Read the last `limit` daily buckets, newest-first.
pub async fn daily(pool: &PgPool, limit: i64) -> DbResult<Vec<StatsDailyRow>> {
let rows = sqlx::query(
"SELECT day_bucket, block_count, tx_count, gas_used, first_height, last_height \
"SELECT to_char(to_timestamp(day_bucket * 86400) AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS date, \
block_count AS blocks, \
tx_count AS transactions \
FROM stats_daily_mv ORDER BY day_bucket DESC LIMIT $1",
)
.bind(limit)
Expand All @@ -32,23 +32,29 @@ pub async fn daily(pool: &PgPool, limit: i64) -> DbResult<Vec<StatsDailyRow>> {
rows.into_iter()
.map(|r| {
Ok(StatsDailyRow {
day_bucket: r.try_get("day_bucket")?,
block_count: r.try_get("block_count")?,
tx_count: r.try_get("tx_count")?,
gas_used: r.try_get("gas_used")?,
first_height: r.try_get("first_height")?,
last_height: r.try_get("last_height")?,
date: r.try_get("date")?,
blocks: r.try_get("blocks")?,
transactions: r.try_get("transactions")?,
})
})
.collect::<Result<_, sqlx::Error>>()
.map_err(Into::into)
}

/// Trigger a CONCURRENTLY refresh of the MV. Called by the route on cache
/// miss for the most-recent bucket, OR by the operator on demand.
/// Refreshes `stats_daily_mv` with `REFRESH MATERIALIZED VIEW CONCURRENTLY`.
///
/// The concurrent form does not lock out reads, but Postgres rejects it on a
/// never-populated MV, so use it for the periodic refresh once the view has
/// been populated.
///
/// # Errors
///
/// Returns the database error if the statement fails to execute.
pub async fn refresh(pool: &PgPool) -> DbResult<()> {
    let statement = sqlx::query("REFRESH MATERIALIZED VIEW CONCURRENTLY stats_daily_mv");
    // The command result carries nothing the caller needs; only failure matters.
    statement.execute(pool).await?;
    Ok(())
}

/// Refreshes `stats_daily_mv` with a plain (blocking) `REFRESH MATERIALIZED VIEW`.
///
/// This is the only form that works on a never-populated MV. Run it once at
/// startup before switching to `refresh` on the interval.
///
/// # Errors
///
/// Returns the database error if the statement fails to execute.
pub async fn refresh_full(pool: &PgPool) -> DbResult<()> {
    let statement = sqlx::query("REFRESH MATERIALIZED VIEW stats_daily_mv");
    // The command result carries nothing the caller needs; only failure matters.
    statement.execute(pool).await?;
    Ok(())
}
24 changes: 17 additions & 7 deletions scripts/smoke.sh
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ v=$(curl -fsS "$API_BASE/whale/transfers" | jq -r '.transfers[0].hash')
fail "/whale/transfers top != txaaaa (got '$v')"
ok "/whale/transfers (sorted by value)"

# /whale/tx -> alias of /whale/transfers (legacy path name the frontend uses).
# Fetches the alias path and compares the hash of the first transfer in the
# response against the expected fixture hash; on mismatch, `fail` is called
# with the value that was actually returned.
v=$(curl -fsS "$API_BASE/whale/tx" | jq -r '.transfers[0].hash')
[[ "$v" == "0xtxaaaa00000000000000000000000000000000000000000000000000000000aa" ]] || \
fail "/whale/tx alias top != txaaaa (got '$v')"
ok "/whale/tx (alias of /whale/transfers)"

# /coinblast/tokens -> 1 curve
v=$(curl -fsS "$API_BASE/coinblast/tokens" | jq -r '.tokens | length')
[[ "$v" == "1" ]] || fail "/coinblast/tokens len != 1 (got $v)"
Expand All @@ -202,14 +208,18 @@ v=$(curl -fsS "$API_BASE/coinblast/trades" | jq -r '.trades[0].type')
[[ "$v" == "sell" ]] || fail "/coinblast/trades[0].type != sell (got '$v')"
ok "/coinblast/trades (newest first)"

# /stats/daily -> 3 day rows (each fixture block is 86400s apart = distinct day)
v=$(curl -fsS "$API_BASE/stats/daily" | jq -r '.daily | length')
# /stats/daily -> bare array of 3 day rows (each fixture block 86400s apart =
# distinct day), newest first. Shape: [{date, blocks, transactions}].
v=$(curl -fsS "$API_BASE/stats/daily" | jq -r 'length')
[[ "$v" == "3" ]] || fail "/stats/daily len != 3 (got $v)"
# Highest bucket should be the newest (block 3 day).
v=$(curl -fsS "$API_BASE/stats/daily" | jq -r '.daily[0].day_bucket | tonumber')
prev=$(curl -fsS "$API_BASE/stats/daily" | jq -r '.daily[1].day_bucket | tonumber')
[[ "$v" -gt "$prev" ]] || fail "/stats/daily not ordered DESC (got $v <= $prev)"
ok "/stats/daily (3 day buckets, ordered DESC)"
# Newest day first — ISO dates sort lexically.
v=$(curl -fsS "$API_BASE/stats/daily" | jq -r '.[0].date')
prev=$(curl -fsS "$API_BASE/stats/daily" | jq -r '.[1].date')
[[ "$v" > "$prev" ]] || fail "/stats/daily not ordered DESC by date (got $v <= $prev)"
# Counts are numbers, not decimal strings.
v=$(curl -fsS "$API_BASE/stats/daily" | jq -r '.[0].blocks | type')
[[ "$v" == "number" ]] || fail "/stats/daily blocks not numeric (got $v)"
ok "/stats/daily (bare array, 3 day buckets, ordered DESC)"

# /api?module=account&action=txlist (etherscan compat)
v=$(curl -fsS "$API_BASE/api?module=account&action=txlist&address=0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" | jq -r '.status')
Expand Down
Loading