Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ This project uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Faucet — token-bucket rate limiting via reliakit-ratelimit

The faucet's per-IP limiter was a sliding-window `Vec<Instant>` that grew per
IP (its own doc comment already described the intent as "token bucket"), and
the per-recipient cooldown was a hand-rolled last-timestamp check. Both now use
`reliakit-ratelimit::RateLimiter`: per-IP is `max_drips` per window with a
burst allowance, per-recipient is a 1-token-per-cooldown bucket. O(1) memory
per key, precise `Retry-After` hints, and the DashMap entry guard still holds
the per-key lock across acquire so the cooldown TOCTOU fix is preserved.
Non-consensus (testnet faucet binary only).

## [2.2.28] — 2026-06-04 — Pin validators as gossipsub explicit peers

### Networking — silent mesh-death fix
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/sentrix-faucet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ sentrix-primitives = { path = "../../crates/sentrix-primitives" }
sentrix-wallet = { path = "../../crates/sentrix-wallet" }
reliakit-primitives = "0.2"
reliakit-validate = "0.1"
reliakit-ratelimit = "0.1"

axum = "0.8"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] }
Expand Down
146 changes: 110 additions & 36 deletions bin/sentrix-faucet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ use axum::{
routing::{get, post},
};
use clap::Parser;
use dashmap::{DashMap, mapref::entry::Entry};
use dashmap::DashMap;
use reliakit_primitives::{HexString, NonEmptyStr, PositiveInt};
use reliakit_ratelimit::RateLimiter;
use reliakit_validate::{Valid, Validate, ValidationError};
use secp256k1::{PublicKey, SecretKey};
use sentrix_primitives::transaction::{MIN_TX_FEE, Transaction};
Expand Down Expand Up @@ -111,10 +112,16 @@ struct AppState {
ip_max_drips: u32,
addr_cooldown: Duration,
http: reqwest::Client,
/// IP → list of drip timestamps within the current window
ip_history: Arc<DashMap<IpAddr, Vec<Instant>>>,
/// recipient address → most recent drip timestamp
addr_last_drip: Arc<DashMap<String, Instant>>,
/// Monotonic base — rate-limit "now" is `start.elapsed()` in ms, the unit
/// reliakit-ratelimit buckets are clocked in (the crate never reads the
/// clock itself).
start: Instant,
/// IP → token bucket (`ip_max_drips` per `ip_window`). O(1) per IP, vs the
/// old per-IP `Vec<Instant>` that grew with traffic.
ip_buckets: Arc<DashMap<IpAddr, RateLimiter>>,
/// recipient address → 1-token bucket refilling every `addr_cooldown`
/// (one drip per address per cooldown).
addr_buckets: Arc<DashMap<String, RateLimiter>>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -214,46 +221,65 @@ async fn submit_tx(http: &reqwest::Client, rpc_url: &str, tx: &Transaction) -> R
.ok_or_else(|| anyhow!("txid missing in response: {}", body))
}

/// Per-IP bucket: `max_drips` capacity, refilling one token every
/// `window_ms / max_drips` — steady rate of `max_drips` per window with a
/// full-burst allowance. RateLimiter::new clamps every arg to ≥1, so a zero
/// interval can't divide.
fn ip_bucket(max_drips: u32, window_ms: u64) -> RateLimiter {
let cap = max_drips.max(1) as u64;
RateLimiter::new(cap, 1, window_ms / cap)
}

/// Per-recipient cooldown bucket: a single token refilling every
/// `cooldown_ms` — exactly "one drip per address per cooldown".
fn addr_bucket(cooldown_ms: u64) -> RateLimiter {
RateLimiter::new(1, 1, cooldown_ms)
}

/// Rate-limit checks. Returns Ok(()) on pass, Err(reason) on rejection.
fn check_rate_limits(state: &AppState, ip: IpAddr, recipient: &str) -> Result<(), String> {
let now = Instant::now();
let window = state.ip_window;
let now = state.start.elapsed().as_millis() as u64;
let ip_window_ms = state.ip_window.as_millis() as u64;
let cooldown_ms = state.addr_cooldown.as_millis() as u64;

// Per-IP: prune old entries, then count remaining
// Per-IP token bucket. DashMap's entry guard holds the per-key shard lock
// across try_acquire_one, so refill+consume is atomic under concurrent
// requests from the same IP.
{
let mut entry = state.ip_history.entry(ip).or_default();
entry.retain(|t| now.duration_since(*t) < window);
if entry.len() >= state.ip_max_drips as usize {
let mut bucket = state
.ip_buckets
.entry(ip)
.or_insert_with(|| ip_bucket(state.ip_max_drips, ip_window_ms));
if !bucket.try_acquire_one(now) {
let retry_s = bucket
.retry_after(now, 1)
.unwrap_or(ip_window_ms)
.div_ceil(1000);
return Err(format!(
"rate limit: IP {} reached {} drips in {}s window",
"rate limit: IP {} reached {} drips per {}s — retry in {}s",
ip,
state.ip_max_drips,
window.as_secs()
ip_window_ms / 1000,
retry_s,
));
}
entry.push(now);
}

// Per-recipient cooldown — atomic via Entry guard. The old
// `get` + check + `insert` pattern released the lock between
// read and write, so two parallel requests for the same address
// could each see "no prior drip" and both proceed → cooldown
// bypass. Matching on `Entry` holds the per-key lock for the
// entire check + update so concurrent callers serialize cleanly.
match state.addr_last_drip.entry(recipient.to_string()) {
Entry::Occupied(mut o) => {
let last = *o.get();
if now.duration_since(last) < state.addr_cooldown {
let remaining = state.addr_cooldown - now.duration_since(last);
return Err(format!(
"address cooldown: try again in {} seconds",
remaining.as_secs()
));
}
o.insert(now);
}
Entry::Vacant(v) => {
v.insert(now);
// Per-recipient cooldown as a 1-token bucket. The same entry-guard
// atomicity closes the TOCTOU the old get+check+insert pattern had:
// two parallel requests for one address serialize on the per-key lock,
// so only one acquires the single token per cooldown.
{
let mut bucket = state
.addr_buckets
.entry(recipient.to_string())
.or_insert_with(|| addr_bucket(cooldown_ms));
if !bucket.try_acquire_one(now) {
let retry_s = bucket
.retry_after(now, 1)
.unwrap_or(cooldown_ms)
.div_ceil(1000);
return Err(format!("address cooldown: try again in {} seconds", retry_s));
}
}

Expand Down Expand Up @@ -414,8 +440,9 @@ async fn main() -> Result<()> {
.timeout(Duration::from_secs(10))
.build()
.context("build reqwest client")?,
ip_history: Arc::new(DashMap::new()),
addr_last_drip: Arc::new(DashMap::new()),
start: Instant::now(),
ip_buckets: Arc::new(DashMap::new()),
addr_buckets: Arc::new(DashMap::new()),
};

let cors = CorsLayer::new()
Expand Down Expand Up @@ -448,3 +475,50 @@ async fn main() -> Result<()> {

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn ip_bucket_allows_burst_then_blocks() {
// 5 drips per 5000ms window → cap 5, +1 every 1000ms.
let mut b = ip_bucket(5, 5000);
for _ in 0..5 {
assert!(b.try_acquire_one(0), "burst of capacity must pass");
}
assert!(!b.try_acquire_one(0), "6th in the same instant is blocked");
// retry_after points at the next refill (~1 interval).
let wait = b.retry_after(0, 1).expect("1 <= capacity");
assert!(wait > 0 && wait <= 1000, "retry within one interval, got {wait}");
// After one refill interval, exactly one token is back.
assert!(b.try_acquire_one(1000));
assert!(!b.try_acquire_one(1000));
}

#[test]
fn ip_bucket_zero_max_does_not_panic() {
// RateLimiter::new clamps to ≥1, so a misconfigured 0 still works.
let mut b = ip_bucket(0, 1000);
assert!(b.try_acquire_one(0));
}

#[test]
fn addr_bucket_one_per_cooldown() {
// One drip, then blocked until a full cooldown elapses.
let mut b = addr_bucket(60_000);
assert!(b.try_acquire_one(0), "first drip allowed");
assert!(!b.try_acquire_one(0), "second within cooldown blocked");
assert!(!b.try_acquire_one(59_999), "still blocked just before cooldown");
assert!(b.try_acquire_one(60_000), "allowed once cooldown elapses");
}

#[test]
fn addr_bucket_retry_after_reports_remaining() {
let mut b = addr_bucket(60_000);
assert!(b.try_acquire_one(0));
// At t=10s into a 60s cooldown, ~50s remain.
let wait = b.retry_after(10_000, 1).expect("1 <= capacity");
assert_eq!(wait, 50_000);
}
}
Loading