From e8ebc855cfed45a6383ca5495dbd7abfe41c81d5 Mon Sep 17 00:00:00 2001 From: satyakwok <119509589+satyakwok@users.noreply.github.com> Date: Thu, 4 Jun 2026 07:21:03 +0200 Subject: [PATCH] chore(faucet): token-bucket rate limiting via reliakit-ratelimit Replace the per-IP sliding-window Vec (which grew per IP, and whose own doc comment already called it a "token bucket") and the hand-rolled per-recipient last-timestamp cooldown with reliakit-ratelimit::RateLimiter: - per-IP: max_drips per window with a burst allowance - per-recipient: a 1-token-per-cooldown bucket - O(1) memory per key, precise Retry-After hints The DashMap entry guard still holds the per-key lock across try_acquire_one, so the cooldown TOCTOU fix is preserved. Non-consensus (faucet binary only); no version bump. --- CHANGELOG.md | 11 +++ Cargo.lock | 7 ++ bin/sentrix-faucet/Cargo.toml | 1 + bin/sentrix-faucet/src/main.rs | 146 +++++++++++++++++++++++++-------- 4 files changed, 129 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c2b41d31..3ae5c05d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,17 @@ This project uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Faucet — token-bucket rate limiting via reliakit-ratelimit + +The faucet's per-IP limiter was a sliding-window `Vec` that grew per +IP (its own doc comment already described the intent as "token bucket"), and +the per-recipient cooldown was a hand-rolled last-timestamp check. Both now use +`reliakit-ratelimit::RateLimiter`: per-IP is `max_drips` per window with a +burst allowance, per-recipient is a 1-token-per-cooldown bucket. O(1) memory +per key, precise `Retry-After` hints, and the DashMap entry guard still holds +the per-key lock across acquire so the cooldown TOCTOU fix is preserved. +Non-consensus (testnet faucet binary only). + ## [2.2.28] — 2026-06-04 — Pin validators as gossipsub explicit peers ### Networking — silent mesh-death fix diff --git a/Cargo.lock b/Cargo.lock index 6d9aafb7..dcc1a021 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4555,6 +4555,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdd920a56c8d9c7bb55fc8db757f8fc4e442f2ca029e9dabae5125f8fecc8d2b" +[[package]] +name = "reliakit-ratelimit" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9ee2d7152b488417bb6d8a39daaac4769833857b95d085231964dce1ccad4d1" + [[package]] name = "reliakit-secret" version = "0.1.0" @@ -5302,6 +5308,7 @@ dependencies = [ "dashmap", "hex", "reliakit-primitives", + "reliakit-ratelimit", "reliakit-validate", "reqwest", "secp256k1 0.31.1", diff --git a/bin/sentrix-faucet/Cargo.toml b/bin/sentrix-faucet/Cargo.toml index 914ab2c6..b54e2596 100644 --- a/bin/sentrix-faucet/Cargo.toml +++ b/bin/sentrix-faucet/Cargo.toml @@ -14,6 +14,7 @@ sentrix-primitives = { path = "../../crates/sentrix-primitives" } sentrix-wallet = { path = "../../crates/sentrix-wallet" } reliakit-primitives = "0.2" reliakit-validate = "0.1" +reliakit-ratelimit = "0.1" axum = "0.8" tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] } diff --git a/bin/sentrix-faucet/src/main.rs b/bin/sentrix-faucet/src/main.rs index 5cc55ad9..2fd7a307 100644 --- a/bin/sentrix-faucet/src/main.rs +++ b/bin/sentrix-faucet/src/main.rs @@ -29,8 +29,9 @@ use axum::{ routing::{get, post}, }; use clap::Parser; -use dashmap::{DashMap, mapref::entry::Entry}; +use dashmap::DashMap; use reliakit_primitives::{HexString, NonEmptyStr, PositiveInt}; +use reliakit_ratelimit::RateLimiter; use reliakit_validate::{Valid, Validate, ValidationError}; use secp256k1::{PublicKey, SecretKey}; use sentrix_primitives::transaction::{MIN_TX_FEE, Transaction}; @@ -111,10 +112,16 @@ struct AppState { ip_max_drips: u32, addr_cooldown: Duration, http: reqwest::Client, - /// IP → list of drip timestamps within the current window - ip_history: Arc>>, - /// recipient address → most recent drip timestamp - addr_last_drip: Arc>, + /// Monotonic base — rate-limit "now" is `start.elapsed()` in ms, the unit + /// reliakit-ratelimit buckets are clocked in (the crate never reads the + /// clock itself). + start: Instant, + /// IP → token bucket (`ip_max_drips` per `ip_window`). O(1) per IP, vs the + /// old per-IP `Vec` that grew with traffic. + ip_buckets: Arc>, + /// recipient address → 1-token bucket refilling every `addr_cooldown` + /// (one drip per address per cooldown). + addr_buckets: Arc>, } #[derive(Deserialize)] @@ -214,46 +221,65 @@ async fn submit_tx(http: &reqwest::Client, rpc_url: &str, tx: &Transaction) -> R .ok_or_else(|| anyhow!("txid missing in response: {}", body)) } +/// Per-IP bucket: `max_drips` capacity, refilling one token every +/// `window_ms / max_drips` — steady rate of `max_drips` per window with a +/// full-burst allowance. RateLimiter::new clamps every arg to ≥1, so a zero +/// interval can't divide. +fn ip_bucket(max_drips: u32, window_ms: u64) -> RateLimiter { + let cap = max_drips.max(1) as u64; + RateLimiter::new(cap, 1, window_ms / cap) +} + +/// Per-recipient cooldown bucket: a single token refilling every +/// `cooldown_ms` — exactly "one drip per address per cooldown". +fn addr_bucket(cooldown_ms: u64) -> RateLimiter { + RateLimiter::new(1, 1, cooldown_ms) +} + /// Rate-limit checks. Returns Ok(()) on pass, Err(reason) on rejection. fn check_rate_limits(state: &AppState, ip: IpAddr, recipient: &str) -> Result<(), String> { - let now = Instant::now(); - let window = state.ip_window; + let now = state.start.elapsed().as_millis() as u64; + let ip_window_ms = state.ip_window.as_millis() as u64; + let cooldown_ms = state.addr_cooldown.as_millis() as u64; - // Per-IP: prune old entries, then count remaining + // Per-IP token bucket. DashMap's entry guard holds the per-key shard lock + // across try_acquire_one, so refill+consume is atomic under concurrent + // requests from the same IP. { - let mut entry = state.ip_history.entry(ip).or_default(); - entry.retain(|t| now.duration_since(*t) < window); - if entry.len() >= state.ip_max_drips as usize { + let mut bucket = state + .ip_buckets + .entry(ip) + .or_insert_with(|| ip_bucket(state.ip_max_drips, ip_window_ms)); + if !bucket.try_acquire_one(now) { + let retry_s = bucket + .retry_after(now, 1) + .unwrap_or(ip_window_ms) + .div_ceil(1000); return Err(format!( - "rate limit: IP {} reached {} drips in {}s window", + "rate limit: IP {} reached {} drips per {}s — retry in {}s", ip, state.ip_max_drips, - window.as_secs() + ip_window_ms / 1000, + retry_s, )); } - entry.push(now); } - // Per-recipient cooldown — atomic via Entry guard. The old - // `get` + check + `insert` pattern released the lock between - // read and write, so two parallel requests for the same address - // could each see "no prior drip" and both proceed → cooldown - // bypass. Matching on `Entry` holds the per-key lock for the - // entire check + update so concurrent callers serialize cleanly. - match state.addr_last_drip.entry(recipient.to_string()) { - Entry::Occupied(mut o) => { - let last = *o.get(); - if now.duration_since(last) < state.addr_cooldown { - let remaining = state.addr_cooldown - now.duration_since(last); - return Err(format!( - "address cooldown: try again in {} seconds", - remaining.as_secs() - )); - } - o.insert(now); - } - Entry::Vacant(v) => { - v.insert(now); + // Per-recipient cooldown as a 1-token bucket. The same entry-guard + // atomicity closes the TOCTOU the old get+check+insert pattern had: + // two parallel requests for one address serialize on the per-key lock, + // so only one acquires the single token per cooldown. + { + let mut bucket = state + .addr_buckets + .entry(recipient.to_string()) + .or_insert_with(|| addr_bucket(cooldown_ms)); + if !bucket.try_acquire_one(now) { + let retry_s = bucket + .retry_after(now, 1) + .unwrap_or(cooldown_ms) + .div_ceil(1000); + return Err(format!("address cooldown: try again in {} seconds", retry_s)); } } @@ -414,8 +440,9 @@ async fn main() -> Result<()> { .timeout(Duration::from_secs(10)) .build() .context("build reqwest client")?, - ip_history: Arc::new(DashMap::new()), - addr_last_drip: Arc::new(DashMap::new()), + start: Instant::now(), + ip_buckets: Arc::new(DashMap::new()), + addr_buckets: Arc::new(DashMap::new()), }; let cors = CorsLayer::new() @@ -448,3 +475,50 @@ async fn main() -> Result<()> { Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ip_bucket_allows_burst_then_blocks() { + // 5 drips per 5000ms window → cap 5, +1 every 1000ms. + let mut b = ip_bucket(5, 5000); + for _ in 0..5 { + assert!(b.try_acquire_one(0), "burst of capacity must pass"); + } + assert!(!b.try_acquire_one(0), "6th in the same instant is blocked"); + // retry_after points at the next refill (~1 interval). + let wait = b.retry_after(0, 1).expect("1 <= capacity"); + assert!(wait > 0 && wait <= 1000, "retry within one interval, got {wait}"); + // After one refill interval, exactly one token is back. + assert!(b.try_acquire_one(1000)); + assert!(!b.try_acquire_one(1000)); + } + + #[test] + fn ip_bucket_zero_max_does_not_panic() { + // RateLimiter::new clamps to ≥1, so a misconfigured 0 still works. + let mut b = ip_bucket(0, 1000); + assert!(b.try_acquire_one(0)); + } + + #[test] + fn addr_bucket_one_per_cooldown() { + // One drip, then blocked until a full cooldown elapses. + let mut b = addr_bucket(60_000); + assert!(b.try_acquire_one(0), "first drip allowed"); + assert!(!b.try_acquire_one(0), "second within cooldown blocked"); + assert!(!b.try_acquire_one(59_999), "still blocked just before cooldown"); + assert!(b.try_acquire_one(60_000), "allowed once cooldown elapses"); + } + + #[test] + fn addr_bucket_retry_after_reports_remaining() { + let mut b = addr_bucket(60_000); + assert!(b.try_acquire_one(0)); + // At t=10s into a 60s cooldown, ~50s remain. + let wait = b.retry_after(10_000, 1).expect("1 <= capacity"); + assert_eq!(wait, 50_000); + } +}