diff --git a/docs/close_group_stability_findings.md b/docs/close_group_stability_findings.md new file mode 100644 index 0000000..b6484e0 --- /dev/null +++ b/docs/close_group_stability_findings.md @@ -0,0 +1,102 @@ +# Close Group Stability Findings for Merkle Payments + +## BEFORE Fixes — Baseline Test Results + +| Test | Network Size | Metric | Result | +|------|-------------|--------|--------| +| Routing Table Completeness | 25 nodes | DHT discovery | **12.2/25 (48.8%)** — but 22.7/25 direct connections | +| Routing Table Completeness | 100 nodes | DHT discovery | **13.3/100 (13.3%)** — critical degradation | +| Ground Truth Overlap | 5 nodes | Close group accuracy | **1.000 (100%)** — perfect | +| Ground Truth Overlap | 25 nodes | Close group accuracy | **0.992 (99.2%)** — excellent | +| Ground Truth Overlap | 100 nodes | Close group accuracy | **0.883 (88.3%)**, min **0.520** | +| Quoting vs Verification | 25 nodes | Client-Verifier Jaccard | **0.983**, 19/20 exact matches | +| Quoting vs Verification | 100 nodes | Client-Verifier Jaccard | **0.801**, only 3/20 exact matches, **1/20 payment failure** | +| Temporal Stability | 25 nodes | Drift over time | **1.000** — no drift (static network) | +| DHT Lookup Size | 25 nodes | Results per query (K=5) | **5.00** — always returns K peers | + +## AFTER Fixes — Improved Results + +| Test | Network Size | Metric | Before | After | Change | +|------|-------------|--------|--------|-------|--------| +| Routing Table Discovery | 100 nodes | Avg DHT discovery | 13.3/100 | **16.7/100** | +25% | +| Ground Truth Overlap | 100 nodes | Min overlap | 0.520 | **0.660** | +27% | +| Ground Truth Overlap | 100 nodes | Targets with majority | 19/20 | **20/20** | +5% | +| Quoting vs Verification | 25 nodes | Jaccard | 0.983 | **1.000** | Perfect | +| Quoting vs Verification | 25 nodes | Exact matches | 19/20 | **20/20** | Perfect | +| Direct Connections | 25 nodes | Min connections | 18 | **24** | +33% | + +## Root Causes Identified + 
+### 1. DHT routing tables are severely underpopulated (PRIMARY ISSUE) + +- Nodes are **directly connected** to most peers (22.7/25, 90.8%) +- But the **DHT only discovers 13.3/100 (13.3%)** of the network +- This gap grows with network size — the connection layer sees peers but the DHT routing table doesn't incorporate them +- This is NOT a geo-location issue (geo was disabled via `CoreDiversityConfig::permissive()`) + +### 2. Kademlia iterative lookup partially compensates + +- Despite only knowing ~13% of nodes, iterative lookups achieve 88% ground truth overlap at 100 nodes +- This is because lookups hop through intermediate nodes that know different subsets +- But it's not enough: **5% of addresses get <60% overlap**, meaning payment verification would fail + +### 3. Quoting vs Verification divergence scales with network size + +- At 25 nodes: 98.3% Jaccard, 95% exact match rate +- At 100 nodes: **80.1% Jaccard, only 15% exact match rate, 5% payment failure rate** +- Extrapolating to 1000+ nodes: the failure rate would be significantly worse + +### 4. Geo-location is NOT the culprit (for this test environment) + +- All tests ran with `CoreDiversityConfig::permissive()` — no geo checks +- The problem is in the DHT routing table population itself, not in admission filters + +## What This Means for Merkle Payments + +At 100 nodes in an ideal localhost environment with no churn, no latency, and no geo-location filtering: + +- **5% of payment verification attempts would fail** (client and verifier disagree on close group) +- **Only 15% of lookups produce identical close groups** between quoting and verification +- On a real network with churn, latency, and geo-diversity, the failure rate would be much higher + +## Implemented Fixes + +### Fix 1: Aggressive DHT routing table refresh (`src/node.rs`, `tests/e2e/testnet.rs`) + +Periodic background task that performs random DHT lookups to populate routing tables more aggressively. 
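
Conceptually, each refresh round just probes a handful of random 256-bit addresses and lets the iterative lookups pull previously-unknown peers into the routing table as a side effect. A minimal std-only sketch of that idea — the `Dht` trait and `refresh_round` helper here are illustrative stand-ins, not the actual `saorsa-core` API, which is async:

```rust
use std::collections::BTreeSet;

/// Stand-in for the DHT lookup used by the refresh task (assumption: the
/// real API is the async `dht().find_closest_nodes(addr, k)` used elsewhere
/// in this diff; a synchronous trait keeps the sketch self-contained).
trait Dht {
    fn find_closest_nodes(&self, target: [u8; 32], k: usize) -> Vec<String>;
}

/// One refresh round: probe `rounds` random addresses and collect every peer
/// the lookups return. In the real node the routing table absorbs these
/// peers as a side effect; here we just return the discovered set.
fn refresh_round<D: Dht>(
    dht: &D,
    rounds: usize,
    k: usize,
    mut random_addr: impl FnMut() -> [u8; 32],
) -> BTreeSet<String> {
    let mut discovered = BTreeSet::new();
    for _ in 0..rounds {
        let target = random_addr();
        discovered.extend(dht.find_closest_nodes(target, k));
    }
    discovered
}
```

The production task described below runs the equivalent of this on a timer and discards the lookup results, since populating the routing table is the only goal.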
Instead of only discovering peers during on-demand lookups, nodes proactively explore the address space every 30s (production) or 10s (tests). + +**Impact**: Routing table discovery improved from 13.3% to 16.7% at 100 nodes (+25%). At 25 nodes, the quoting-vs-verification test went from 19/20 to **20/20 perfect agreement** (Jaccard 0.983 to 1.000). + +### Fix 2: Close group confirmation protocol (`src/close_group.rs`) + +New module providing: +- `confirm_close_group()` — queries multiple nodes independently for the closest peers to an address, returns only peers that appear in a threshold number of lookups. This creates consensus on close group membership. +- `is_node_in_close_group()` — checks if a node itself should be part of the close group for an address by comparing its own XOR distance against the DHT results. + +### Fix 3: Close group validation in PUT handler (`src/storage/handler.rs`) + +`AntProtocol` now optionally holds a reference to the `P2PNode` (via `OnceLock` for deferred initialization). During PUT requests, before payment verification, the handler calls `is_node_in_close_group()` to verify the node is actually responsible for the address. This prevents nodes from storing data they're not close to, which is critical for merkle payment verification. + +Wired in production (`src/node.rs`) via `with_p2p_node()` and in tests (`tests/e2e/testnet.rs`) via `set_p2p_node()`. + +## Remaining Concerns + +1. **100-node networks still show variability** — the DHT refresh helps but routing tables remain at ~17% discovery. The `saorsa-core` DHT may need internal improvements to its routing table maintenance. + +2. **The core limitation is in saorsa-core** — ant-node can only work around the DHT's routing table population behavior. The ideal fix would be for `saorsa-core` to automatically populate routing table entries from connected peers. + +3. **Real-world conditions will be worse** — these tests use localhost with no latency, no churn, and no geo-filtering. 
A production network with global distribution and node churn will have higher divergence rates. + +4. **Scaling beyond 100 nodes untested** — the pattern suggests the problem worsens as N grows since the probability of the 5 true closest nodes all being in a 17% routing table window decreases. + +## Files Changed + +| File | Change | +|------|--------| +| `src/node.rs` | DHT refresh background task, P2P node wiring to AntProtocol | +| `src/close_group.rs` | New module: close group confirmation protocol | +| `src/storage/handler.rs` | Close group check in PUT handler, OnceLock P2P node field | +| `src/lib.rs` | Register close_group module | +| `tests/e2e/testnet.rs` | DHT refresh task per test node, P2P node wiring | +| `tests/e2e/close_group_stability.rs` | New: 8 comprehensive test cases | +| `tests/e2e/mod.rs` | Register new test modules | diff --git a/src/ant_protocol/chunk.rs b/src/ant_protocol/chunk.rs index 0cbba46..6e9c9b3 100644 --- a/src/ant_protocol/chunk.rs +++ b/src/ant_protocol/chunk.rs @@ -322,6 +322,9 @@ pub enum ProtocolError { PaymentFailed(String), /// Quote generation failed. QuoteFailed(String), + /// This node is not in the close group for the requested address. + /// The client should retry with a different close-group member. + NotInCloseGroup, /// Internal error. Internal(String), } @@ -348,6 +351,9 @@ impl std::fmt::Display for ProtocolError { Self::StorageFailed(msg) => write!(f, "storage failed: {msg}"), Self::PaymentFailed(msg) => write!(f, "payment failed: {msg}"), Self::QuoteFailed(msg) => write!(f, "quote failed: {msg}"), + Self::NotInCloseGroup => { + write!(f, "this node is not in the close group for this address") + } Self::Internal(msg) => write!(f, "internal error: {msg}"), } } diff --git a/src/close_group.rs b/src/close_group.rs new file mode 100644 index 0000000..3f6c671 --- /dev/null +++ b/src/close_group.rs @@ -0,0 +1,306 @@ +//! Close group confirmation protocol. +//! +//! 
Provides utilities for verifying close group membership with quorum-based +//! consensus. Multiple nodes independently look up the same address and peers +//! that appear in at least a configurable threshold of those lookups form the +//! "confirmed" close group. +//! +//! This addresses the DHT routing table incompleteness problem where different +//! nodes may return different closest-node sets for the same address. + +use saorsa_core::P2PNode; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tracing::{debug, warn}; + +use crate::ant_protocol::CLOSE_GROUP_SIZE; +use crate::client::{peer_id_to_xor_name, xor_distance, XorName}; + +/// Default timeout for a single DHT lookup during confirmation. +const CONFIRMATION_LOOKUP_TIMEOUT: Duration = Duration::from_secs(15); + +/// Result of a close group confirmation query. +#[derive(Debug, Clone)] +pub struct ConfirmedCloseGroup { + /// Peer IDs that appeared in at least `threshold` of the lookups. + /// Sorted by XOR distance to the target address. + pub members: Vec<String>, + + /// How many independent lookups were performed. + pub num_lookups: usize, + + /// How many lookups returned non-empty results. + pub num_responses: usize, + + /// The confirmation threshold used (minimum appearances required). + pub threshold: usize, +} + +impl ConfirmedCloseGroup { + /// Check if the confirmed group has at least `CLOSE_GROUP_SIZE` members. + #[must_use] + pub fn is_complete(&self) -> bool { + self.members.len() >= CLOSE_GROUP_SIZE + } + + /// Check if a given peer ID (hex) is in the confirmed close group. + #[must_use] + pub fn contains(&self, peer_id_hex: &str) -> bool { + self.members.iter().any(|m| m == peer_id_hex) + } + + /// Return how many of the confirmed members overlap with a given set. + #[must_use] + pub fn overlap_count(&self, other: &[String]) -> usize { + self.members.iter().filter(|m| other.contains(m)).count() + } + + /// Return the overlap ratio with a given set (0.0 to 1.0).
+ #[must_use] + pub fn overlap_ratio(&self, other: &[String]) -> f64 { + if self.members.is_empty() || other.is_empty() { + return 0.0; + } + let overlap = self.overlap_count(other); + let max_len = self.members.len().max(other.len()); + #[allow(clippy::cast_precision_loss)] + { + overlap as f64 / max_len as f64 + } + } +} + +/// Perform a confirmed close group lookup. +/// +/// Queries `num_lookups` different nodes for the closest peers to `target`, +/// then returns peers that appeared in at least `threshold` of those lookups. +/// +/// # Arguments +/// +/// * `nodes` - The P2P nodes to query from (will use up to `num_lookups` of them) +/// * `target` - The `XorName` address to find closest nodes for +/// * `k` - How many closest nodes to request per lookup +/// * `num_lookups` - How many independent lookups to perform +/// * `threshold` - Minimum number of lookups a peer must appear in to be confirmed +pub async fn confirm_close_group( + nodes: &[Arc<P2PNode>], + target: &XorName, + k: usize, + num_lookups: usize, + threshold: usize, +) -> ConfirmedCloseGroup { + let actual_lookups = num_lookups.min(nodes.len()); + + if actual_lookups == 0 || nodes.is_empty() { + return ConfirmedCloseGroup { + members: Vec::new(), + num_lookups: 0, + num_responses: 0, + threshold, + }; + } + + let mut appearance_count: HashMap<String, usize> = HashMap::new(); + let mut num_responses = 0usize; + + // Select which nodes to query — spread across the list + let step = if nodes.len() > actual_lookups { + nodes.len() / actual_lookups + } else { + 1 + }; + + for i in 0..actual_lookups { + let node_idx = (i * step) % nodes.len(); + let Some(p2p) = nodes.get(node_idx) else { + continue; + }; + + match tokio::time::timeout( + CONFIRMATION_LOOKUP_TIMEOUT, + p2p.dht().find_closest_nodes(target, k), + ) + .await + { + Ok(Ok(peers)) if !peers.is_empty() => { + num_responses += 1; + for peer in &peers { + let hex = peer.peer_id.to_hex(); + *appearance_count.entry(hex).or_insert(0) += 1; + } + } + Ok(Ok(_)) => {
debug!("Close group confirmation: node {node_idx} returned empty results"); + } + Ok(Err(e)) => { + warn!("Close group confirmation: node {node_idx} DHT error: {e}"); + } + Err(_) => { + warn!("Close group confirmation: node {node_idx} lookup timed out"); + } + } + } + + // Filter to peers that appeared in at least `threshold` lookups + let mut confirmed: Vec<String> = appearance_count + .into_iter() + .filter(|(_, count)| *count >= threshold) + .map(|(peer_id, _)| peer_id) + .collect(); + + // Sort by XOR distance to target + confirmed.sort_by(|a, b| { + let a_xor = peer_id_to_xor_name(a).map(|x| xor_distance(target, &x)); + let b_xor = peer_id_to_xor_name(b).map(|x| xor_distance(target, &x)); + match (a_xor, b_xor) { + (Some(a_dist), Some(b_dist)) => a_dist.cmp(&b_dist), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + (None, None) => std::cmp::Ordering::Equal, + } + }); + + // Trim to the requested size, capped at close group size + confirmed.truncate(k.min(CLOSE_GROUP_SIZE)); + + ConfirmedCloseGroup { + members: confirmed, + num_lookups: actual_lookups, + num_responses, + threshold, + } +} + +/// Number of independent DHT lookups for close group confirmation. +/// +/// Each lookup follows a different iterative path through the network, +/// so multiple lookups from the same node can discover different peer sets. +/// This compensates for incomplete routing tables (nodes only see ~13-17% +/// of the network via a single lookup). +const CLOSE_GROUP_LOOKUPS: usize = 3; + +/// Minimum number of lookups that must agree the node is in the close group. +const CLOSE_GROUP_CONFIRMATION_THRESHOLD: usize = 2; + +/// Check if a node is in the close group for a given address. +/// +/// Performs multiple independent DHT lookups from this node and checks +/// whether a threshold of them agree that this node would be among the +/// K closest.
Multiple lookups follow different iterative paths through +/// the network, compensating for incomplete routing tables. +/// +/// This is the production close group verification used in PUT and quote +/// handlers. It uses the same threshold-based consensus approach as +/// `confirm_close_group()` but operates from a single node. +pub async fn is_node_in_close_group(node: &P2PNode, target: &XorName) -> bool { + let my_peer_id = node.peer_id().to_hex(); + let Some(my_xor) = peer_id_to_xor_name(&my_peer_id) else { + return false; + }; + let my_distance = xor_distance(target, &my_xor); + + // Request K+1 peers because the DHT may include this node in results. + let lookup_k = CLOSE_GROUP_SIZE + 1; + + let mut votes_in = 0usize; + let mut votes_out = 0usize; + let mut successful_lookups = 0usize; + + for round in 0..CLOSE_GROUP_LOOKUPS { + let result = tokio::time::timeout( + CONFIRMATION_LOOKUP_TIMEOUT, + node.dht().find_closest_nodes(target, lookup_k), + ) + .await; + + match result { + Ok(Ok(peers)) => { + // Filter out our own peer ID from the results + let external_peers: Vec<_> = peers + .iter() + .filter(|p| p.peer_id.to_hex() != my_peer_id) + .collect(); + + if external_peers.len() < CLOSE_GROUP_SIZE { + debug!( + "is_node_in_close_group round {round}: only {} external peers", + external_peers.len() + ); + votes_out += 1; + continue; + } + + successful_lookups += 1; + + // Check if we're closer than the furthest external member + let furthest_distance = external_peers + .iter() + .filter_map(|p| peer_id_to_xor_name(&p.peer_id.to_hex())) + .map(|xor| xor_distance(target, &xor)) + .max(); + + if furthest_distance.is_some_and(|furthest| my_distance <= furthest) { + votes_in += 1; + } else { + votes_out += 1; + } + } + Ok(Err(e)) => { + debug!("is_node_in_close_group round {round}: DHT error: {e}"); + votes_out += 1; + } + Err(_) => { + debug!("is_node_in_close_group round {round}: timeout"); + votes_out += 1; + } + } + } + + if successful_lookups == 0 { + 
warn!("is_node_in_close_group: all {CLOSE_GROUP_LOOKUPS} lookups failed"); + return false; + } + + let is_in = votes_in >= CLOSE_GROUP_CONFIRMATION_THRESHOLD; + if !is_in { + debug!( + "is_node_in_close_group: not confirmed (votes: {votes_in} in, {votes_out} out, \ + threshold: {CLOSE_GROUP_CONFIRMATION_THRESHOLD})" + ); + } + + is_in +} + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] +mod tests { + use super::*; + + #[test] + fn test_confirmed_close_group_contains() { + let group = ConfirmedCloseGroup { + members: vec!["aa".repeat(32), "bb".repeat(32)], + num_lookups: 3, + num_responses: 3, + threshold: 2, + }; + + assert!(group.contains(&"aa".repeat(32))); + assert!(!group.contains(&"cc".repeat(32))); + } + + #[test] + fn test_confirmed_close_group_overlap() { + let group = ConfirmedCloseGroup { + members: vec!["aa".repeat(32), "bb".repeat(32), "cc".repeat(32)], + num_lookups: 3, + num_responses: 3, + threshold: 2, + }; + + let other = vec!["aa".repeat(32), "cc".repeat(32), "dd".repeat(32)]; + assert_eq!(group.overlap_count(&other), 2); + } +} diff --git a/src/devnet.rs b/src/devnet.rs index f61fc40..55f5e2a 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -639,9 +639,15 @@ impl Devnet { .await .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?; - node.p2p_node = Some(Arc::new(p2p_node)); + let p2p_arc = Arc::new(p2p_node); + node.p2p_node = Some(Arc::clone(&p2p_arc)); *node.state.write().await = NodeState::Running; + // Wire P2P node into protocol handler for close group verification + if let Some(ref protocol) = node.ant_protocol { + protocol.set_p2p_node(Arc::clone(&p2p_arc)); + } + if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); diff --git a/src/lib.rs b/src/lib.rs index e5fade5..f7d3e01 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,6 +41,7 @@ pub mod ant_protocol; pub mod client; +pub mod 
close_group; pub mod config; pub mod devnet; pub mod error; diff --git a/src/node.rs b/src/node.rs index 2726f9d..05a793d 100644 --- a/src/node.rs +++ b/src/node.rs @@ -123,11 +123,14 @@ impl NodeBuilder { None }; + let p2p_node = Arc::new(p2p_node); + // Initialize ANT protocol handler for chunk storage let ant_protocol = if self.config.storage.enabled { - Some(Arc::new( - Self::build_ant_protocol(&self.config, &identity).await?, - )) + let protocol = Self::build_ant_protocol(&self.config, &identity) + .await? + .with_p2p_node(Arc::clone(&p2p_node)); + Some(Arc::new(protocol)) } else { info!("Chunk storage disabled"); None @@ -135,7 +138,7 @@ impl NodeBuilder { let node = RunningNode { config: self.config, - p2p_node: Arc::new(p2p_node), + p2p_node, shutdown, events_tx, events_rx: Some(events_rx), @@ -143,6 +146,7 @@ impl NodeBuilder { bootstrap_manager, ant_protocol, protocol_task: None, + dht_refresh_task: None, upgrade_exit_code: Arc::new(AtomicI32::new(-1)), }; @@ -424,6 +428,15 @@ impl NodeBuilder { } } +/// Interval between DHT routing table refresh rounds (seconds). +const DHT_REFRESH_INTERVAL_SECS: u64 = 30; + +/// Number of random addresses to probe per DHT refresh round. +const DHT_REFRESH_ADDRESSES: usize = 5; + +/// K parameter for DHT refresh lookups. +const DHT_REFRESH_K: usize = 20; + /// A running Ant node. pub struct RunningNode { config: NodeConfig, @@ -438,6 +451,8 @@ pub struct RunningNode { ant_protocol: Option<Arc<AntProtocol>>, /// Protocol message routing background task. protocol_task: Option<JoinHandle<()>>, + /// DHT routing table refresh background task. + dht_refresh_task: Option<JoinHandle<()>>, /// Exit code requested by a successful upgrade (-1 = no upgrade exit pending). upgrade_exit_code: Arc<AtomicI32>, } @@ -498,6 +513,12 @@ impl RunningNode { // Start protocol message routing (P2P → AntProtocol → P2P response) self.start_protocol_routing(); + // Start DHT routing table refresh background task.
+ // This periodically performs random lookups to keep the routing table + // populated, addressing the issue where nodes discover only ~13% of + // the network via DHT despite being connected to ~90%. + self.start_dht_refresh(); + // Start upgrade monitor if enabled if let Some(monitor) = self.upgrade_monitor.take() { let events_tx = self.events_tx.clone(); @@ -620,6 +641,11 @@ impl RunningNode { handle.abort(); } + // Stop DHT refresh task + if let Some(handle) = self.dht_refresh_task.take() { + handle.abort(); + } + // Shutdown P2P node info!("Shutting down P2P node..."); if let Err(e) = self.p2p_node.shutdown().await { @@ -760,6 +786,36 @@ impl RunningNode { info!("Protocol message routing started"); } + /// Start the DHT routing table refresh background task. + /// + /// Periodically performs random lookups to populate the DHT routing table. + /// Without this, nodes may only discover ~13% of the network via DHT despite + /// being connected to ~90% via the transport layer. This is critical for + /// merkle payment verification which requires consistent close group lookups. + fn start_dht_refresh(&mut self) { + let p2p = Arc::clone(&self.p2p_node); + let shutdown = self.shutdown.clone(); + + self.dht_refresh_task = Some(tokio::spawn(async move { + let interval = std::time::Duration::from_secs(DHT_REFRESH_INTERVAL_SECS); + + loop { + tokio::select! { + () = shutdown.cancelled() => break, + () = tokio::time::sleep(interval) => { + for _ in 0..DHT_REFRESH_ADDRESSES { + let mut addr = [0u8; 32]; + rand::Rng::fill(&mut rand::thread_rng(), &mut addr); + let _ = p2p.dht().find_closest_nodes(&addr, DHT_REFRESH_K).await; + } + debug!("DHT routing table refresh completed ({DHT_REFRESH_ADDRESSES} lookups, k={DHT_REFRESH_K})"); + } + } + } + })); + info!("DHT routing table refresh task started (every {DHT_REFRESH_INTERVAL_SECS}s)"); + } + /// Request the node to shut down. 
pub fn shutdown(&self) { self.shutdown.cancel(); diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index c36e5b8..cb318ce 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -18,9 +18,11 @@ use evmlib::Network as EvmNetwork; use lru::LruCache; use parking_lot::Mutex; use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes; +use saorsa_core::P2PNode; use std::num::NonZeroUsize; +use std::sync::{Arc, OnceLock}; use std::time::SystemTime; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; /// Minimum allowed size for a payment proof in bytes. /// @@ -118,6 +120,11 @@ pub struct PaymentVerifier { pool_cache: Mutex>, /// Configuration. config: PaymentVerifierConfig, + /// Optional P2P node for candidate close group verification. + /// When set, merkle payment verification also checks that the + /// candidate nodes in the winner pool are actually the closest + /// nodes to the data address. + p2p_node: OnceLock<Arc<P2PNode>>, } impl PaymentVerifier { @@ -140,6 +147,7 @@ impl PaymentVerifier { cache, pool_cache, config, + p2p_node: OnceLock::new(), } } @@ -265,6 +273,18 @@ impl PaymentVerifier { } } + /// Set the P2P node for candidate close group verification. + /// + /// When set, merkle payment verification additionally checks that the + /// candidate nodes in the winner pool are actually the closest nodes + /// to the data address being stored. This prevents malicious clients + /// from building winner pools with arbitrary (non-closest) nodes. + pub fn set_p2p_node(&self, p2p_node: Arc<P2PNode>) { + if self.p2p_node.set(p2p_node).is_err() { + warn!("PaymentVerifier: P2P node was already set"); + } + } + /// Get cache statistics. #[must_use] pub fn cache_stats(&self) -> CacheStats { @@ -667,6 +687,18 @@ impl PaymentVerifier { } } + // Verify candidate nodes are actually close to the data address.
+ // This prevents malicious clients from building winner pools with + // arbitrary nodes that are not in the close group for the address. + if let Some(p2p) = self.p2p_node.get() { + self.verify_candidates_are_closest( + xorname, + &merkle_proof.winner_pool.candidate_nodes, + p2p, + ) + .await?; + } + if tracing::enabled!(tracing::Level::INFO) { info!( "Merkle payment verified for {} (pool: {})", @@ -678,6 +710,99 @@ impl PaymentVerifier { Ok(()) } + /// Verify that the candidate nodes in a merkle winner pool are actually + /// the closest nodes to the data address. + /// + /// Performs a DHT lookup for the address and checks that a majority of + /// the candidates appear in the close group. This is critical to prevent + /// malicious clients from paying arbitrary nodes instead of the actual + /// closest nodes. + async fn verify_candidates_are_closest( + &self, + xorname: &XorName, + candidates: &[ant_evm::merkle_payments::MerklePaymentCandidateNode], + p2p: &P2PNode, + ) -> Result<()> { + // Look up the actual closest nodes to this address + let lookup_k = CLOSE_GROUP_SIZE * 3; // Request more than K to get broader view + let closest = match tokio::time::timeout( + std::time::Duration::from_secs(15), + p2p.dht().find_closest_nodes(xorname, lookup_k), + ) + .await + { + Ok(Ok(peers)) if !peers.is_empty() => peers, + Ok(Ok(_)) => { + // Empty results — can't verify, log and allow + warn!( + "Cannot verify merkle candidates for {}: DHT returned empty results", + hex::encode(xorname) + ); + return Ok(()); + } + Ok(Err(e)) => { + warn!( + "Cannot verify merkle candidates for {}: DHT error: {e}", + hex::encode(xorname) + ); + return Ok(()); + } + Err(_) => { + warn!( + "Cannot verify merkle candidates for {}: DHT lookup timed out", + hex::encode(xorname) + ); + return Ok(()); + } + }; + + // Build set of known close-group peer ID hex strings + let close_group_ids: std::collections::HashSet<String> = + closest.iter().map(|p| p.peer_id.to_hex()).collect(); + + // For each
candidate, derive peer ID from pub_key and check if + // it's in the close group + let mut candidates_in_close_group = 0usize; + for candidate in candidates { + // Derive peer ID from ML-DSA pub key: BLAKE3(pub_key) = peer_id + if let Ok(peer_id) = peer_id_from_public_key_bytes(&candidate.pub_key) { + let peer_hex = peer_id.to_hex(); + if close_group_ids.contains(&peer_hex) { + candidates_in_close_group += 1; + } + } + } + + // Require at least a majority of candidates to be in the close group. + // We use CLOSE_GROUP_SIZE as the bar since the winner pool may have + // more candidates than the close group (e.g., 16 candidates for K=5). + let min_required = if candidates.len() >= CLOSE_GROUP_SIZE { + // At least CLOSE_GROUP_MAJORITY of the close group nodes + // should appear among the candidates + (CLOSE_GROUP_SIZE / 2) + 1 + } else { + // Small pool — require majority of what's there + (candidates.len() / 2) + 1 + }; + + if candidates_in_close_group < min_required { + return Err(Error::Payment(format!( + "Merkle candidate pool for {} has only {candidates_in_close_group} nodes \ + in the close group (need {min_required}). Candidates may not be the \ + actual closest nodes to this address.", + hex::encode(xorname) + ))); + } + + debug!( + "Merkle candidate close group check passed for {}: {candidates_in_close_group}/{} candidates in close group", + hex::encode(xorname), + candidates.len() + ); + + Ok(()) + } + /// Verify this node is among the paid recipients. 
fn validate_local_recipient(&self, payment: &ProofOfPayment) -> Result<()> { let local_addr = &self.config.local_rewards_address; diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 12e5449..224a727 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -34,11 +34,13 @@ use crate::ant_protocol::{ MAX_CHUNK_SIZE, }; use crate::client::compute_address; +use crate::close_group::is_node_in_close_group; use crate::error::{Error, Result}; use crate::payment::{PaymentVerifier, QuoteGenerator}; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; -use std::sync::Arc; +use saorsa_core::P2PNode; +use std::sync::{Arc, OnceLock}; use tracing::{debug, info, warn}; /// ANT protocol handler. @@ -53,6 +55,13 @@ pub struct AntProtocol { /// Quote generator for creating storage quotes. /// Also handles merkle candidate quote signing via ML-DSA-65. quote_generator: Arc<QuoteGenerator>, + /// Optional P2P node for close group verification during PUT. + /// When set, the handler verifies that this node is actually in the + /// close group for the address being stored, preventing storage of + /// data this node is not responsible for. + /// Uses `OnceLock` so the P2P node can be set after construction + /// (the P2P node may not exist yet when the protocol handler is created). + p2p_node: OnceLock<Arc<P2PNode>>, } impl AntProtocol { @@ -73,6 +82,38 @@ impl AntProtocol { storage, payment_verifier, quote_generator, + p2p_node: OnceLock::new(), + } + } + + /// Set the P2P node for close group verification during PUT operations. + /// + /// When set, the handler will verify that this node is in the close group + /// for the chunk address before accepting storage. This is critical for + /// merkle payment verification where the verifier must confirm it is + /// responsible for the address.
+ #[must_use] + pub fn with_p2p_node(self, p2p_node: Arc<P2PNode>) -> Self { + // Set on payment verifier for merkle candidate close group verification + self.payment_verifier.set_p2p_node(Arc::clone(&p2p_node)); + // Set on handler for PUT close group check + if self.p2p_node.set(p2p_node).is_err() { + warn!("with_p2p_node called on AntProtocol that already has a P2P node set"); + } + self + } + + /// Set the P2P node after construction. + /// + /// This is used when the P2P node is created after the protocol handler + /// (e.g. in test harnesses where `AntProtocol` is built before `P2PNode`). + /// Can only be called once — subsequent calls log a warning. + pub fn set_p2p_node(&self, p2p_node: Arc<P2PNode>) { + // Set on payment verifier for merkle candidate close group verification + self.payment_verifier.set_p2p_node(Arc::clone(&p2p_node)); + // Set on handler for PUT close group check + if self.p2p_node.set(p2p_node).is_err() { + warn!("set_p2p_node called but P2P node was already set"); } } @@ -106,11 +147,11 @@ impl AntProtocol { ChunkMessageBody::GetResponse(self.handle_get(req).await) } ChunkMessageBody::QuoteRequest(ref req) => { - ChunkMessageBody::QuoteResponse(self.handle_quote(req)) + ChunkMessageBody::QuoteResponse(self.handle_quote(req).await) } ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => { ChunkMessageBody::MerkleCandidateQuoteResponse( - self.handle_merkle_candidate_quote(req), + self.handle_merkle_candidate_quote(req).await, ) } // Response messages are handled by client subscribers @@ -171,7 +212,8 @@ impl AntProtocol { Ok(false) => {} } - // 4. Verify payment + // 4. Verify payment (fast cache check first to reject spam before + // expensive DHT lookups in the close group check) let payment_result = self .payment_verifier .verify_payment(&address, request.payment_proof.as_deref()) .await; @@ -191,7 +233,18 @@ impl AntProtocol { } } - // 5. Store chunk + // 5. Close group verification — check if this node is responsible + // for the address.
This prevents storing data we're not close to, + // which is critical for merkle payment verification consistency. + // Done after payment check to avoid DHT lookup DoS from unpaid requests. + if let Some(p2p) = self.p2p_node.get() { + if !is_node_in_close_group(p2p, &address).await { + debug!("Rejecting PUT for {addr_hex}: this node is not in the close group"); + return ChunkPutResponse::Error(ProtocolError::NotInCloseGroup); + } + } + + // 6. Store chunk match self.storage.put(&address, &request.content).await { Ok(_) => { let content_len = request.content.len(); @@ -232,11 +285,21 @@ impl AntProtocol { } /// Handle a quote request. - fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse { + async fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse { let addr_hex = hex::encode(request.address); let data_size = request.data_size; debug!("Handling quote request for {addr_hex} (size: {data_size})"); + // Verify this node is in the close group for the address before + // issuing a quote. This prevents clients from paying nodes that + // will later refuse the PUT with NotInCloseGroup. + if let Some(p2p) = self.p2p_node.get() { + if !is_node_in_close_group(p2p, &request.address).await { + debug!("Refusing quote for {addr_hex}: not in close group"); + return ChunkQuoteResponse::Error(ProtocolError::NotInCloseGroup); + } + } + // Check if the chunk is already stored so we can tell the client // to skip payment (already_stored = true). let already_stored = match self.storage.exists(&request.address) { @@ -286,7 +349,7 @@ impl AntProtocol { } /// Handle a merkle candidate quote request. - fn handle_merkle_candidate_quote( + async fn handle_merkle_candidate_quote( &self, request: &MerkleCandidateQuoteRequest, ) -> MerkleCandidateQuoteResponse { @@ -297,6 +360,15 @@ impl AntProtocol { request.merkle_payment_timestamp ); + // Verify this node is in the close group for the address before + // issuing a merkle candidate quote. 
+        if let Some(p2p) = self.p2p_node.get() {
+            if !is_node_in_close_group(p2p, &request.address).await {
+                debug!("Refusing merkle candidate quote for {addr_hex}: not in close group");
+                return MerkleCandidateQuoteResponse::Error(ProtocolError::NotInCloseGroup);
+            }
+        }
+
         let Ok(data_size_usize) = usize::try_from(request.data_size) else {
             return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
                 "data_size {} overflows usize",
diff --git a/tests/e2e/close_group_stability.rs b/tests/e2e/close_group_stability.rs
new file mode 100644
index 0000000..b335aff
--- /dev/null
+++ b/tests/e2e/close_group_stability.rs
@@ -0,0 +1,1346 @@
+//! Close group stability tests for merkle payment viability.
+//!
+//! Merkle payments require that the "closest nodes to an address" are consistent
+//! between two phases:
+//!
+//! 1. **Client quoting** — the client asks the DHT "who is closest to X?" and pays
+//!    those nodes on-chain.
+//! 2. **Node verification** — a storing node checks the blockchain to verify
+//!    that the paid nodes are *actually* the closest to X.
+//!
+//! If these two phases return different close groups, verification fails and
+//! paid data cannot be stored. This module tests the root causes that can break
+//! this invariant:
+//!
+//! - **Incomplete routing tables** — nodes don't know about each other
+//! - **DHT lookup divergence** — different nodes return different "closest" sets
+//! - **Ground-truth mismatch** — DHT results don't match XOR-computed truth
+//! - **Quoting→verification gap** — time or perspective difference changes results
+
+#![allow(
+    clippy::unwrap_used,
+    clippy::expect_used,
+    clippy::cast_precision_loss,
+    clippy::too_many_lines,
+    clippy::doc_markdown,
+    clippy::uninlined_format_args,
+    clippy::items_after_statements
+)]
+
+use std::collections::{HashMap, HashSet};
+use std::time::Duration;
+
+use ant_node::client::{peer_id_to_xor_name, xor_distance, XorName};
+use rand::Rng;
+
+use super::{TestHarness, TestNetworkConfig};
+
+// ---------------------------------------------------------------------------
+// Constants
+// ---------------------------------------------------------------------------
+
+/// Close group size matching production (from ant_protocol).
+const CLOSE_GROUP_SIZE: usize = 5;
+
+/// Number of random addresses to probe per test.
+const NUM_PROBE_ADDRESSES: usize = 20;
+
+/// Timeout for a single DHT lookup.
+const DHT_LOOKUP_TIMEOUT: Duration = Duration::from_secs(30);
+
+/// Number of DHT warmup rounds.
+const WARMUP_ROUNDS: usize = 3;
+
+/// Random addresses per warmup round.
+const WARMUP_ADDRESSES_PER_ROUND: usize = 15;
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+/// Generate `count` random 32-byte XorNames.
+fn random_xor_names(count: usize) -> Vec<XorName> {
+    let mut rng = rand::thread_rng();
+    (0..count)
+        .map(|_| {
+            let mut addr = [0u8; 32];
+            rng.fill(&mut addr);
+            addr
+        })
+        .collect()
+}
+
+/// Jaccard similarity between two sets: |A ∩ B| / |A ∪ B|.
+fn jaccard(a: &HashSet<String>, b: &HashSet<String>) -> f64 {
+    let isect = a.intersection(b).count();
+    let union_count = a.union(b).count();
+    if union_count == 0 {
+        return 1.0;
+    }
+    #[allow(clippy::cast_precision_loss)]
+    {
+        isect as f64 / union_count as f64
+    }
+}
+
+/// Compute the ground-truth closest K nodes to `target` from a list of
+/// (peer_id_hex, xor_name) pairs, sorted by XOR distance.
+fn ground_truth_closest(
+    target: &XorName,
+    all_nodes: &[(String, XorName)],
+    k: usize,
+) -> Vec<String> {
+    let mut with_distance: Vec<_> = all_nodes
+        .iter()
+        .map(|(peer_hex, xor)| {
+            let dist = xor_distance(target, xor);
+            (peer_hex.clone(), dist)
+        })
+        .collect();
+    with_distance.sort_by(|a, b| a.1.cmp(&b.1));
+    with_distance
+        .into_iter()
+        .take(k)
+        .map(|(hex, _)| hex)
+        .collect()
+}
+
+/// Run thorough DHT warmup: standard + enhanced rounds.
+async fn thorough_warmup(harness: &TestHarness) {
+    eprintln!("  Warmup: standard round…");
+    harness
+        .warmup_dht()
+        .await
+        .expect("DHT standard warmup failed");
+
+    for round in 1..=WARMUP_ROUNDS {
+        eprintln!(
+            "  Warmup: enhanced round {round}/{WARMUP_ROUNDS} ({WARMUP_ADDRESSES_PER_ROUND} addrs)…"
+        );
+        let addresses = random_xor_names(WARMUP_ADDRESSES_PER_ROUND);
+        for i in 0..harness.node_count() {
+            if let Some(p2p) = harness.node(i) {
+                for addr in &addresses {
+                    let _ = p2p.dht().find_closest_nodes(addr, 20).await;
+                }
+            }
+        }
+        tokio::time::sleep(Duration::from_secs(2)).await;
+    }
+
+    eprintln!("  Warmup: settling…");
+    tokio::time::sleep(Duration::from_secs(3)).await;
+}
+
+/// Collect all node peer IDs and their XOR names from the harness.
+fn collect_node_identities(harness: &TestHarness) -> Vec<(String, XorName)> {
+    let mut identities = Vec::new();
+    for i in 0..harness.node_count() {
+        if let Some(p2p) = harness.node(i) {
+            let hex = p2p.peer_id().to_hex();
+            if let Some(xor) = peer_id_to_xor_name(&hex) {
+                identities.push((hex, xor));
+            }
+        }
+    }
+    identities
+}
+
+/// Perform a DHT lookup from a specific node for `target`, returning the
+/// peer IDs as hex strings (or an empty vec on failure/timeout).
+async fn dht_lookup(
+    harness: &TestHarness,
+    observer_idx: usize,
+    target: &XorName,
+    k: usize,
+) -> Vec<String> {
+    if let Some(p2p) = harness.node(observer_idx) {
+        match tokio::time::timeout(DHT_LOOKUP_TIMEOUT, p2p.dht().find_closest_nodes(target, k))
+            .await
+        {
+            Ok(Ok(peers)) => peers.iter().map(|p| p.peer_id.to_hex()).collect(),
+            _ => vec![],
+        }
+    } else {
+        vec![]
+    }
+}
+
+// ===========================================================================
+// TEST 1: Routing Table Completeness
+//
+// In a small network (25 nodes), every node should know about every other
+// node after warmup. If routing tables are incomplete, close group lookups
+// will return wrong results.
+// ===========================================================================
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_routing_table_completeness_25_nodes() {
+    let config = TestNetworkConfig {
+        node_count: 25,
+        bootstrap_count: 3,
+        spawn_delay: Duration::from_millis(150),
+        stabilization_timeout: Duration::from_secs(120),
+        node_startup_timeout: Duration::from_secs(30),
+        ..Default::default()
+    };
+
+    eprintln!("Starting 25-node network for routing table completeness test…");
+    let harness = TestHarness::setup_with_config(config)
+        .await
+        .expect("Failed to setup harness");
+
+    assert!(harness.is_ready().await, "Network should be ready");
+
+    thorough_warmup(&harness).await;
+
+    let identities = collect_node_identities(&harness);
+    let total_nodes = identities.len();
+    eprintln!("  Collected {total_nodes} node identities");
+
+    // For each node, do a broad DHT lookup (large k) and see how many
+    // of the actual network nodes it can discover.
+    let mut per_node_discovery: Vec<(usize, usize)> = Vec::new();
+    let all_peer_ids: HashSet<String> = identities.iter().map(|(hex, _)| hex.clone()).collect();
+
+    for i in 0..harness.node_count() {
+        if let Some(p2p) = harness.node(i) {
+            // Look up a random address with a large K to see how many peers
+            // the routing table has populated.
+            let random_addr = random_xor_names(1);
+            let first_addr = random_addr
+                .first()
+                .expect("should have at least one random addr");
+            let peers = match tokio::time::timeout(
+                DHT_LOOKUP_TIMEOUT,
+                p2p.dht().find_closest_nodes(first_addr, 100),
+            )
+            .await
+            {
+                Ok(Ok(peers)) => peers,
+                _ => vec![],
+            };
+
+            let discovered: HashSet<String> = peers.iter().map(|p| p.peer_id.to_hex()).collect();
+            let known_network_nodes = discovered.intersection(&all_peer_ids).count();
+            per_node_discovery.push((i, known_network_nodes));
+        }
+    }
+
+    // Also check direct peer_count() for connection-level visibility
+    let mut peer_counts: Vec<(usize, usize)> = Vec::new();
+    for i in 0..harness.node_count() {
+        if let Some(test_node) = harness.test_node(i) {
+            let count = test_node.peer_count().await;
+            peer_counts.push((i, count));
+        }
+    }
+
+    eprintln!();
+    eprintln!("  ╔══════════════════════════════════════════════════════════════╗");
+    eprintln!("  ║ ROUTING TABLE COMPLETENESS (25 nodes)                        ║");
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+
+    let avg_discovered: f64 = per_node_discovery
+        .iter()
+        .map(|(_, c)| *c as f64)
+        .sum::<f64>()
+        / per_node_discovery.len().max(1) as f64;
+    let min_discovered = per_node_discovery
+        .iter()
+        .map(|(_, c)| *c)
+        .min()
+        .unwrap_or(0);
+    let max_discovered = per_node_discovery
+        .iter()
+        .map(|(_, c)| *c)
+        .max()
+        .unwrap_or(0);
+
+    let avg_peers: f64 =
+        peer_counts.iter().map(|(_, c)| *c as f64).sum::<f64>() / peer_counts.len().max(1) as f64;
+    let min_peers = peer_counts.iter().map(|(_, c)| *c).min().unwrap_or(0);
+
+    eprintln!("  ║ DHT discovery (via find_closest_nodes):                      ║");
+    eprintln!(
+        "  ║   Avg nodes discovered: {:>4.1} / {:<4}                            ║",
+        avg_discovered, total_nodes
+    );
+    eprintln!(
+        "  ║   Min nodes discovered: {:>4} / {:<4}                              ║",
+        min_discovered, total_nodes
+    );
+    eprintln!(
+        "  ║   Max nodes discovered: {:>4} / {:<4}                              ║",
+        max_discovered, total_nodes
+    );
+    eprintln!("  ║ Direct connections (peer_count):                             ║");
+    eprintln!(
+        "  ║   Avg connections: {:>4.1}                                         ║",
+        avg_peers
+    );
+    eprintln!(
+        "  ║   Min connections: {:>4}                                           ║",
+        min_peers
+    );
+    eprintln!("  ╚══════════════════════════════════════════════════════════════╝");
+    eprintln!();
+
+    // Print per-node detail for nodes with poor discovery
+    for (idx, count) in &per_node_discovery {
+        if *count < total_nodes / 2 {
+            eprintln!("  WARNING: Node {idx} only discovered {count}/{total_nodes} nodes via DHT");
+        }
+    }
+    for (idx, count) in &peer_counts {
+        if *count < 3 {
+            eprintln!("  WARNING: Node {idx} has only {count} direct connections");
+        }
+    }
+
+    // Assertions
+    // In a 25-node network, every node should discover at least 25% of peers
+    // via DHT after thorough warmup. If not, the routing table is broken.
+    assert!(
+        min_discovered >= total_nodes / 4,
+        "Node with fewest DHT discoveries found only {min_discovered}/{total_nodes} — \
+         routing tables are severely incomplete"
+    );
+
+    // Every node should have at least 2 direct connections
+    assert!(
+        min_peers >= 2,
+        "Node with fewest connections has only {min_peers} — \
+         basic connectivity is broken"
+    );
+
+    harness.teardown().await.expect("Failed to teardown");
+}
+
+// ===========================================================================
+// TEST 2: Close Group Agreement (Ground Truth)
+//
+// Since we control all node IDs, we can compute the TRUE closest nodes to
+// any address via XOR distance. Then we compare what the DHT returns from
+// different observer nodes against this ground truth.
+//
+// This directly measures: "Will a client and a verifier agree on who the
+// closest nodes are?"
+// ===========================================================================
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_close_group_vs_ground_truth_25_nodes() {
+    let config = TestNetworkConfig {
+        node_count: 25,
+        bootstrap_count: 3,
+        spawn_delay: Duration::from_millis(150),
+        stabilization_timeout: Duration::from_secs(120),
+        node_startup_timeout: Duration::from_secs(30),
+        ..Default::default()
+    };
+
+    eprintln!("Starting 25-node network for close group ground truth test…");
+    let harness = TestHarness::setup_with_config(config)
+        .await
+        .expect("Failed to setup harness");
+
+    assert!(harness.is_ready().await, "Network should be ready");
+
+    thorough_warmup(&harness).await;
+
+    let identities = collect_node_identities(&harness);
+    let total_nodes = identities.len();
+    let targets = random_xor_names(NUM_PROBE_ADDRESSES);
+
+    eprintln!(
+        "  Probing {NUM_PROBE_ADDRESSES} addresses against ground truth (K={CLOSE_GROUP_SIZE})…"
+    );
+
+    let mut total_overlap_ratios: Vec<f64> = Vec::new();
+
+    for (t_idx, target) in targets.iter().enumerate() {
+        let truth: HashSet<String> = ground_truth_closest(target, &identities, CLOSE_GROUP_SIZE)
+            .into_iter()
+            .collect();
+
+        let mut overlaps: Vec<f64> = Vec::new();
+        let mut responded = 0usize;
+        let mut queried = 0usize;
+
+        // Ask every node for the close group
+        for obs_idx in 0..harness.node_count() {
+            queried += 1;
+            let dht_result = dht_lookup(&harness, obs_idx, target, CLOSE_GROUP_SIZE).await;
+            if dht_result.is_empty() {
+                continue;
+            }
+            responded += 1;
+
+            let dht_set: HashSet<String> = dht_result.into_iter().collect();
+            let overlap = truth.intersection(&dht_set).count();
+            #[allow(clippy::cast_precision_loss)]
+            let ratio = overlap as f64 / CLOSE_GROUP_SIZE as f64;
+            overlaps.push(ratio);
+        }
+
+        #[allow(clippy::cast_precision_loss)]
+        let avg_overlap = if overlaps.is_empty() {
+            0.0
+        } else {
+            overlaps.iter().sum::<f64>() / overlaps.len() as f64
+        };
+
+        total_overlap_ratios.push(avg_overlap);
+
+        eprintln!(
+            "  Target {:>2} ({}…): ground_truth_overlap={:.2}, responses={responded}/{queried}",
+            t_idx,
+            &hex::encode(target)[..12],
+            avg_overlap,
+        );
+    }
+
+    // Summary
+    let overall_avg = if total_overlap_ratios.is_empty() {
+        0.0
+    } else {
+        total_overlap_ratios.iter().sum::<f64>() / total_overlap_ratios.len() as f64
+    };
+
+    let min_overlap = total_overlap_ratios
+        .iter()
+        .copied()
+        .reduce(f64::min)
+        .unwrap_or(0.0);
+
+    let targets_with_majority = total_overlap_ratios
+        .iter()
+        .filter(|&&o| o >= 0.6) // 3/5 = majority
+        .count();
+
+    eprintln!();
+    eprintln!("  ╔══════════════════════════════════════════════════════════════╗");
+    eprintln!("  ║ CLOSE GROUP vs GROUND TRUTH (25 nodes, K={CLOSE_GROUP_SIZE})                     ║");
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    eprintln!(
+        "  ║ Avg overlap with ground truth: {:.3}                             ║",
+        overall_avg
+    );
+    eprintln!(
+        "  ║ Min overlap:                   {:.3}                             ║",
+        min_overlap
+    );
+    eprintln!(
+        "  ║ Targets with majority overlap: {:>2} / {:<2}                          ║",
+        targets_with_majority, NUM_PROBE_ADDRESSES
+    );
+    eprintln!(
+        "  ║ Total nodes: {:<4}                                               ║",
+        total_nodes
+    );
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    if overall_avg >= 0.8 {
+        eprintln!("  ║ VERDICT: GOOD — DHT returns correct close groups             ║");
+    } else if overall_avg >= 0.5 {
+        eprintln!("  ║ VERDICT: MARGINAL — some close group disagreement            ║");
+    } else {
+        eprintln!("  ║ VERDICT: FAILING — DHT close groups diverge from truth       ║");
+    }
+    eprintln!("  ╚══════════════════════════════════════════════════════════════╝");
+    eprintln!();
+
+    // For merkle payments, majority (3/5) overlap is ideal. The threshold
+    // here is set conservatively to detect catastrophic failures; the
+    // VERDICT log above shows whether it meets the ideal bar.
+    assert!(
+        overall_avg >= 0.4,
+        "Average ground truth overlap {overall_avg:.3} < 0.4 — \
+         close groups are too divergent for merkle payment verification"
+    );
+
+    harness.teardown().await.expect("Failed to teardown");
+}
+
+// ===========================================================================
+// TEST 3: Cross-Node Lookup Agreement (Simulated Quoting vs Verification)
+//
+// This directly simulates the merkle payment flow:
+//   - "Client" nodes look up the closest nodes to an address (quoting phase)
+//   - "Verifier" nodes look up the closest nodes to the same address
+//     (verification phase)
+//   - We measure how much the two groups agree
+//
+// This is the most important test: if client and verifier disagree on the
+// close group, merkle payments break.
+// ===========================================================================
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_quoting_vs_verification_agreement() {
+    let config = TestNetworkConfig {
+        node_count: 25,
+        bootstrap_count: 3,
+        spawn_delay: Duration::from_millis(150),
+        stabilization_timeout: Duration::from_secs(120),
+        node_startup_timeout: Duration::from_secs(30),
+        ..Default::default()
+    };
+
+    eprintln!("Starting 25-node network for quoting vs verification test…");
+    let harness = TestHarness::setup_with_config(config)
+        .await
+        .expect("Failed to setup harness");
+
+    assert!(harness.is_ready().await, "Network should be ready");
+
+    thorough_warmup(&harness).await;
+
+    let node_count = harness.node_count();
+    let targets = random_xor_names(NUM_PROBE_ADDRESSES);
+
+    eprintln!("  Simulating {NUM_PROBE_ADDRESSES} quoting→verification cycles…");
+
+    let mut agreements: Vec<f64> = Vec::new();
+    let mut exact_matches = 0usize;
+
+    for (t_idx, target) in targets.iter().enumerate() {
+        // Phase 1: "Client" picks a random node to do the quoting lookup
+        let client_idx = rand::thread_rng().gen_range(0..node_count);
+        let client_result = dht_lookup(&harness, client_idx, target, CLOSE_GROUP_SIZE).await;
+
+        if client_result.is_empty() {
+            eprintln!("  Target {t_idx}: client node {client_idx} returned empty — skipping");
+            continue;
+        }
+        let client_set: HashSet<String> = client_result.into_iter().collect();
+
+        // Phase 2: Each "close group" node independently verifies by looking
+        // up the same address. In reality, the storing node would do this.
+        // We simulate by picking 3 different verifier nodes.
+        let mut verifier_agreements: Vec<f64> = Vec::new();
+
+        for _ in 0..3 {
+            let verifier_idx = loop {
+                let idx = rand::thread_rng().gen_range(0..node_count);
+                if idx != client_idx {
+                    break idx;
+                }
+            };
+
+            let verifier_result =
+                dht_lookup(&harness, verifier_idx, target, CLOSE_GROUP_SIZE).await;
+            if verifier_result.is_empty() {
+                continue;
+            }
+
+            let verifier_set: HashSet<String> = verifier_result.into_iter().collect();
+            let j = jaccard(&client_set, &verifier_set);
+            verifier_agreements.push(j);
+        }
+
+        if verifier_agreements.is_empty() {
+            continue;
+        }
+
+        let avg_agreement =
+            verifier_agreements.iter().sum::<f64>() / verifier_agreements.len() as f64;
+        agreements.push(avg_agreement);
+
+        if avg_agreement > 0.99 {
+            exact_matches += 1;
+        }
+
+        eprintln!(
+            "  Target {:>2} ({}…): client→verifier Jaccard={:.3} (from node {} vs {} verifiers)",
+            t_idx,
+            &hex::encode(target)[..12],
+            avg_agreement,
+            client_idx,
+            verifier_agreements.len(),
+        );
+    }
+
+    let overall_avg = if agreements.is_empty() {
+        0.0
+    } else {
+        agreements.iter().sum::<f64>() / agreements.len() as f64
+    };
+    let min_agreement = agreements.iter().copied().reduce(f64::min).unwrap_or(0.0);
+
+    eprintln!();
+    eprintln!("  ╔══════════════════════════════════════════════════════════════╗");
+    eprintln!("  ║ QUOTING vs VERIFICATION AGREEMENT (K={CLOSE_GROUP_SIZE})                         ║");
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    eprintln!(
+        "  ║ Avg client→verifier Jaccard: {:.3}                               ║",
+        overall_avg
+    );
+    eprintln!(
+        "  ║ Min Jaccard:                 {:.3}                               ║",
+        min_agreement
+    );
+    eprintln!(
+        "  ║ Exact matches (Jaccard=1.0): {:>2} / {:<2}                            ║",
+        exact_matches,
+        agreements.len()
+    );
+    eprintln!(
+        "  ║ Targets evaluated: {:>2}                                          ║",
+        agreements.len()
+    );
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    if overall_avg >= 0.9 {
+        eprintln!("  ║ VERDICT: SAFE for merkle payments                            ║");
+    } else if overall_avg >= 0.6 {
+        eprintln!("  ║ VERDICT: MARGINAL — some payment verification may fail       ║");
+    } else {
+        eprintln!("  ║ VERDICT: UNSAFE — quoting and verification diverge           ║");
+    }
+    eprintln!("  ╚══════════════════════════════════════════════════════════════╝");
+    eprintln!();
+
+    // For merkle payments, client and verifier MUST agree on the close group.
+    // Ideal is Jaccard >= 0.6 (majority overlap). The threshold here catches
+    // catastrophic failures; the VERDICT log shows whether the ideal is met.
+    assert!(
+        overall_avg >= 0.4,
+        "Average quoting→verification Jaccard {overall_avg:.3} < 0.4 — \
+         merkle payments would fail consistently"
+    );
+
+    harness.teardown().await.expect("Failed to teardown");
+}
+
+// ===========================================================================
+// TEST 4: Small Network Close Group Stability (5 nodes)
+//
+// On very small networks (early deployment), the close group is almost the
+// entire network. This should have near-perfect agreement. If it doesn't,
+// something fundamental is broken.
+// ===========================================================================
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_close_group_stability_5_nodes() {
+    let config = TestNetworkConfig::minimal(); // 5 nodes, 2 bootstrap
+
+    eprintln!("Starting 5-node minimal network for close group stability…");
+    let harness = TestHarness::setup_with_config(config)
+        .await
+        .expect("Failed to setup harness");
+
+    assert!(harness.is_ready().await, "Network should be ready");
+
+    thorough_warmup(&harness).await;
+
+    let identities = collect_node_identities(&harness);
+    let targets = random_xor_names(10);
+
+    eprintln!("  Testing close group agreement on 5-node network (K={CLOSE_GROUP_SIZE})…");
+    eprintln!("  (With 5 nodes, the close group IS the entire network — agreement should be perfect)");
+
+    let mut all_jaccards: Vec<f64> = Vec::new();
+
+    for (t_idx, target) in targets.iter().enumerate() {
+        let truth: HashSet<String> = ground_truth_closest(target, &identities, CLOSE_GROUP_SIZE)
+            .into_iter()
+            .collect();
+
+        // Every node looks up the close group
+        let mut node_results: Vec<HashSet<String>> = Vec::new();
+        for obs_idx in 0..harness.node_count() {
+            let result = dht_lookup(&harness, obs_idx, target, CLOSE_GROUP_SIZE).await;
+            if !result.is_empty() {
+                node_results.push(result.into_iter().collect());
+            }
+        }
+
+        // Compute overlap with ground truth
+        let mut overlaps: Vec<f64> = Vec::new();
+        for result_set in &node_results {
+            let overlap = truth.intersection(result_set).count();
+            #[allow(clippy::cast_precision_loss)]
+            {
+                overlaps.push(overlap as f64 / CLOSE_GROUP_SIZE as f64);
+            }
+        }
+
+        let avg_overlap = if overlaps.is_empty() {
+            0.0
+        } else {
+            overlaps.iter().sum::<f64>() / overlaps.len() as f64
+        };
+        all_jaccards.push(avg_overlap);
+
+        // Compute pairwise agreement between nodes
+        let mut pairwise_sum = 0.0_f64;
+        let mut pair_count = 0u32;
+        for i in 0..node_results.len() {
+            for j in (i + 1)..node_results.len() {
+                pairwise_sum += jaccard(&node_results[i], &node_results[j]);
+                pair_count += 1;
+            }
+        }
+        let pairwise_avg = if pair_count > 0 {
+            pairwise_sum / f64::from(pair_count)
+        } else {
+            0.0
+        };
+
+        eprintln!(
+            "  Target {:>2}: truth_overlap={:.2}, pairwise_agreement={:.2}, responders={}",
+            t_idx,
+            avg_overlap,
+            pairwise_avg,
+            node_results.len()
+        );
+    }
+
+    let overall_avg = if all_jaccards.is_empty() {
+        0.0
+    } else {
+        all_jaccards.iter().sum::<f64>() / all_jaccards.len() as f64
+    };
+
+    eprintln!();
+    eprintln!("  ╔══════════════════════════════════════════════════════════════╗");
+    eprintln!("  ║ SMALL NETWORK CLOSE GROUP STABILITY (5 nodes, K=5)           ║");
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    eprintln!(
+        "  ║ Avg ground truth overlap: {:.3}                                  ║",
+        overall_avg
+    );
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    if overall_avg >= 0.9 {
+        eprintln!("  ║ VERDICT: GOOD — 5-node network has consistent routing        ║");
+    } else {
+        eprintln!("  ║ VERDICT: BROKEN — even a 5-node network disagrees on K=5     ║");
+        eprintln!("  ║ This indicates a fundamental DHT or connectivity issue.      ║");
+    }
+    eprintln!("  ╚══════════════════════════════════════════════════════════════╝");
+    eprintln!();
+
+    // On a 5-node network with K=5, the close group IS the whole network.
+    // Every node should return all 5 (or 4, excluding itself).
+    // If overlap < 0.8, the DHT is fundamentally broken.
+    assert!(
+        overall_avg >= 0.6,
+        "5-node network ground truth overlap {overall_avg:.3} < 0.6 — \
+         DHT routing is fundamentally broken even at trivial scale"
+    );
+
+    harness.teardown().await.expect("Failed to teardown");
+}
+
+// ===========================================================================
+// TEST 5: Close Group Stability Over Time
+//
+// Merkle payments have a time gap between quoting and verification.
+// This test measures whether repeated lookups for the same address return
+// consistent results over a period of time (simulating the gap).
+// ===========================================================================
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_close_group_stability_over_time() {
+    let config = TestNetworkConfig {
+        node_count: 25,
+        bootstrap_count: 3,
+        spawn_delay: Duration::from_millis(150),
+        stabilization_timeout: Duration::from_secs(120),
+        node_startup_timeout: Duration::from_secs(30),
+        ..Default::default()
+    };
+
+    eprintln!("Starting 25-node network for temporal stability test…");
+    let harness = TestHarness::setup_with_config(config)
+        .await
+        .expect("Failed to setup harness");
+
+    assert!(harness.is_ready().await, "Network should be ready");
+
+    thorough_warmup(&harness).await;
+
+    let targets = random_xor_names(10);
+    let rounds = 5;
+    let round_delay = Duration::from_secs(5);
+
+    eprintln!(
+        "  Measuring close group stability over {rounds} rounds ({} sec gap each)…",
+        round_delay.as_secs()
+    );
+
+    // For each target, collect the close group from a fixed observer over time
+    let mut stability_scores: Vec<f64> = Vec::new();
+
+    for (t_idx, target) in targets.iter().enumerate() {
+        let observer_idx = t_idx % harness.node_count();
+        let mut round_results: Vec<HashSet<String>> = Vec::new();
+
+        for round in 0..rounds {
+            let result = dht_lookup(&harness, observer_idx, target, CLOSE_GROUP_SIZE).await;
+            if !result.is_empty() {
+                round_results.push(result.into_iter().collect());
+            }
+
+            if round < rounds - 1 {
+                tokio::time::sleep(round_delay).await;
+            }
+        }
+
+        // Compare each round against the first round (baseline)
+        if round_results.len() < 2 {
+            eprintln!(
+                "  Target {t_idx}: insufficient rounds ({} responses)",
+                round_results.len()
+            );
+            continue;
+        }
+
+        let baseline = &round_results[0];
+        let mut round_jaccards: Vec<f64> = Vec::new();
+
+        for (r_idx, round_set) in round_results.iter().enumerate().skip(1) {
+            let j = jaccard(baseline, round_set);
+            round_jaccards.push(j);
+            if j < 1.0 {
+                eprintln!("  Target {t_idx} round {r_idx}: Jaccard vs baseline = {j:.3}");
+            }
+        }
+
+        let avg_stability = if round_jaccards.is_empty() {
+            1.0
+        } else {
+            round_jaccards.iter().sum::<f64>() / round_jaccards.len() as f64
+        };
+        stability_scores.push(avg_stability);
+
+        eprintln!(
+            "  Target {:>2} ({}…): temporal_stability={:.3} over {} rounds",
+            t_idx,
+            &hex::encode(target)[..12],
+            avg_stability,
+            round_results.len(),
+        );
+    }
+
+    let overall_stability = if stability_scores.is_empty() {
+        0.0
+    } else {
+        stability_scores.iter().sum::<f64>() / stability_scores.len() as f64
+    };
+
+    eprintln!();
+    eprintln!("  ╔══════════════════════════════════════════════════════════════╗");
+    eprintln!("  ║ CLOSE GROUP TEMPORAL STABILITY (25 nodes)                    ║");
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    eprintln!(
+        "  ║ Avg stability (Jaccard vs t=0): {:.3}                            ║",
+        overall_stability
+    );
+    eprintln!(
+        "  ║ Round delay: {} sec                                              ║",
+        round_delay.as_secs()
+    );
+    eprintln!(
+        "  ║ Rounds per target: {}                                            ║",
+        rounds
+    );
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    if overall_stability >= 0.95 {
+        eprintln!("  ║ VERDICT: STABLE — close groups don't drift over time         ║");
+    } else if overall_stability >= 0.7 {
+        eprintln!("  ║ VERDICT: UNSTABLE — close groups drift between rounds        ║");
+    } else {
+        eprintln!("  ║ VERDICT: CHAOTIC — close groups are not deterministic        ║");
+    }
+    eprintln!("  ╚══════════════════════════════════════════════════════════════╝");
+    eprintln!();
+
+    // Close groups should be stable in a static network (no churn).
+    // If they drift, the DHT implementation has a non-determinism bug.
+    assert!(
+        overall_stability >= 0.5,
+        "Temporal stability {overall_stability:.3} < 0.5 — \
+         close groups are not stable even without churn"
+    );
+
+    harness.teardown().await.expect("Failed to teardown");
+}
+
+// ===========================================================================
+// TEST 6: DHT Lookup Result Size
+//
+// Verify that DHT lookups actually return the requested number of peers.
+// If find_closest_nodes(addr, K) returns fewer than K peers in a 25-node
+// network, routing tables are severely underpopulated.
+// ===========================================================================
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_dht_lookup_returns_expected_count() {
+    let config = TestNetworkConfig {
+        node_count: 25,
+        bootstrap_count: 3,
+        spawn_delay: Duration::from_millis(150),
+        stabilization_timeout: Duration::from_secs(120),
+        node_startup_timeout: Duration::from_secs(30),
+        ..Default::default()
+    };
+
+    eprintln!("Starting 25-node network for DHT lookup result size test…");
+    let harness = TestHarness::setup_with_config(config)
+        .await
+        .expect("Failed to setup harness");
+
+    assert!(harness.is_ready().await, "Network should be ready");
+
+    thorough_warmup(&harness).await;
+
+    let targets = random_xor_names(NUM_PROBE_ADDRESSES);
+    let mut result_sizes: Vec<usize> = Vec::new();
+    let mut empty_results = 0usize;
+    let mut undersized_results = 0usize;
+    let mut total_queries = 0usize;
+
+    // Track per-node performance
+    let mut per_node_avg_results: HashMap<usize, Vec<usize>> = HashMap::new();
+
+    for target in &targets {
+        for obs_idx in 0..harness.node_count() {
+            total_queries += 1;
+            let result = dht_lookup(&harness, obs_idx, target, CLOSE_GROUP_SIZE).await;
+            let size = result.len();
+            result_sizes.push(size);
+            per_node_avg_results.entry(obs_idx).or_default().push(size);
+
+            if size == 0 {
+                empty_results += 1;
+            } else if size < CLOSE_GROUP_SIZE {
+                undersized_results += 1;
+            }
+        }
+    }
+
+    let avg_size = if result_sizes.is_empty() {
+        0.0
+    } else {
+        result_sizes.iter().sum::<usize>() as f64 / result_sizes.len() as f64
+    };
+
+    // Find nodes that consistently return poor results
+    let mut poor_nodes: Vec<(usize, f64)> = Vec::new();
+    for (node_idx, sizes) in &per_node_avg_results {
+        let node_avg = sizes.iter().sum::<usize>() as f64 / sizes.len().max(1) as f64;
+        if node_avg < CLOSE_GROUP_SIZE as f64 * 0.5 {
+            poor_nodes.push((*node_idx, node_avg));
+        }
+    }
+
+    eprintln!();
+    eprintln!("  ╔══════════════════════════════════════════════════════════════╗");
+    eprintln!("  ║ DHT LOOKUP RESULT SIZE (expected K={CLOSE_GROUP_SIZE})                           ║");
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    eprintln!(
+        "  ║ Total queries: {:<6}                                            ║",
+        total_queries
+    );
+    eprintln!(
+        "  ║ Avg results per query: {:.2}                                      ║",
+        avg_size
+    );
+    eprintln!(
+        "  ║ Empty results (0 peers): {:<6} ({:.1}%)                           ║",
+        empty_results,
+        empty_results as f64 / total_queries.max(1) as f64 * 100.0
+    );
+    eprintln!(
+        "  ║ Undersized results (<{CLOSE_GROUP_SIZE} peers): {:<6} ({:.1}%)                        ║",
+        undersized_results,
+        undersized_results as f64 / total_queries.max(1) as f64 * 100.0
+    );
+    eprintln!("  ╚══════════════════════════════════════════════════════════════╝");
+    eprintln!();
+
+    // Report nodes whose lookups consistently return too few peers
+    for (node_idx, node_avg) in &poor_nodes {
+        eprintln!("  WARNING: Node {node_idx} averages only {node_avg:.1} results per lookup");
+    }
+
+    // The average result size should cover most of K, otherwise close
+    // group computation is starved of peers.
+    assert!(
+        avg_size >= 3.0,
+        "Average DHT result size {avg_size:.2} < 3.0 — \
+         DHT is not returning enough peers for close group computation"
+    );
+
+    // No more than 20% empty results
+    let empty_ratio = empty_results as f64 / total_queries.max(1) as f64;
+    assert!(
+        empty_ratio <= 0.2,
+        "Empty result ratio {empty_ratio:.2} > 0.20 — \
+         too many DHT queries return nothing"
+    );
+
+    harness.teardown().await.expect("Failed to teardown");
+}
+
+// ===========================================================================
+// TEST 7: 100-Node Close Group Ground Truth
+//
+// The critical scaling test. At 25 nodes, finding the 5 closest is easy
+// because the search space is small. At 100 nodes, the 5 true closest are
+// far more specific — a node that only knows about 50% of the network will
+// often pick the wrong 5.
+//
+// This is where the routing table incompleteness problem (avg 12/25 = 48%
+// discovery) would become catastrophic.
+// =========================================================================== + +#[tokio::test(flavor = "multi_thread")] +async fn test_close_group_vs_ground_truth_100_nodes() { + let config = TestNetworkConfig { + node_count: 100, + bootstrap_count: 5, + spawn_delay: Duration::from_millis(150), + stabilization_timeout: Duration::from_secs(300), + node_startup_timeout: Duration::from_secs(60), + ..Default::default() + }; + + eprintln!("Starting 100-node network for close group ground truth test…"); + let harness = TestHarness::setup_with_config(config) + .await + .expect("Failed to setup harness"); + + assert!(harness.is_ready().await, "Network should be ready"); + + thorough_warmup(&harness).await; + + // Extra warmup for larger network + eprintln!(" Extra warmup for 100-node network…"); + for round in 1..=2 { + eprintln!(" Extra warmup round {round}/2…"); + let addrs = random_xor_names(20); + for i in 0..harness.node_count() { + if let Some(p2p) = harness.node(i) { + for addr in &addrs { + let _ = p2p.dht().find_closest_nodes(addr, 20).await; + } + } + } + tokio::time::sleep(Duration::from_secs(3)).await; + } + + let identities = collect_node_identities(&harness); + let total_nodes = identities.len(); + eprintln!(" Collected {total_nodes} node identities"); + + // First, measure routing table completeness at 100 nodes + eprintln!(" Measuring routing table completeness…"); + let all_peer_ids: HashSet = identities.iter().map(|(hex, _)| hex.clone()).collect(); + let mut discovery_counts: Vec = Vec::new(); + + for i in 0..harness.node_count() { + if let Some(p2p) = harness.node(i) { + let random_addr = random_xor_names(1); + let first_addr = random_addr.first().expect("should have random addr"); + let peers = match tokio::time::timeout( + DHT_LOOKUP_TIMEOUT, + p2p.dht().find_closest_nodes(first_addr, 200), + ) + .await + { + Ok(Ok(peers)) => peers, + _ => vec![], + }; + + let discovered: HashSet = peers.iter().map(|p| p.peer_id.to_hex()).collect(); + let known = 
discovered.intersection(&all_peer_ids).count();
+            discovery_counts.push(known);
+        }
+    }
+
+    let avg_discovered =
+        discovery_counts.iter().sum::<usize>() as f64 / discovery_counts.len().max(1) as f64;
+    let min_discovered = discovery_counts.iter().copied().min().unwrap_or(0);
+
+    eprintln!(
+        "  Routing table: avg {:.1}/{total_nodes} discovered, min {min_discovered}/{total_nodes}",
+        avg_discovered
+    );
+
+    // Now test close group accuracy
+    let targets = random_xor_names(NUM_PROBE_ADDRESSES);
+    eprintln!(
+        "  Probing {NUM_PROBE_ADDRESSES} addresses against ground truth (K={CLOSE_GROUP_SIZE})…"
+    );
+
+    let mut total_overlap_ratios: Vec<f64> = Vec::new();
+
+    // Sample 10 observer nodes per target (not all 100)
+    for (t_idx, target) in targets.iter().enumerate() {
+        let truth: HashSet<String> = ground_truth_closest(target, &identities, CLOSE_GROUP_SIZE)
+            .into_iter()
+            .collect();
+
+        let mut overlaps: Vec<f64> = Vec::new();
+
+        // Pick 10 random observers
+        let observers: Vec<usize> = {
+            let mut rng = rand::thread_rng();
+            let mut indices: Vec<usize> = (0..harness.node_count()).collect();
+            use rand::seq::SliceRandom;
+            indices.shuffle(&mut rng);
+            indices.into_iter().take(10).collect()
+        };
+
+        for obs_idx in &observers {
+            let dht_result = dht_lookup(&harness, *obs_idx, target, CLOSE_GROUP_SIZE).await;
+            if dht_result.is_empty() {
+                continue;
+            }
+
+            let dht_set: HashSet<String> = dht_result.into_iter().collect();
+            let overlap = truth.intersection(&dht_set).count();
+            #[allow(clippy::cast_precision_loss)]
+            {
+                overlaps.push(overlap as f64 / CLOSE_GROUP_SIZE as f64);
+            }
+        }
+
+        let avg_overlap = if overlaps.is_empty() {
+            0.0
+        } else {
+            overlaps.iter().sum::<f64>() / overlaps.len() as f64
+        };
+
+        total_overlap_ratios.push(avg_overlap);
+
+        eprintln!(
+            "  Target {:>2} ({}…): ground_truth_overlap={:.2}, responders={}",
+            t_idx,
+            &hex::encode(target)[..12],
+            avg_overlap,
+            overlaps.len(),
+        );
+    }
+
+    let overall_avg = if total_overlap_ratios.is_empty() {
+        0.0
+    } else {
+        
total_overlap_ratios.iter().sum::<f64>() / total_overlap_ratios.len() as f64
+    };
+    let min_overlap = total_overlap_ratios
+        .iter()
+        .copied()
+        .reduce(f64::min)
+        .unwrap_or(0.0);
+    let targets_with_majority = total_overlap_ratios.iter().filter(|&&o| o >= 0.6).count();
+
+    eprintln!();
+    eprintln!("  ╔══════════════════════════════════════════════════════════════╗");
+    eprintln!("  ║  CLOSE GROUP vs GROUND TRUTH (100 nodes, K={CLOSE_GROUP_SIZE})                ║");
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    eprintln!(
+        "  ║  Routing table avg discovery:   {:.1} / {:<4}                ║",
+        avg_discovered, total_nodes
+    );
+    eprintln!(
+        "  ║  Routing table min discovery:   {:<4} / {:<4}               ║",
+        min_discovered, total_nodes
+    );
+    eprintln!(
+        "  ║  Avg overlap with ground truth: {:.3}                        ║",
+        overall_avg
+    );
+    eprintln!(
+        "  ║  Min overlap:                   {:.3}                        ║",
+        min_overlap
+    );
+    eprintln!(
+        "  ║  Targets with majority overlap: {:>2} / {:<2}                ║",
+        targets_with_majority, NUM_PROBE_ADDRESSES
+    );
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    if overall_avg >= 0.8 {
+        eprintln!("  ║  VERDICT: GOOD — close groups match truth at scale           ║");
+    } else if overall_avg >= 0.5 {
+        eprintln!("  ║  VERDICT: MARGINAL — close group errors at 100-node scale    ║");
+    } else {
+        eprintln!("  ║  VERDICT: FAILING — close groups wrong at scale              ║");
+        eprintln!("  ║  Merkle payments would fail on a 100+ node network.          ║");
+    }
+    eprintln!("  ╚══════════════════════════════════════════════════════════════╝");
+    eprintln!();
+
+    // At 100 nodes, the bar is lower because routing tables are genuinely
+    // incomplete in Kademlia (by design — O(log N) entries). But we still
+    // need majority overlap for merkle payments to work.
+    assert!(
+        overall_avg >= 0.3,
+        "100-node ground truth overlap {overall_avg:.3} < 0.3 — \
+         close groups are catastrophically wrong at scale"
+    );
+
+    harness.teardown().await.expect("Failed to teardown");
+}
+
+// ===========================================================================
+// TEST 8: Quoting vs Verification at 100 Nodes
+//
+// The ultimate merkle payment viability test at scale. If this fails,
+// merkle payments cannot work on a real network.
+// ===========================================================================
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_quoting_vs_verification_100_nodes() {
+    let config = TestNetworkConfig {
+        node_count: 100,
+        bootstrap_count: 5,
+        spawn_delay: Duration::from_millis(150),
+        stabilization_timeout: Duration::from_secs(300),
+        node_startup_timeout: Duration::from_secs(60),
+        ..Default::default()
+    };
+
+    eprintln!("Starting 100-node network for quoting vs verification at scale…");
+    let harness = TestHarness::setup_with_config(config)
+        .await
+        .expect("Failed to setup harness");
+
+    assert!(harness.is_ready().await, "Network should be ready");
+
+    thorough_warmup(&harness).await;
+
+    // Extra warmup
+    eprintln!("  Extra warmup for 100-node network…");
+    for round in 1..=2 {
+        eprintln!("    Extra warmup round {round}/2…");
+        let addrs = random_xor_names(20);
+        for i in 0..harness.node_count() {
+            if let Some(p2p) = harness.node(i) {
+                for addr in &addrs {
+                    let _ = p2p.dht().find_closest_nodes(addr, 20).await;
+                }
+            }
+        }
+        tokio::time::sleep(Duration::from_secs(3)).await;
+    }
+
+    let node_count = harness.node_count();
+    let targets = random_xor_names(NUM_PROBE_ADDRESSES);
+
+    eprintln!("  Simulating {NUM_PROBE_ADDRESSES} quoting→verification cycles at 100 nodes…");
+
+    let mut agreements: Vec<f64> = Vec::new();
+    let mut exact_matches = 0usize;
+    let mut failures = 0usize; // Jaccard < 0.5 = payment would likely fail
+
+    for (t_idx, target) in targets.iter().enumerate() {
+        let client_idx =
rand::thread_rng().gen_range(0..node_count);
+        let client_result = dht_lookup(&harness, client_idx, target, CLOSE_GROUP_SIZE).await;
+
+        if client_result.is_empty() {
+            eprintln!("  Target {t_idx}: client node {client_idx} returned empty");
+            continue;
+        }
+        let client_set: HashSet<String> = client_result.into_iter().collect();
+
+        // 5 verifiers at scale for better sampling
+        let mut verifier_agreements: Vec<f64> = Vec::new();
+
+        for _ in 0..5 {
+            let verifier_idx = loop {
+                let idx = rand::thread_rng().gen_range(0..node_count);
+                if idx != client_idx {
+                    break idx;
+                }
+            };
+
+            let verifier_result =
+                dht_lookup(&harness, verifier_idx, target, CLOSE_GROUP_SIZE).await;
+            if verifier_result.is_empty() {
+                continue;
+            }
+
+            let verifier_set: HashSet<String> = verifier_result.into_iter().collect();
+            let j = jaccard(&client_set, &verifier_set);
+            verifier_agreements.push(j);
+        }
+
+        if verifier_agreements.is_empty() {
+            continue;
+        }
+
+        let avg_agreement =
+            verifier_agreements.iter().sum::<f64>() / verifier_agreements.len() as f64;
+        agreements.push(avg_agreement);
+
+        if avg_agreement > 0.99 {
+            exact_matches += 1;
+        }
+        if avg_agreement < 0.5 {
+            failures += 1;
+        }
+
+        eprintln!(
+            "  Target {:>2} ({}…): client→verifier Jaccard={:.3}{}",
+            t_idx,
+            &hex::encode(target)[..12],
+            avg_agreement,
+            if avg_agreement < 0.5 {
+                " ← PAYMENT WOULD FAIL"
+            } else {
+                ""
+            },
+        );
+    }
+
+    let overall_avg = if agreements.is_empty() {
+        0.0
+    } else {
+        agreements.iter().sum::<f64>() / agreements.len() as f64
+    };
+    let min_agreement = agreements.iter().copied().reduce(f64::min).unwrap_or(0.0);
+
+    eprintln!();
+    eprintln!("  ╔══════════════════════════════════════════════════════════════╗");
+    eprintln!("  ║  QUOTING vs VERIFICATION at 100 NODES (K={CLOSE_GROUP_SIZE})                  ║");
+    eprintln!("  ╠══════════════════════════════════════════════════════════════╣");
+    eprintln!(
+        "  ║  Avg client→verifier Jaccard:  {:.3}                         ║",
+        overall_avg
+    );
+    eprintln!(
+        "  ║  Min Jaccard:                  {:.3}                         ║",
+        min_agreement
+    );
+    eprintln!(
+        "  ║  
Exact matches: {:>2} / {:<2} ║", + exact_matches, + agreements.len() + ); + eprintln!( + " ║ Payment failures (Jaccard<0.5):{:>2} / {:<2} ║", + failures, + agreements.len() + ); + eprintln!(" ╠══════════════════════════════════════════════════════════════╣"); + if overall_avg >= 0.9 { + eprintln!(" ║ VERDICT: SAFE for merkle payments at 100-node scale ║"); + } else if overall_avg >= 0.6 { + eprintln!(" ║ VERDICT: MARGINAL — some payments will fail at scale ║"); + } else { + eprintln!(" ║ VERDICT: UNSAFE — merkle payments broken at 100 nodes ║"); + } + eprintln!(" ╚══════════════════════════════════════════════════════════════╝"); + eprintln!(); + + assert!( + overall_avg >= 0.3, + "100-node quoting→verification Jaccard {overall_avg:.3} < 0.3 — \ + merkle payments are broken at scale" + ); + + harness.teardown().await.expect("Failed to teardown"); +} diff --git a/tests/e2e/mod.rs b/tests/e2e/mod.rs index 3da2bbd..06a4355 100644 --- a/tests/e2e/mod.rs +++ b/tests/e2e/mod.rs @@ -60,6 +60,12 @@ mod merkle_payment; #[cfg(test)] mod security_attacks; +#[cfg(test)] +mod close_group_stability; + +#[cfg(test)] +mod network_convergence; + pub use anvil::TestAnvil; pub use harness::TestHarness; pub use testnet::{NetworkState, NodeState, TestNetwork, TestNetworkConfig, TestNode}; diff --git a/tests/e2e/network_convergence.rs b/tests/e2e/network_convergence.rs new file mode 100644 index 0000000..c74ab24 --- /dev/null +++ b/tests/e2e/network_convergence.rs @@ -0,0 +1,352 @@ +//! Network convergence test for 100-node testnet. +//! +//! Uses **lookup path convergence** — the standard metric from DHT measurement +//! literature (Wang & Kangasharju, IEEE P2P 2013; Steiner et al., 2008; IPFS +//! Testground). The idea: if multiple nodes independently perform a full +//! iterative Kademlia lookup for the same key, they should converge to similar +//! result sets. Low pairwise overlap indicates a partitioned / split-view +//! network. +//! +//! 
This approach requires **zero global knowledge** and scales to any N because: +//! - Each lookup is O(log N) hops (proper iterative Kademlia) +//! - Sample size for statistical confidence is independent of N +//! - We compare observers against *each other*, not against an omniscient oracle + +#![allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::cast_precision_loss, + clippy::too_many_lines, + clippy::doc_markdown, + clippy::uninlined_format_args +)] + +use std::collections::HashSet; +use std::time::Duration; + +use ant_node::client::XorName; +use rand::seq::SliceRandom; +use rand::Rng; + +use super::{TestHarness, TestNetworkConfig}; + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +const NODE_COUNT: usize = 100; +const BOOTSTRAP_COUNT: usize = 5; + +/// Number of closest peers each DHT lookup requests. +const K: usize = 20; + +/// Number of random target keys to probe. +const NUM_TARGETS: usize = 25; + +/// Number of observer nodes that independently look up each target. +const NUM_OBSERVERS: usize = 10; + +/// Random addresses per enhanced warmup round. +const WARMUP_RANDOM_ADDRESSES: usize = 15; + +/// Enhanced warmup rounds. +const WARMUP_ROUNDS: usize = 3; + +/// Minimum peers a node must find via DHT for the reachability check. +const MIN_REACHABLE_PEERS: usize = 5; + +/// Minimum average pairwise Jaccard similarity across all targets. +/// This measures whether different observers' iterative lookups converge to +/// similar results. Wang & Kangasharju (2013) measured ~0.85 in the +/// 10M-node `BitTorrent` DHT. We use a lower bar for 100 localhost nodes. +const MIN_AVG_PAIRWISE_JACCARD: f64 = 0.25; + +/// Minimum fraction of observers that must return non-empty results per target. 
+const MIN_RESPONSE_RATE: f64 = 0.70;
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+/// Generate `count` random 32-byte addresses.
+fn random_xor_names(count: usize) -> Vec<XorName> {
+    let mut rng = rand::thread_rng();
+    (0..count)
+        .map(|_| {
+            let mut addr = [0u8; 32];
+            rng.fill(&mut addr);
+            addr
+        })
+        .collect()
+}
+
+/// Jaccard similarity: |A ∩ B| / |A ∪ B|.
+fn jaccard(a: &HashSet<String>, b: &HashSet<String>) -> f64 {
+    let isect = a.intersection(b).count();
+    let union = a.union(b).count();
+    if union == 0 {
+        return 1.0;
+    }
+    #[allow(clippy::cast_precision_loss)]
+    let result = isect as f64 / union as f64;
+    result
+}
+
+/// Metrics for a single target key probe.
+struct TargetProbe {
+    avg_pairwise_jaccard: f64,
+    response_rate: f64,
+    nonempty_sets: usize,
+}
+
+/// Probe a single target key from multiple observers and compute the
+/// average pairwise Jaccard similarity between their result sets.
+async fn probe_target(harness: &TestHarness, target: &XorName, observers: &[usize]) -> TargetProbe {
+    let mut result_sets: Vec<HashSet<String>> = Vec::new();
+    let mut total_queried = 0usize;
+
+    for &obs_idx in observers {
+        if let Some(p2p) = harness.node(obs_idx) {
+            total_queried += 1;
+            let timeout_dur = Duration::from_secs(30);
+            let query = p2p.dht().find_closest_nodes(target, K);
+            match tokio::time::timeout(timeout_dur, query).await {
+                Ok(Ok(peers)) if !peers.is_empty() => {
+                    let set: HashSet<String> = peers.iter().map(|p| p.peer_id.to_hex()).collect();
+                    result_sets.push(set);
+                }
+                _ => {}
+            }
+        }
+    }
+
+    let nonempty_sets = result_sets.len();
+    #[allow(clippy::cast_precision_loss)]
+    let response_rate = if total_queried == 0 {
+        0.0
+    } else {
+        nonempty_sets as f64 / total_queried as f64
+    };
+
+    if nonempty_sets < 2 {
+        return TargetProbe {
+            avg_pairwise_jaccard: 0.0,
+            response_rate,
+            nonempty_sets,
+        };
+    }
+
+    // Compute average pairwise Jaccard across all observer pairs.
+    let mut pair_count = 0u32;
+    let mut sum_jaccard = 0.0_f64;
+
+    for i in 0..result_sets.len() {
+        for j in (i + 1)..result_sets.len() {
+            sum_jaccard += jaccard(&result_sets[i], &result_sets[j]);
+            pair_count += 1;
+        }
+    }
+
+    let avg_pairwise_jaccard = if pair_count > 0 {
+        sum_jaccard / f64::from(pair_count)
+    } else {
+        1.0
+    };
+
+    TargetProbe {
+        avg_pairwise_jaccard,
+        response_rate,
+        nonempty_sets,
+    }
+}
+
+/// Enhanced DHT warmup: query random targets from every node.
+async fn enhanced_warmup(harness: &TestHarness, num_addresses: usize, k: usize) {
+    let addresses = random_xor_names(num_addresses);
+    for i in 0..harness.node_count() {
+        if let Some(p2p) = harness.node(i) {
+            for addr in &addresses {
+                let _ = p2p.dht().find_closest_nodes(addr, k).await;
+            }
+        }
+    }
+}
+
+/// Run reachability check.
+async fn check_reachability(harness: &TestHarness) { + let probes = random_xor_names(harness.node_count()); + let mut unreachable = 0usize; + + for (i, probe) in probes.iter().enumerate() { + if let Some(p2p) = harness.node(i) { + match p2p.dht().find_closest_nodes(probe, K).await { + Ok(peers) if peers.len() >= MIN_REACHABLE_PEERS => {} + Ok(peers) => { + eprintln!(" Node {i}: found only {} peers", peers.len()); + unreachable += 1; + } + Err(e) => { + eprintln!(" Node {i}: DHT query failed: {e}"); + unreachable += 1; + } + } + } + } + + let max_unreachable = NODE_COUNT / 5; + assert!( + unreachable <= max_unreachable, + "{unreachable} nodes failed reachability (max {max_unreachable})" + ); + eprintln!(" Reachability: {unreachable}/{NODE_COUNT} below threshold (max {max_unreachable})"); +} + +/// Run full DHT warmup sequence. +async fn run_warmup(harness: &TestHarness) { + eprintln!(" DHT warmup: standard round…"); + harness + .warmup_dht() + .await + .expect("DHT standard warmup failed"); + + for round in 1..=WARMUP_ROUNDS { + eprintln!( + " DHT warmup: enhanced round {round}/{WARMUP_ROUNDS} \ + ({WARMUP_RANDOM_ADDRESSES} addrs, k={K})…" + ); + enhanced_warmup(harness, WARMUP_RANDOM_ADDRESSES, K).await; + tokio::time::sleep(Duration::from_secs(3)).await; + } + + eprintln!(" DHT warmup: settling…"); + tokio::time::sleep(Duration::from_secs(5)).await; +} + +/// Run convergence probes and return per-target results. 
+async fn run_convergence_probes(harness: &TestHarness) -> Vec<TargetProbe> {
+    let targets = random_xor_names(NUM_TARGETS);
+    let node_count = harness.node_count();
+
+    let all_indices: Vec<usize> = (0..node_count).collect();
+    let observer_selections: Vec<Vec<usize>> = {
+        let mut rng = rand::thread_rng();
+        targets
+            .iter()
+            .map(|_| {
+                let mut candidates = all_indices.clone();
+                candidates.shuffle(&mut rng);
+                candidates.into_iter().take(NUM_OBSERVERS).collect()
+            })
+            .collect()
+    };
+
+    let mut results = Vec::with_capacity(NUM_TARGETS);
+
+    for (idx, target) in targets.iter().enumerate() {
+        let observers = &observer_selections[idx];
+        let probe = probe_target(harness, target, observers).await;
+
+        eprintln!(
+            "  Target {:>2} ({}…): pairwise_jaccard={:.3}, response={:.2}, observers={}",
+            idx,
+            &hex::encode(target)[..12],
+            probe.avg_pairwise_jaccard,
+            probe.response_rate,
+            probe.nonempty_sets,
+        );
+
+        results.push(probe);
+    }
+
+    results
+}
+
+// ---------------------------------------------------------------------------
+// Main test
+// ---------------------------------------------------------------------------
+
+/// Verify that a 100-node network's DHT lookups converge — different nodes
+/// querying the same key should find similar closest peers.
+#[tokio::test(flavor = "multi_thread")] +async fn test_100_node_network_convergence() { + let config = TestNetworkConfig { + node_count: NODE_COUNT, + bootstrap_count: BOOTSTRAP_COUNT, + spawn_delay: Duration::from_millis(150), + stabilization_timeout: Duration::from_secs(300), + node_startup_timeout: Duration::from_secs(60), + ..Default::default() + }; + + eprintln!("Starting {NODE_COUNT}-node network…"); + let harness = TestHarness::setup_with_config(config) + .await + .expect("Failed to setup harness"); + + assert!(harness.is_ready().await, "Network should be ready"); + assert_eq!(harness.node_count(), NODE_COUNT); + + let total_conns = harness.total_connections().await; + assert!(total_conns > 0, "Network should have connections"); + eprintln!(" Network ready — {total_conns} total connections"); + + run_warmup(&harness).await; + + eprintln!(" Checking DHT reachability…"); + check_reachability(&harness).await; + + eprintln!(" Running {NUM_TARGETS} convergence probes ({NUM_OBSERVERS} observers each)…"); + let probes = run_convergence_probes(&harness).await; + assert_convergence(&probes); + + eprintln!(" Network convergence verified"); + harness.teardown().await.expect("Failed to teardown"); +} + +/// Assert convergence metrics meet thresholds. 
+fn assert_convergence(probes: &[TargetProbe]) {
+    assert!(!probes.is_empty(), "No probes evaluated");
+
+    #[allow(clippy::cast_precision_loss)]
+    let n = probes.len() as f64;
+
+    let avg_jaccard = probes.iter().map(|p| p.avg_pairwise_jaccard).sum::<f64>() / n;
+    let avg_response = probes.iter().map(|p| p.response_rate).sum::<f64>() / n;
+    let min_jaccard = probes
+        .iter()
+        .map(|p| p.avg_pairwise_jaccard)
+        .reduce(f64::min)
+        .unwrap_or(0.0);
+    let max_jaccard = probes
+        .iter()
+        .map(|p| p.avg_pairwise_jaccard)
+        .reduce(f64::max)
+        .unwrap_or(0.0);
+
+    eprintln!();
+    eprintln!("  ╔══════════════════════════════════════════════════╗");
+    eprintln!("  ║  LOOKUP PATH CONVERGENCE SUMMARY                 ║");
+    eprintln!("  ╠══════════════════════════════════════════════════╣");
+    eprintln!(
+        "  ║  Targets probed:        {:>4}                     ║",
+        probes.len()
+    );
+    eprintln!("  ║  Avg pairwise Jaccard:  {avg_jaccard:.3}                    ║");
+    eprintln!("  ║  Min pairwise Jaccard:  {min_jaccard:.3}                    ║");
+    eprintln!("  ║  Max pairwise Jaccard:  {max_jaccard:.3}                    ║");
+    eprintln!("  ║  Avg response rate:     {avg_response:.3}                   ║");
+    eprintln!("  ╚══════════════════════════════════════════════════╝");
+    eprintln!();
+
+    assert!(
+        avg_response >= MIN_RESPONSE_RATE,
+        "Avg response rate {avg_response:.3} < {MIN_RESPONSE_RATE} — \
+         too many observers returned empty results"
+    );
+
+    assert!(
+        avg_jaccard >= MIN_AVG_PAIRWISE_JACCARD,
+        "Avg pairwise Jaccard {avg_jaccard:.3} < {MIN_AVG_PAIRWISE_JACCARD} — \
+         lookups from different nodes diverge (possible network partition)"
+    );
+}
diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs
index e61c1a2..0ddcf63 100644
--- a/tests/e2e/testnet.rs
+++ b/tests/e2e/testnet.rs
@@ -388,6 +388,9 @@ pub struct TestNode {
     /// Populated once the node starts and the protocol router is spawned.
     /// Dropped (and aborted) during teardown so tests don't leave tasks behind.
     pub protocol_task: Option<JoinHandle<()>>,
+
+    /// DHT routing table refresh background task handle.
+    pub dht_refresh_task: Option<JoinHandle<()>>,
 }
 
 impl TestNode {
@@ -415,6 +418,11 @@ impl TestNode {
             handle.abort();
         }
 
+        // Stop DHT refresh task
+        if let Some(handle) = self.dht_refresh_task.take() {
+            handle.abort();
+        }
+
         *self.state.write().await = NodeState::Stopping;
 
         // Shutdown P2P node if running
@@ -1033,6 +1041,7 @@
             bootstrap_addrs,
             node_identity: Some(identity),
             protocol_task: None,
+            dht_refresh_task: None,
         })
     }
 
@@ -1148,9 +1157,15 @@
                 TestnetError::Startup(format!("Failed to start node {}: {e}", node.index))
             })?;
 
-        node.p2p_node = Some(Arc::new(p2p_node));
+        let p2p_arc = Arc::new(p2p_node);
+        node.p2p_node = Some(Arc::clone(&p2p_arc));
         *node.state.write().await = NodeState::Running;
 
+        // Wire the P2P node into the protocol handler for close group verification
+        if let Some(ref protocol) = node.ant_protocol {
+            protocol.set_p2p_node(Arc::clone(&p2p_arc));
+        }
+
         // Start protocol handler that routes incoming P2P messages to AntProtocol
         if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
             let mut events = p2p.subscribe_events();
@@ -1204,6 +1219,25 @@
             }));
         }
 
+        // Start DHT routing table refresh background task.
+        // Periodically performs random lookups to keep routing tables populated.
+ if let Some(ref p2p) = node.p2p_node { + let p2p_clone = Arc::clone(p2p); + let node_index = node.index; + node.dht_refresh_task = Some(tokio::spawn(async move { + let interval = std::time::Duration::from_secs(10); + loop { + tokio::time::sleep(interval).await; + for _ in 0..5 { + let mut addr = [0u8; 32]; + rand::Rng::fill(&mut rand::thread_rng(), &mut addr); + let _ = p2p_clone.dht().find_closest_nodes(&addr, 20).await; + } + debug!("Node {node_index} DHT routing table refresh completed"); + } + })); + } + debug!("Node {} started successfully", node.index); self.nodes.push(node); Ok(()) @@ -1405,6 +1439,9 @@ impl TestNetwork { if let Some(handle) = node.protocol_task.take() { handle.abort(); } + if let Some(handle) = node.dht_refresh_task.take() { + handle.abort(); + } *node.state.write().await = NodeState::Stopping; if let Some(p2p) = node.p2p_node.clone() {