Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion ant-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,23 @@ use ant_core::data::{
use cli::{Cli, Commands};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn main() {
let code = match run().await {
Ok(()) => 0,
Err(e) => {
eprintln!("Error: {e:?}");
1
}
};

// Force-exit to avoid hanging on tokio runtime shutdown.
// Open QUIC connections and pending background tasks (DHT, keep-alive)
// block the runtime's graceful shutdown indefinitely. All data has been
// persisted / printed by this point, so there is nothing left to clean up.
std::process::exit(code);
}

async fn run() -> anyhow::Result<()> {
let cli = Cli::parse();

// Initialize tracing for data commands (node commands handle their own output)
Expand Down
37 changes: 29 additions & 8 deletions ant-core/src/data/client/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,22 @@ impl Client {
quote_futures.push(quote_future);
}

// Collect quotes as they complete
let mut quotes_with_peers = Vec::with_capacity(REQUIRED_QUOTES);
// Collect all quote responses (don't short-circuit on the first 5).
//
// The previous first-5-wins approach caused nodes behind NAT to be
// systematically excluded: cloud nodes always respond faster, so the
// NATed node's quote would arrive 6th and be dropped. By collecting
// all responses and then selecting the closest by XOR distance, every
// reachable node has a fair chance of being included.
let mut all_quotes = Vec::with_capacity(remote_peers.len());
let mut already_stored_count = 0usize;
let mut failures: Vec<String> = Vec::new();
let total_peers = remote_peers.len();

while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
match quote_result {
Ok((quote, price)) => {
quotes_with_peers.push((peer_id, addrs, quote, price));
if quotes_with_peers.len() >= REQUIRED_QUOTES {
break;
}
all_quotes.push((peer_id, addrs, quote, price));
}
Err(Error::AlreadyStored) => {
already_stored_count += 1;
Expand All @@ -148,10 +152,27 @@ impl Client {
failures.push(format!("{peer_id}: {e}"));
}
}

// Once every peer has responded (or failed), stop waiting.
let responded = all_quotes.len() + already_stored_count + failures.len();
if responded >= total_peers {
break;
}
}

// If we collected enough quotes, proceed with payment regardless
// of how many peers reported already_stored.
// Sort by XOR distance to the chunk address (closest first), then
// take the REQUIRED_QUOTES closest. This ensures deterministic,
// distance-based selection rather than speed-based racing.
all_quotes.sort_by_key(|(peer_id, _, _, _)| {
let peer_bytes = peer_id.as_bytes();
let mut distance = [0u8; 32];
for i in 0..32 {
distance[i] = peer_bytes[i] ^ address[i];
}
distance
});
let quotes_with_peers: Vec<_> = all_quotes.into_iter().take(REQUIRED_QUOTES).collect();

if quotes_with_peers.len() >= REQUIRED_QUOTES {
info!(
"Collected {} quotes for address {}",
Expand Down
Loading