From 68223a7dca0d9b2c12c61764258a10c4ceb53f5d Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 25 Mar 2026 21:27:31 +0100 Subject: [PATCH 1/5] feat: wave-based batch uploads with single EVM payment per wave Replace per-chunk sequential EVM payments with batched wave-based uploads. Chunks are grouped into waves (up to 64), quotes collected concurrently, then paid in a single wallet.pay_for_quotes() call. Stores from wave N pipeline with quote collection for wave N+1. For a 3-chunk (4MB) upload this reduces payment from 3 separate EVM transactions (~1s with nonce contention) to 1 transaction (~10ms). Co-Authored-By: Claude Opus 4.6 (1M context) --- ant-core/src/data/client/batch.rs | 347 ++++++++++++++++++++++++++++ ant-core/src/data/client/data.rs | 47 ++-- ant-core/src/data/client/file.rs | 76 ++---- ant-core/src/data/client/mod.rs | 1 + ant-core/src/data/client/payment.rs | 2 +- 5 files changed, 385 insertions(+), 88 deletions(-) create mode 100644 ant-core/src/data/client/batch.rs diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs new file mode 100644 index 0000000..68366fb --- /dev/null +++ b/ant-core/src/data/client/batch.rs @@ -0,0 +1,347 @@ +//! Batch chunk upload with wave-based pipelined EVM payments. +//! +//! Groups chunks into waves of [`PAYMENT_WAVE_SIZE`] and pays for each +//! wave in a single EVM transaction. Stores from wave N are pipelined +//! with quote collection for wave N+1 via `tokio::join!`. 
+ +use crate::data::client::payment::peer_id_to_encoded; +use crate::data::client::Client; +use crate::data::error::{Error, Result}; +use ant_evm::{EncodedPeerId, PaymentQuote, ProofOfPayment}; +use ant_node::ant_protocol::DATA_TYPE_CHUNK; +use ant_node::client::{compute_address, XorName}; +use ant_node::core::{MultiAddr, PeerId}; +use ant_node::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment}; +use bytes::Bytes; +use futures::stream::{self, StreamExt}; +use std::collections::HashSet; +use tracing::{debug, info}; + +/// Number of chunks per payment wave. +const PAYMENT_WAVE_SIZE: usize = 64; + +/// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`]. +pub struct PreparedChunk { + /// The chunk content bytes. + pub content: Bytes, + /// Content address (BLAKE3 hash). + pub address: XorName, + /// Closest peers from quote collection — PUT targets for close-group replication. + pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>, + /// Payment structure (quotes sorted, median selected, not yet paid on-chain). + pub payment: SingleNodePayment, + /// Peer quotes for building `ProofOfPayment`. + pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>, +} + +/// Chunk paid but not yet stored. Produced by [`Client::batch_pay`]. +pub struct PaidChunk { + /// The chunk content bytes. + pub content: Bytes, + /// Content address (BLAKE3 hash). + pub address: XorName, + /// Closest peers from quote collection — PUT targets for close-group replication. + pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>, + /// Serialized [`PaymentProof`] bytes. + pub proof_bytes: Vec<u8>, +} + +impl Client { + /// Prepare a single chunk for batch payment. + /// + /// Collects quotes and fetches contract prices without making any + /// on-chain transaction. Returns `Ok(None)` if the chunk is already + /// stored on the network. + /// + /// # Errors + /// + /// Returns an error if quote collection or payment construction fails. 
+ pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> { + let address = compute_address(&content); + let data_size = u64::try_from(content.len()) + .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?; + + let quotes_with_peers = match self + .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK) + .await + { + Ok(quotes) => quotes, + Err(Error::AlreadyStored) => { + debug!("Chunk {} already stored, skipping", hex::encode(address)); + return Ok(None); + } + Err(e) => return Err(e), + }; + + let wallet = self.require_wallet()?; + + // Capture all quoted peers for close-group replication. + let quoted_peers: Vec<(PeerId, Vec<MultiAddr>)> = quotes_with_peers + .iter() + .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone())) + .collect(); + + // Fetch authoritative prices from the on-chain contract. + let metrics_batch: Vec<_> = quotes_with_peers + .iter() + .map(|(_, _, quote, _)| quote.quoting_metrics.clone()) + .collect(); + + let contract_prices = + evmlib::contract::payment_vault::get_market_price(wallet.network(), metrics_batch) + .await + .map_err(|e| { + Error::Payment(format!("Failed to get market prices from contract: {e}")) + })?; + + if contract_prices.len() != quotes_with_peers.len() { + return Err(Error::Payment(format!( + "Contract returned {} prices for {} quotes", + contract_prices.len(), + quotes_with_peers.len() + ))); + } + + // Build peer_quotes for ProofOfPayment + quotes for SingleNodePayment. 
+ let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len()); + let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len()); + + for ((peer_id, _addrs, quote, _local_price), contract_price) in + quotes_with_peers.into_iter().zip(contract_prices) + { + let encoded = peer_id_to_encoded(&peer_id)?; + peer_quotes.push((encoded, quote.clone())); + quotes_for_payment.push((quote, contract_price)); + } + + let payment = SingleNodePayment::from_quotes(quotes_for_payment) + .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?; + + Ok(Some(PreparedChunk { + content, + address, + quoted_peers, + payment, + peer_quotes, + })) + } + + /// Pay for multiple chunks in a single EVM transaction. + /// + /// Flattens all quote payments from the prepared chunks into one + /// `wallet.pay_for_quotes()` call, then maps transaction hashes + /// back to per-chunk [`PaymentProof`] bytes. + /// + /// # Errors + /// + /// Returns an error if the wallet is not configured or the on-chain + /// payment fails. + pub async fn batch_pay(&self, prepared: Vec<PreparedChunk>) -> Result<Vec<PaidChunk>> { + if prepared.is_empty() { + return Ok(Vec::new()); + } + + let wallet = self.require_wallet()?; + + // Flatten all quote payments from all chunks into a single batch. + let mut all_payments = + Vec::with_capacity(prepared.len() * prepared[0].payment.quotes.len()); + for chunk in &prepared { + for info in &chunk.payment.quotes { + all_payments.push((info.quote_hash, info.rewards_address, info.amount)); + } + } + + info!( + "Batch payment for {} chunks ({} quote entries)", + prepared.len(), + all_payments.len() + ); + + let (tx_hash_map, _gas_info) = wallet.pay_for_quotes(all_payments).await.map_err( + |evmlib::wallet::PayForQuotesError(err, _)| { + Error::Payment(format!("Batch payment failed: {err}")) + }, + )?; + + info!( + "Batch payment succeeded: {} transactions", + tx_hash_map.len() + ); + + // Map tx hashes back to per-chunk PaymentProofs. 
+ let mut paid_chunks = Vec::with_capacity(prepared.len()); + for chunk in prepared { + let tx_hashes: Vec<_> = chunk + .payment + .quotes + .iter() + .filter(|info| !info.amount.is_zero()) + .filter_map(|info| tx_hash_map.get(&info.quote_hash).copied()) + .collect(); + + let proof = PaymentProof { + proof_of_payment: ProofOfPayment { + peer_quotes: chunk.peer_quotes, + }, + tx_hashes, + }; + + let proof_bytes = serialize_single_node_proof(&proof).map_err(|e| { + Error::Serialization(format!("Failed to serialize payment proof: {e}")) + })?; + + paid_chunks.push(PaidChunk { + content: chunk.content, + address: chunk.address, + quoted_peers: chunk.quoted_peers, + proof_bytes, + }); + } + + Ok(paid_chunks) + } + + /// Upload chunks in waves with pipelined EVM payments. + /// + /// Processes chunks in waves of [`PAYMENT_WAVE_SIZE`]. Within each wave: + /// 1. **Prepare**: collect quotes for all chunks concurrently + /// 2. **Pay**: single EVM transaction for the whole wave + /// 3. **Store**: concurrent [`Client::chunk_put_to_close_group`] calls + /// + /// Stores from wave N overlap with quote collection for wave N+1 + /// via `tokio::join!`. + /// + /// # Errors + /// + /// Returns an error if any payment or store operation fails. + pub async fn batch_upload_chunks(&self, chunks: Vec<Bytes>) -> Result<Vec<XorName>> { + if chunks.is_empty() { + return Ok(Vec::new()); + } + + let total_chunks = chunks.len(); + info!("Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE}"); + + let mut all_addresses = Vec::with_capacity(total_chunks); + let mut seen_addresses: HashSet<XorName> = HashSet::new(); + + // Deduplicate chunks by content address. + let mut unique_chunks = Vec::with_capacity(total_chunks); + for chunk in chunks { + let address = compute_address(&chunk); + if seen_addresses.insert(address) { + unique_chunks.push(chunk); + } else { + debug!("Skipping duplicate chunk {}", hex::encode(address)); + all_addresses.push(address); + } + } + + // Split into waves. 
+ let waves: Vec<Vec<Bytes>> = unique_chunks + .chunks(PAYMENT_WAVE_SIZE) + .map(<[Bytes]>::to_vec) + .collect(); + let wave_count = waves.len(); + + info!( + "{total_chunks} chunks -> {} unique -> {wave_count} waves", + seen_addresses.len() + ); + + let mut pending_store: Option<Vec<PaidChunk>> = None; + + for (wave_idx, wave_chunks) in waves.into_iter().enumerate() { + let wave_num = wave_idx + 1; + + // Pipeline: store previous wave while preparing this one. + let (prepare_result, store_result) = match pending_store.take() { + Some(paid_chunks) => { + let (prep, stored) = tokio::join!( + self.prepare_wave(wave_chunks), + self.store_paid_chunks(paid_chunks) + ); + (prep, Some(stored)) + } + None => (self.prepare_wave(wave_chunks).await, None), + }; + + // Propagate store errors from previous wave. + if let Some(stored) = store_result { + all_addresses.extend(stored?); + } + + let (prepared_chunks, already_stored) = prepare_result?; + all_addresses.extend(already_stored); + + if prepared_chunks.is_empty() { + info!("Wave {wave_num}/{wave_count}: all chunks already stored"); + continue; + } + + info!( + "Wave {wave_num}/{wave_count}: paying for {} chunks", + prepared_chunks.len() + ); + let paid_chunks = self.batch_pay(prepared_chunks).await?; + pending_store = Some(paid_chunks); + } + + // Store the last wave. + if let Some(paid_chunks) = pending_store { + all_addresses.extend(self.store_paid_chunks(paid_chunks).await?); + } + + info!("Batch upload complete: {} addresses", all_addresses.len()); + Ok(all_addresses) + } + + /// Prepare a wave of chunks by collecting quotes concurrently. + /// + /// Returns `(prepared_chunks, already_stored_addresses)`. 
+ async fn prepare_wave(&self, chunks: Vec<Bytes>) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> { + let chunk_count = chunks.len(); + let chunks_with_addr: Vec<(Bytes, XorName)> = chunks + .into_iter() + .map(|c| { + let addr = compute_address(&c); + (c, addr) + }) + .collect(); + + let results: Vec<(XorName, Result<Option<PreparedChunk>>)> = stream::iter(chunks_with_addr) + .map(|(content, address)| async move { + (address, self.prepare_chunk_payment(content).await) + }) + .buffer_unordered(chunk_count) + .collect() + .await; + + let mut prepared = Vec::with_capacity(chunk_count); + let mut already_stored = Vec::new(); + + for (address, result) in results { + match result? { + Some(chunk) => prepared.push(chunk), + None => already_stored.push(address), + } + } + + Ok((prepared, already_stored)) + } + + /// Store a batch of paid chunks concurrently to their close groups. + async fn store_paid_chunks(&self, paid_chunks: Vec<PaidChunk>) -> Result<Vec<XorName>> { + let store_futures: Vec<_> = paid_chunks + .into_iter() + .map(|chunk| async move { + self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers) + .await + }) + .collect(); + + let results = futures::future::join_all(store_futures).await; + results.into_iter().collect() + } +} diff --git a/ant-core/src/data/client/data.rs b/ant-core/src/data/client/data.rs index c625b40..a23081a 100644 --- a/ant-core/src/data/client/data.rs +++ b/ant-core/src/data/client/data.rs @@ -14,7 +14,7 @@ use ant_node::client::compute_address; use bytes::Bytes; use futures::stream::{self, StreamExt, TryStreamExt}; use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk}; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; /// Result of an in-memory data upload: the `DataMap` needed to retrieve the data. #[derive(Debug, Clone)] @@ -51,17 +51,8 @@ impl Client { .map(|chunk| chunk.content) .collect(); - // Fail-fast: cancel remaining uploads on first error to avoid - // wasting gas on chunks that can't form a complete DataMap. 
- let mut chunks_stored = 0usize; - let mut upload_stream = stream::iter(chunk_contents) - .map(|content| self.chunk_put(content)) - .buffer_unordered(4); - - while let Some(result) = upload_stream.next().await { - result?; - chunks_stored += 1; - } + let addresses = self.batch_upload_chunks(chunk_contents).await?; + let chunks_stored = addresses.len(); info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)"); @@ -119,18 +110,11 @@ impl Client { { Ok(result) => result, Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { - warn!("Merkle needs more peers ({msg}), falling back to per-chunk"); - let mut chunks_stored = 0usize; - let mut s = stream::iter(chunk_contents) - .map(|c| self.chunk_put(c)) - .buffer_unordered(4); - while let Some(result) = s.next().await { - result?; - chunks_stored += 1; - } + info!("Merkle needs more peers ({msg}), falling back to wave-batch"); + let addresses = self.batch_upload_chunks(chunk_contents).await?; return Ok(DataUploadResult { data_map, - chunks_stored, + chunks_stored: addresses.len(), payment_mode_used: PaymentMode::Single, }); } @@ -169,21 +153,16 @@ impl Client { payment_mode_used: PaymentMode::Merkle, }) } else { - // Standard per-chunk payment path - let mut chunks_stored = 0usize; - let mut upload_stream = stream::iter(chunk_contents) - .map(|content| self.chunk_put(content)) - .buffer_unordered(4); - - while let Some(result) = upload_stream.next().await { - result?; - chunks_stored += 1; - } + // Wave-based batch payment path (single EVM tx per wave). 
+ let addresses = self.batch_upload_chunks(chunk_contents).await?; - info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)"); + info!( + "Data uploaded: {} chunks stored ({content_len} bytes original)", + addresses.len() + ); Ok(DataUploadResult { data_map, - chunks_stored, + chunks_stored: addresses.len(), payment_mode_used: PaymentMode::Single, }) } diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index c40c011..77818b6 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -18,7 +18,7 @@ use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use tokio::runtime::Handle; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; /// Result of a file upload: the `DataMap` needed to retrieve the file. #[derive(Debug, Clone)] @@ -125,22 +125,13 @@ impl Client { let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?; - // Collect chunks from encryption channel, then upload concurrently. + // Collect all chunks from encryption channel. let mut chunk_contents = Vec::new(); while let Some(content) = chunk_rx.recv().await { chunk_contents.push(content); } - let mut chunks_stored = 0usize; - let mut upload_stream = futures::stream::iter(chunk_contents) - .map(|content| self.chunk_put(content)) - .buffer_unordered(4); - - while let Some(result) = futures::StreamExt::next(&mut upload_stream).await { - result?; - chunks_stored += 1; - } - + // Await encryption completion to catch errors before paying for storage. handle .await .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? 
@@ -150,6 +141,9 @@ impl Client { .await .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?; + let addresses = self.batch_upload_chunks(chunk_contents).await?; + let chunks_stored = addresses.len(); + info!( "File uploaded: {chunks_stored} chunks stored ({})", path.display() @@ -184,12 +178,22 @@ impl Client { let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?; - // Collect all chunks first (buffer-then-pay) + // Collect all chunks first (buffer-then-pay). let mut chunk_contents = Vec::new(); while let Some(content) = chunk_rx.recv().await { chunk_contents.push(content); } + // Await encryption completion to catch errors before paying for storage. + handle + .await + .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? + .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?; + + let data_map = datamap_rx + .await + .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?; + let chunk_count = chunk_contents.len(); let (chunks_stored, actual_mode) = if self.should_use_merkle(chunk_count, mode) { @@ -209,28 +213,11 @@ impl Client { { Ok(result) => result, Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { - warn!("Merkle needs more peers ({msg}), falling back to per-chunk"); - let mut s_stored = 0usize; - let mut s = stream::iter(chunk_contents) - .map(|c| self.chunk_put(c)) - .buffer_unordered(4); - while let Some(result) = futures::StreamExt::next(&mut s).await { - result?; - s_stored += 1; - } - - // Need datamap from the encryption handle - handle - .await - .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? 
- .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?; - let data_map = datamap_rx.await.map_err(|_| { - Error::Encryption("no DataMap from encryption thread".to_string()) - })?; - + info!("Merkle needs more peers ({msg}), falling back to wave-batch"); + let addresses = self.batch_upload_chunks(chunk_contents).await?; return Ok(FileUploadResult { data_map, - chunks_stored: s_stored, + chunks_stored: addresses.len(), payment_mode_used: PaymentMode::Single, }); } @@ -262,28 +249,11 @@ impl Client { } (stored, PaymentMode::Merkle) } else { - // Standard per-chunk payment path - let mut stored = 0usize; - let mut upload_stream = stream::iter(chunk_contents) - .map(|content| self.chunk_put(content)) - .buffer_unordered(4); - - while let Some(result) = futures::StreamExt::next(&mut upload_stream).await { - result?; - stored += 1; - } - (stored, PaymentMode::Single) + // Wave-based batch payment path (single EVM tx per wave). + let addresses = self.batch_upload_chunks(chunk_contents).await?; + (addresses.len(), PaymentMode::Single) }; - handle - .await - .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? - .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?; - - let data_map = datamap_rx - .await - .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?; - info!( "File uploaded with mode {mode:?}: {chunks_stored} chunks stored ({})", path.display() diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index 6901233..5b09b80 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -3,6 +3,7 @@ //! Provides high-level APIs for storing and retrieving data //! on the Autonomi decentralized network. 
+pub mod batch; pub mod cache; pub mod chunk; pub mod data; diff --git a/ant-core/src/data/client/payment.rs b/ant-core/src/data/client/payment.rs index a543a65..04c9064 100644 --- a/ant-core/src/data/client/payment.rs +++ b/ant-core/src/data/client/payment.rs @@ -154,7 +154,7 @@ impl Client { } /// Convert an ant-node `PeerId` to an `EncodedPeerId` for payment proofs. -fn peer_id_to_encoded(peer_id: &PeerId) -> Result<EncodedPeerId> { +pub(crate) fn peer_id_to_encoded(peer_id: &PeerId) -> Result<EncodedPeerId> { hex_node_id_to_encoded_peer_id(&peer_id.to_hex()) .map_err(|e| Error::Payment(format!("Failed to encode peer ID: {e}"))) } From ac9a4abf959b4d1294433d2a31fd74b2a8afa8ee Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 25 Mar 2026 21:43:00 +0100 Subject: [PATCH 2/5] docs: fix rustdoc links to private items in batch module Co-Authored-By: Claude Opus 4.6 (1M context) --- ant-core/src/data/client/batch.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index 68366fb..f9ec6f8 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -1,6 +1,6 @@ //! Batch chunk upload with wave-based pipelined EVM payments. //! -//! Groups chunks into waves of [`PAYMENT_WAVE_SIZE`] and pays for each +//! Groups chunks into waves of 64 and pays for each //! wave in a single EVM transaction. Stores from wave N are pipelined //! with quote collection for wave N+1 via `tokio::join!`. @@ -204,10 +204,10 @@ impl Client { /// Upload chunks in waves with pipelined EVM payments. /// - /// Processes chunks in waves of [`PAYMENT_WAVE_SIZE`]. Within each wave: + /// Processes chunks in waves of `PAYMENT_WAVE_SIZE` (64). Within each wave: /// 1. **Prepare**: collect quotes for all chunks concurrently /// 2. **Pay**: single EVM transaction for the whole wave - /// 3. **Store**: concurrent [`Client::chunk_put_to_close_group`] calls + /// 3. 
**Store**: concurrent chunk replication to close group /// /// Stores from wave N overlap with quote collection for wave N+1 /// via `tokio::join!`. From 548640b71b4dc18191d160c81cb6694ab0af0aa2 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 25 Mar 2026 22:41:05 +0100 Subject: [PATCH 3/5] fix: throttle concurrent uploads and limit replication to majority peers Uploads of large files to small networks (e.g. local devnets) were failing because the client blasted all chunks to all peers simultaneously, overwhelming QUIC connections and causing cascading disconnects. Three changes: - Only replicate chunks to CLOSE_GROUP_MAJORITY peers instead of all CLOSE_GROUP_SIZE, reducing per-chunk network load - Throttle both quote collection and chunk storage with buffer_unordered(chunk_concurrency) instead of unbounded parallelism - Make chunk_concurrency configurable via ClientConfig and --chunk-concurrency CLI flag, defaulting to half the available CPU threads Co-Authored-By: Claude Opus 4.6 (1M context) --- ant-cli/src/cli.rs | 5 +++++ ant-cli/src/main.rs | 8 +++++++- ant-core/src/data/client/batch.rs | 10 +++++----- ant-core/src/data/client/chunk.rs | 12 ++++++++---- ant-core/src/data/client/data.rs | 2 +- ant-core/src/data/client/file.rs | 2 +- ant-core/src/data/client/mod.rs | 19 +++++++++++++++++++ ant-core/tests/e2e_merkle.rs | 1 + 8 files changed, 47 insertions(+), 12 deletions(-) diff --git a/ant-cli/src/cli.rs b/ant-cli/src/cli.rs index b9ccdc0..afd2c62 100644 --- a/ant-cli/src/cli.rs +++ b/ant-cli/src/cli.rs @@ -28,6 +28,11 @@ pub struct Cli { #[arg(long, default_value_t = 60)] pub timeout_secs: u64, + /// Maximum number of chunks processed concurrently during uploads. + /// Defaults to half the available CPU threads. + #[arg(long)] + pub chunk_concurrency: Option<usize>, + /// Log level. 
#[arg(long, default_value = "info")] pub log_level: String, diff --git a/ant-cli/src/main.rs b/ant-cli/src/main.rs index 0b02164..313bdd9 100644 --- a/ant-cli/src/main.rs +++ b/ant-cli/src/main.rs @@ -56,6 +56,7 @@ async fn run() -> anyhow::Result<()> { timeout_secs, log_level: _, evm_network, + chunk_concurrency, } = cli; // Shared context for data commands that need EVM / bootstrap info. @@ -65,6 +66,7 @@ allow_loopback, timeout_secs, evm_network, + chunk_concurrency, }; match command { @@ -120,6 +122,7 @@ struct DataCliContext { allow_loopback: bool, timeout_secs: u64, evm_network: String, + chunk_concurrency: Option<usize>, } /// Build a data client with wallet if SECRET_KEY is set. @@ -135,10 +138,13 @@ async fn build_data_client(ctx: &DataCliContext, needs_wallet: bool) -> anyhow:: let bootstrap = resolve_bootstrap_from(ctx, manifest.as_ref())?; let node = create_client_node(bootstrap, ctx.allow_loopback).await?; - let config = ClientConfig { + let mut config = ClientConfig { timeout_secs: ctx.timeout_secs, ..Default::default() }; + if let Some(concurrency) = ctx.chunk_concurrency { + config.chunk_concurrency = concurrency; + } let mut client = Client::from_node(node, config); diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index f9ec6f8..705485e 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -314,7 +314,7 @@ impl Client { .map(|(content, address)| async move { (address, self.prepare_chunk_payment(content).await) }) - .buffer_unordered(chunk_count) + .buffer_unordered(self.config().chunk_concurrency) .collect() .await; @@ -333,15 +333,15 @@ impl Client { /// Store a batch of paid chunks concurrently to their close groups. 
async fn store_paid_chunks(&self, paid_chunks: Vec<PaidChunk>) -> Result<Vec<XorName>> { - let store_futures: Vec<_> = paid_chunks - .into_iter() + let results: Vec<Result<XorName>> = stream::iter(paid_chunks) .map(|chunk| async move { self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers) .await }) - .collect(); + .buffer_unordered(self.config().chunk_concurrency) + .collect() + .await; - let results = futures::future::join_all(store_futures).await; results.into_iter().collect() } } diff --git a/ant-core/src/data/client/chunk.rs b/ant-core/src/data/client/chunk.rs index d2dcb15..c2f2705 100644 --- a/ant-core/src/data/client/chunk.rs +++ b/ant-core/src/data/client/chunk.rs @@ -54,9 +54,9 @@ impl Client { /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set. /// - /// Sends the PUT concurrently to all `peers` and succeeds once - /// `CLOSE_GROUP_MAJORITY` confirm storage. Peers that already have - /// the chunk count towards the majority. + /// Sends the PUT concurrently to the first `CLOSE_GROUP_MAJORITY` + /// peers and succeeds once all confirm storage. Peers that already + /// have the chunk count towards the majority. /// /// # Errors /// @@ -70,8 +70,12 @@ ) -> Result<XorName> { let address = compute_address(&content); + // Only send to CLOSE_GROUP_MAJORITY peers — no need to replicate + // beyond the minimum required for consensus. 
+ let target_peers = &peers[..peers.len().min(CLOSE_GROUP_MAJORITY)]; + let mut put_futures = FuturesUnordered::new(); - for (peer_id, addrs) in peers { + for (peer_id, addrs) in target_peers { let content = content.clone(); let proof = proof.clone(); let peer_id = *peer_id; diff --git a/ant-core/src/data/client/data.rs b/ant-core/src/data/client/data.rs index a23081a..ff3b73b 100644 --- a/ant-core/src/data/client/data.rs +++ b/ant-core/src/data/client/data.rs @@ -139,7 +139,7 @@ impl Client { } }, )) - .buffer_unordered(4); + .buffer_unordered(self.config().chunk_concurrency); while let Some(result) = upload_stream.next().await { result?; diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index 77818b6..74f8591 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -241,7 +241,7 @@ impl Client { } }, )) - .buffer_unordered(4); + .buffer_unordered(self.config().chunk_concurrency); while let Some(result) = futures::StreamExt::next(&mut upload_stream).await { result?; diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index 5b09b80..c26f05a 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -26,6 +26,19 @@ use tracing::debug; /// Default timeout for network operations in seconds. const CLIENT_TIMEOUT_SECS: u64 = 10; +/// Assumed CPU thread count when `available_parallelism()` fails. +const FALLBACK_THREAD_COUNT: usize = 4; + +/// Derive a sensible default chunk concurrency from available CPU parallelism. +/// +/// Uses half the available threads (network I/O doesn't need 1:1 CPU mapping). +fn default_chunk_concurrency() -> usize { + let threads = std::thread::available_parallelism() + .map(std::num::NonZeroUsize::get) + .unwrap_or(FALLBACK_THREAD_COUNT); + (threads / 2).max(1) +} + /// Configuration for the Autonomi client. 
#[derive(Debug, Clone)] pub struct ClientConfig { @@ -33,6 +46,11 @@ pub struct ClientConfig { pub timeout_secs: u64, /// Number of closest peers to consider for routing. pub close_group_size: usize, + /// Maximum number of chunks processed concurrently during uploads. + /// + /// Controls parallelism for quote collection, chunk storage, and + /// merkle upload paths. Defaults to half the available CPU threads. + pub chunk_concurrency: usize, } impl Default for ClientConfig { @@ -40,6 +58,7 @@ impl Default for ClientConfig { Self { timeout_secs: CLIENT_TIMEOUT_SECS, close_group_size: CLOSE_GROUP_SIZE, + chunk_concurrency: default_chunk_concurrency(), } } } diff --git a/ant-core/tests/e2e_merkle.rs b/ant-core/tests/e2e_merkle.rs index de090ae..dc848f1 100644 --- a/ant-core/tests/e2e_merkle.rs +++ b/ant-core/tests/e2e_merkle.rs @@ -39,6 +39,7 @@ async fn setup_merkle_testnet() -> (Client, MiniTestnet) { let config = ClientConfig { timeout_secs: CLIENT_TIMEOUT_SECS, close_group_size: 20, + ..Default::default() }; let client = Client::from_node(Arc::clone(&node), config).with_wallet(testnet.wallet().clone()); From 09ddc3625eb0f2d743a1d8d9b0c109b29d597890 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 25 Mar 2026 22:42:31 +0100 Subject: [PATCH 4/5] fix: log chunk concurrency at upload start Co-Authored-By: Claude Opus 4.6 (1M context) --- ant-core/src/data/client/batch.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index 705485e..8e4fc75 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -221,7 +221,8 @@ impl Client { } let total_chunks = chunks.len(); - info!("Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE}"); + let concurrency = self.config().chunk_concurrency; + info!("Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} (concurrency: {concurrency})"); let mut all_addresses = 
Vec::with_capacity(total_chunks); let mut seen_addresses: HashSet = HashSet::new(); From b64a812de7a930c70a91ace9b0ef48f78ee3fe8f Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 26 Mar 2026 10:10:33 +0100 Subject: [PATCH 5/5] fix: fall back to remaining close group peers on chunk PUT failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, chunk uploads were sent to exactly CLOSE_GROUP_MAJORITY peers with no recovery path — a single peer failure failed the entire upload. Now, on each peer failure, the next peer from the remaining close group is tried as a fallback, while the happy path still only contacts the minimum CLOSE_GROUP_MAJORITY peers. Co-Authored-By: Claude Opus 4.6 (1M context) --- ant-core/src/data/client/chunk.rs | 57 ++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/ant-core/src/data/client/chunk.rs b/ant-core/src/data/client/chunk.rs index c2f2705..7292d63 100644 --- a/ant-core/src/data/client/chunk.rs +++ b/ant-core/src/data/client/chunk.rs @@ -14,6 +14,7 @@ use ant_node::core::{MultiAddr, PeerId}; use ant_node::CLOSE_GROUP_MAJORITY; use bytes::Bytes; use futures::stream::{FuturesUnordered, StreamExt}; +use std::future::Future; use std::time::Duration; use tracing::{debug, info, warn}; @@ -54,9 +55,10 @@ impl Client { /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set. /// - /// Sends the PUT concurrently to the first `CLOSE_GROUP_MAJORITY` - /// peers and succeeds once all confirm storage. Peers that already - /// have the chunk count towards the majority. + /// Initially sends the PUT concurrently to the first + /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the + /// remaining peers in the quoted set until majority is reached or + /// all peers are exhausted. 
/// /// # Errors /// @@ -70,26 +72,17 @@ ) -> Result<XorName> { let address = compute_address(&content); - // Only send to CLOSE_GROUP_MAJORITY peers — no need to replicate - // beyond the minimum required for consensus. - let target_peers = &peers[..peers.len().min(CLOSE_GROUP_MAJORITY)]; + let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY); + let (initial_peers, fallback_peers) = peers.split_at(initial_count); let mut put_futures = FuturesUnordered::new(); - for (peer_id, addrs) in target_peers { - let content = content.clone(); - let proof = proof.clone(); - let peer_id = *peer_id; - let addrs = addrs.clone(); - put_futures.push(async move { - let result = self - .chunk_put_with_proof(content, proof, &peer_id, &addrs) - .await; - (peer_id, result) - }); + for (peer_id, addrs) in initial_peers { + put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs)); } let mut success_count = 0usize; let mut failures: Vec<String> = Vec::new(); + let mut fallback_iter = fallback_peers.iter(); while let Some((peer_id, result)) = put_futures.next().await { match result { @@ -106,6 +99,19 @@ Err(e) => { warn!("Failed to store chunk on {peer_id}: {e}"); failures.push(format!("{peer_id}: {e}")); + + if let Some((fb_peer, fb_addrs)) = fallback_iter.next() { + debug!( + "Falling back to peer {fb_peer} for chunk {}", + hex::encode(address) + ); + put_futures.push(self.spawn_chunk_put( + content.clone(), + proof.clone(), + fb_peer, + fb_addrs, + )); + } } } } @@ -116,6 +122,23 @@ ))) } + /// Spawn a chunk PUT future for a single peer. + fn spawn_chunk_put<'a>( + &'a self, + content: Bytes, + proof: Vec<u8>, + peer_id: &'a PeerId, + addrs: &'a [MultiAddr], + ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a { + let peer_id_owned = *peer_id; + async move { + let result = self + .chunk_put_with_proof(content, proof, &peer_id_owned, addrs) + .await; + (peer_id_owned, result) + } + } + /// Store a chunk on the Autonomi network with a pre-built payment proof. 
/// /// Sends to a single peer. Callers that need replication across the