diff --git a/ant-cli/src/cli.rs b/ant-cli/src/cli.rs index b9ccdc0..afd2c62 100644 --- a/ant-cli/src/cli.rs +++ b/ant-cli/src/cli.rs @@ -28,6 +28,11 @@ pub struct Cli { #[arg(long, default_value_t = 60)] pub timeout_secs: u64, + /// Maximum number of chunks processed concurrently during uploads. + /// Defaults to half the available CPU threads. + #[arg(long)] + pub chunk_concurrency: Option, + /// Log level. #[arg(long, default_value = "info")] pub log_level: String, diff --git a/ant-cli/src/main.rs b/ant-cli/src/main.rs index 0b02164..313bdd9 100644 --- a/ant-cli/src/main.rs +++ b/ant-cli/src/main.rs @@ -56,6 +56,7 @@ async fn run() -> anyhow::Result<()> { timeout_secs, log_level: _, evm_network, + chunk_concurrency, } = cli; // Shared context for data commands that need EVM / bootstrap info. @@ -65,6 +66,7 @@ async fn run() -> anyhow::Result<()> { allow_loopback, timeout_secs, evm_network, + chunk_concurrency, }; match command { @@ -120,6 +122,7 @@ struct DataCliContext { allow_loopback: bool, timeout_secs: u64, evm_network: String, + chunk_concurrency: Option, } /// Build a data client with wallet if SECRET_KEY is set. @@ -135,10 +138,13 @@ async fn build_data_client(ctx: &DataCliContext, needs_wallet: bool) -> anyhow:: let bootstrap = resolve_bootstrap_from(ctx, manifest.as_ref())?; let node = create_client_node(bootstrap, ctx.allow_loopback).await?; - let config = ClientConfig { + let mut config = ClientConfig { timeout_secs: ctx.timeout_secs, ..Default::default() }; + if let Some(concurrency) = ctx.chunk_concurrency { + config.chunk_concurrency = concurrency; + } let mut client = Client::from_node(node, config); diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs new file mode 100644 index 0000000..8e4fc75 --- /dev/null +++ b/ant-core/src/data/client/batch.rs @@ -0,0 +1,348 @@ +//! Batch chunk upload with wave-based pipelined EVM payments. +//! +//! Groups chunks into waves of 64 and pays for each +//! wave in a single EVM transaction. Stores from wave N are pipelined +//! with quote collection for wave N+1 via `tokio::join!`. + +use crate::data::client::payment::peer_id_to_encoded; +use crate::data::client::Client; +use crate::data::error::{Error, Result}; +use ant_evm::{EncodedPeerId, PaymentQuote, ProofOfPayment}; +use ant_node::ant_protocol::DATA_TYPE_CHUNK; +use ant_node::client::{compute_address, XorName}; +use ant_node::core::{MultiAddr, PeerId}; +use ant_node::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment}; +use bytes::Bytes; +use futures::stream::{self, StreamExt}; +use std::collections::HashSet; +use tracing::{debug, info}; + +/// Number of chunks per payment wave. +const PAYMENT_WAVE_SIZE: usize = 64; + +/// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`]. +pub struct PreparedChunk { + /// The chunk content bytes. + pub content: Bytes, + /// Content address (BLAKE3 hash). + pub address: XorName, + /// Closest peers from quote collection — PUT targets for close-group replication. + pub quoted_peers: Vec<(PeerId, Vec)>, + /// Payment structure (quotes sorted, median selected, not yet paid on-chain). + pub payment: SingleNodePayment, + /// Peer quotes for building `ProofOfPayment`. + pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>, +} + +/// Chunk paid but not yet stored. Produced by [`Client::batch_pay`]. +pub struct PaidChunk { + /// The chunk content bytes. + pub content: Bytes, + /// Content address (BLAKE3 hash). + pub address: XorName, + /// Closest peers from quote collection — PUT targets for close-group replication. + pub quoted_peers: Vec<(PeerId, Vec)>, + /// Serialized [`PaymentProof`] bytes. + pub proof_bytes: Vec, +} + +impl Client { + /// Prepare a single chunk for batch payment. + /// + /// Collects quotes and fetches contract prices without making any + /// on-chain transaction. Returns `Ok(None)` if the chunk is already + /// stored on the network. + /// + /// # Errors + /// + /// Returns an error if quote collection or payment construction fails. + pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result> { + let address = compute_address(&content); + let data_size = u64::try_from(content.len()) + .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?; + + let quotes_with_peers = match self + .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK) + .await + { + Ok(quotes) => quotes, + Err(Error::AlreadyStored) => { + debug!("Chunk {} already stored, skipping", hex::encode(address)); + return Ok(None); + } + Err(e) => return Err(e), + }; + + let wallet = self.require_wallet()?; + + // Capture all quoted peers for close-group replication. + let quoted_peers: Vec<(PeerId, Vec)> = quotes_with_peers + .iter() + .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone())) + .collect(); + + // Fetch authoritative prices from the on-chain contract. + let metrics_batch: Vec<_> = quotes_with_peers + .iter() + .map(|(_, _, quote, _)| quote.quoting_metrics.clone()) + .collect(); + + let contract_prices = + evmlib::contract::payment_vault::get_market_price(wallet.network(), metrics_batch) + .await + .map_err(|e| { + Error::Payment(format!("Failed to get market prices from contract: {e}")) + })?; + + if contract_prices.len() != quotes_with_peers.len() { + return Err(Error::Payment(format!( + "Contract returned {} prices for {} quotes", + contract_prices.len(), + quotes_with_peers.len() + ))); + } + + // Build peer_quotes for ProofOfPayment + quotes for SingleNodePayment. + let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len()); + let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len()); + + for ((peer_id, _addrs, quote, _local_price), contract_price) in + quotes_with_peers.into_iter().zip(contract_prices) + { + let encoded = peer_id_to_encoded(&peer_id)?; + peer_quotes.push((encoded, quote.clone())); + quotes_for_payment.push((quote, contract_price)); + } + + let payment = SingleNodePayment::from_quotes(quotes_for_payment) + .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?; + + Ok(Some(PreparedChunk { + content, + address, + quoted_peers, + payment, + peer_quotes, + })) + } + + /// Pay for multiple chunks in a single EVM transaction. + /// + /// Flattens all quote payments from the prepared chunks into one + /// `wallet.pay_for_quotes()` call, then maps transaction hashes + /// back to per-chunk [`PaymentProof`] bytes. + /// + /// # Errors + /// + /// Returns an error if the wallet is not configured or the on-chain + /// payment fails. + pub async fn batch_pay(&self, prepared: Vec) -> Result> { + if prepared.is_empty() { + return Ok(Vec::new()); + } + + let wallet = self.require_wallet()?; + + // Flatten all quote payments from all chunks into a single batch. + let mut all_payments = + Vec::with_capacity(prepared.len() * prepared[0].payment.quotes.len()); + for chunk in &prepared { + for info in &chunk.payment.quotes { + all_payments.push((info.quote_hash, info.rewards_address, info.amount)); + } + } + + info!( + "Batch payment for {} chunks ({} quote entries)", + prepared.len(), + all_payments.len() + ); + + let (tx_hash_map, _gas_info) = wallet.pay_for_quotes(all_payments).await.map_err( + |evmlib::wallet::PayForQuotesError(err, _)| { + Error::Payment(format!("Batch payment failed: {err}")) + }, + )?; + + info!( + "Batch payment succeeded: {} transactions", + tx_hash_map.len() + ); + + // Map tx hashes back to per-chunk PaymentProofs. + let mut paid_chunks = Vec::with_capacity(prepared.len()); + for chunk in prepared { + let tx_hashes: Vec<_> = chunk + .payment + .quotes + .iter() + .filter(|info| !info.amount.is_zero()) + .filter_map(|info| tx_hash_map.get(&info.quote_hash).copied()) + .collect(); + + let proof = PaymentProof { + proof_of_payment: ProofOfPayment { + peer_quotes: chunk.peer_quotes, + }, + tx_hashes, + }; + + let proof_bytes = serialize_single_node_proof(&proof).map_err(|e| { + Error::Serialization(format!("Failed to serialize payment proof: {e}")) + })?; + + paid_chunks.push(PaidChunk { + content: chunk.content, + address: chunk.address, + quoted_peers: chunk.quoted_peers, + proof_bytes, + }); + } + + Ok(paid_chunks) + } + + /// Upload chunks in waves with pipelined EVM payments. + /// + /// Processes chunks in waves of `PAYMENT_WAVE_SIZE` (64). Within each wave: + /// 1. **Prepare**: collect quotes for all chunks concurrently + /// 2. **Pay**: single EVM transaction for the whole wave + /// 3. **Store**: concurrent chunk replication to close group + /// + /// Stores from wave N overlap with quote collection for wave N+1 + /// via `tokio::join!`. + /// + /// # Errors + /// + /// Returns an error if any payment or store operation fails. + pub async fn batch_upload_chunks(&self, chunks: Vec) -> Result> { + if chunks.is_empty() { + return Ok(Vec::new()); + } + + let total_chunks = chunks.len(); + let concurrency = self.config().chunk_concurrency; + info!("Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} (concurrency: {concurrency})"); + + let mut all_addresses = Vec::with_capacity(total_chunks); + let mut seen_addresses: HashSet = HashSet::new(); + + // Deduplicate chunks by content address. + let mut unique_chunks = Vec::with_capacity(total_chunks); + for chunk in chunks { + let address = compute_address(&chunk); + if seen_addresses.insert(address) { + unique_chunks.push(chunk); + } else { + debug!("Skipping duplicate chunk {}", hex::encode(address)); + all_addresses.push(address); + } + } + + // Split into waves. + let waves: Vec> = unique_chunks + .chunks(PAYMENT_WAVE_SIZE) + .map(<[Bytes]>::to_vec) + .collect(); + let wave_count = waves.len(); + + info!( + "{total_chunks} chunks -> {} unique -> {wave_count} waves", + seen_addresses.len() + ); + + let mut pending_store: Option> = None; + + for (wave_idx, wave_chunks) in waves.into_iter().enumerate() { + let wave_num = wave_idx + 1; + + // Pipeline: store previous wave while preparing this one. + let (prepare_result, store_result) = match pending_store.take() { + Some(paid_chunks) => { + let (prep, stored) = tokio::join!( + self.prepare_wave(wave_chunks), + self.store_paid_chunks(paid_chunks) + ); + (prep, Some(stored)) + } + None => (self.prepare_wave(wave_chunks).await, None), + }; + + // Propagate store errors from previous wave. + if let Some(stored) = store_result { + all_addresses.extend(stored?); + } + + let (prepared_chunks, already_stored) = prepare_result?; + all_addresses.extend(already_stored); + + if prepared_chunks.is_empty() { + info!("Wave {wave_num}/{wave_count}: all chunks already stored"); + continue; + } + + info!( + "Wave {wave_num}/{wave_count}: paying for {} chunks", + prepared_chunks.len() + ); + let paid_chunks = self.batch_pay(prepared_chunks).await?; + pending_store = Some(paid_chunks); + } + + // Store the last wave. + if let Some(paid_chunks) = pending_store { + all_addresses.extend(self.store_paid_chunks(paid_chunks).await?); + } + + info!("Batch upload complete: {} addresses", all_addresses.len()); + Ok(all_addresses) + } + + /// Prepare a wave of chunks by collecting quotes concurrently. + /// + /// Returns `(prepared_chunks, already_stored_addresses)`. + async fn prepare_wave(&self, chunks: Vec) -> Result<(Vec, Vec)> { + let chunk_count = chunks.len(); + let chunks_with_addr: Vec<(Bytes, XorName)> = chunks + .into_iter() + .map(|c| { + let addr = compute_address(&c); + (c, addr) + }) + .collect(); + + let results: Vec<(XorName, Result>)> = stream::iter(chunks_with_addr) + .map(|(content, address)| async move { + (address, self.prepare_chunk_payment(content).await) + }) + .buffer_unordered(self.config().chunk_concurrency) + .collect() + .await; + + let mut prepared = Vec::with_capacity(chunk_count); + let mut already_stored = Vec::new(); + + for (address, result) in results { + match result? { + Some(chunk) => prepared.push(chunk), + None => already_stored.push(address), + } + } + + Ok((prepared, already_stored)) + } + + /// Store a batch of paid chunks concurrently to their close groups. + async fn store_paid_chunks(&self, paid_chunks: Vec) -> Result> { + let results: Vec> = stream::iter(paid_chunks) + .map(|chunk| async move { + self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers) + .await + }) + .buffer_unordered(self.config().chunk_concurrency) + .collect() + .await; + + results.into_iter().collect() + } +} diff --git a/ant-core/src/data/client/chunk.rs b/ant-core/src/data/client/chunk.rs index d2dcb15..7292d63 100644 --- a/ant-core/src/data/client/chunk.rs +++ b/ant-core/src/data/client/chunk.rs @@ -14,6 +14,7 @@ use ant_node::core::{MultiAddr, PeerId}; use ant_node::CLOSE_GROUP_MAJORITY; use bytes::Bytes; use futures::stream::{FuturesUnordered, StreamExt}; +use std::future::Future; use std::time::Duration; use tracing::{debug, info, warn}; @@ -54,9 +55,10 @@ impl Client { /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set. /// - /// Sends the PUT concurrently to all `peers` and succeeds once - /// `CLOSE_GROUP_MAJORITY` confirm storage. Peers that already have - /// the chunk count towards the majority. + /// Initially sends the PUT concurrently to the first + /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the + /// remaining peers in the quoted set until majority is reached or + /// all peers are exhausted. /// /// # Errors /// @@ -70,22 +72,17 @@ impl Client { ) -> Result { let address = compute_address(&content); + let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY); + let (initial_peers, fallback_peers) = peers.split_at(initial_count); + let mut put_futures = FuturesUnordered::new(); - for (peer_id, addrs) in peers { - let content = content.clone(); - let proof = proof.clone(); - let peer_id = *peer_id; - let addrs = addrs.clone(); - put_futures.push(async move { - let result = self - .chunk_put_with_proof(content, proof, &peer_id, &addrs) - .await; - (peer_id, result) - }); + for (peer_id, addrs) in initial_peers { + put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs)); } let mut success_count = 0usize; let mut failures: Vec = Vec::new(); + let mut fallback_iter = fallback_peers.iter(); while let Some((peer_id, result)) = put_futures.next().await { match result { @@ -102,6 +99,19 @@ impl Client { Err(e) => { warn!("Failed to store chunk on {peer_id}: {e}"); failures.push(format!("{peer_id}: {e}")); + + if let Some((fb_peer, fb_addrs)) = fallback_iter.next() { + debug!( + "Falling back to peer {fb_peer} for chunk {}", + hex::encode(address) + ); + put_futures.push(self.spawn_chunk_put( + content.clone(), + proof.clone(), + fb_peer, + fb_addrs, + )); + } } } } @@ -112,6 +122,23 @@ impl Client { ))) } + /// Spawn a chunk PUT future for a single peer. + fn spawn_chunk_put<'a>( + &'a self, + content: Bytes, + proof: Vec, + peer_id: &'a PeerId, + addrs: &'a [MultiAddr], + ) -> impl Future)> + 'a { + let peer_id_owned = *peer_id; + async move { + let result = self + .chunk_put_with_proof(content, proof, &peer_id_owned, addrs) + .await; + (peer_id_owned, result) + } + } + /// Store a chunk on the Autonomi network with a pre-built payment proof. /// /// Sends to a single peer. Callers that need replication across the diff --git a/ant-core/src/data/client/data.rs b/ant-core/src/data/client/data.rs index c625b40..ff3b73b 100644 --- a/ant-core/src/data/client/data.rs +++ b/ant-core/src/data/client/data.rs @@ -14,7 +14,7 @@ use ant_node::client::compute_address; use bytes::Bytes; use futures::stream::{self, StreamExt, TryStreamExt}; use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk}; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; /// Result of an in-memory data upload: the `DataMap` needed to retrieve the data. #[derive(Debug, Clone)] @@ -51,17 +51,8 @@ impl Client { .map(|chunk| chunk.content) .collect(); - // Fail-fast: cancel remaining uploads on first error to avoid - // wasting gas on chunks that can't form a complete DataMap. - let mut chunks_stored = 0usize; - let mut upload_stream = stream::iter(chunk_contents) - .map(|content| self.chunk_put(content)) - .buffer_unordered(4); - - while let Some(result) = upload_stream.next().await { - result?; - chunks_stored += 1; - } + let addresses = self.batch_upload_chunks(chunk_contents).await?; + let chunks_stored = addresses.len(); info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)"); @@ -119,18 +110,11 @@ impl Client { { Ok(result) => result, Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { - warn!("Merkle needs more peers ({msg}), falling back to per-chunk"); - let mut chunks_stored = 0usize; - let mut s = stream::iter(chunk_contents) - .map(|c| self.chunk_put(c)) - .buffer_unordered(4); - while let Some(result) = s.next().await { - result?; - chunks_stored += 1; - } + info!("Merkle needs more peers ({msg}), falling back to wave-batch"); + let addresses = self.batch_upload_chunks(chunk_contents).await?; return Ok(DataUploadResult { data_map, - chunks_stored, + chunks_stored: addresses.len(), payment_mode_used: PaymentMode::Single, }); } @@ -155,7 +139,7 @@ impl Client { } }, )) - .buffer_unordered(4); + .buffer_unordered(self.config().chunk_concurrency); while let Some(result) = upload_stream.next().await { result?; @@ -169,21 +153,16 @@ impl Client { payment_mode_used: PaymentMode::Merkle, }) } else { - // Standard per-chunk payment path - let mut chunks_stored = 0usize; - let mut upload_stream = stream::iter(chunk_contents) - .map(|content| self.chunk_put(content)) - .buffer_unordered(4); + // Wave-based batch payment path (single EVM tx per wave). + let addresses = self.batch_upload_chunks(chunk_contents).await?; - while let Some(result) = upload_stream.next().await { - result?; - chunks_stored += 1; - } - - info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)"); + info!( + "Data uploaded: {} chunks stored ({content_len} bytes original)", + addresses.len() + ); Ok(DataUploadResult { data_map, - chunks_stored, + chunks_stored: addresses.len(), payment_mode_used: PaymentMode::Single, }) } diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index c40c011..74f8591 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -18,7 +18,7 @@ use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use tokio::runtime::Handle; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; /// Result of a file upload: the `DataMap` needed to retrieve the file. #[derive(Debug, Clone)] @@ -125,22 +125,13 @@ impl Client { let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?; - // Collect chunks from encryption channel, then upload concurrently. + // Collect all chunks from encryption channel. let mut chunk_contents = Vec::new(); while let Some(content) = chunk_rx.recv().await { chunk_contents.push(content); } - let mut chunks_stored = 0usize; - let mut upload_stream = futures::stream::iter(chunk_contents) - .map(|content| self.chunk_put(content)) - .buffer_unordered(4); - - while let Some(result) = futures::StreamExt::next(&mut upload_stream).await { - result?; - chunks_stored += 1; - } - + // Await encryption completion to catch errors before paying for storage. handle .await .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? @@ -150,6 +141,9 @@ impl Client { .await .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?; + let addresses = self.batch_upload_chunks(chunk_contents).await?; + let chunks_stored = addresses.len(); + info!( "File uploaded: {chunks_stored} chunks stored ({})", path.display() @@ -184,12 +178,22 @@ impl Client { let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?; - // Collect all chunks first (buffer-then-pay) + // Collect all chunks first (buffer-then-pay). let mut chunk_contents = Vec::new(); while let Some(content) = chunk_rx.recv().await { chunk_contents.push(content); } + // Await encryption completion to catch errors before paying for storage. + handle + .await + .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? + .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?; + + let data_map = datamap_rx + .await + .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?; + let chunk_count = chunk_contents.len(); let (chunks_stored, actual_mode) = if self.should_use_merkle(chunk_count, mode) { @@ -209,28 +213,11 @@ impl Client { { Ok(result) => result, Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { - warn!("Merkle needs more peers ({msg}), falling back to per-chunk"); - let mut s_stored = 0usize; - let mut s = stream::iter(chunk_contents) - .map(|c| self.chunk_put(c)) - .buffer_unordered(4); - while let Some(result) = futures::StreamExt::next(&mut s).await { - result?; - s_stored += 1; - } - - // Need datamap from the encryption handle - handle - .await - .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? - .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?; - let data_map = datamap_rx.await.map_err(|_| { - Error::Encryption("no DataMap from encryption thread".to_string()) - })?; - + info!("Merkle needs more peers ({msg}), falling back to wave-batch"); + let addresses = self.batch_upload_chunks(chunk_contents).await?; return Ok(FileUploadResult { data_map, - chunks_stored: s_stored, + chunks_stored: addresses.len(), payment_mode_used: PaymentMode::Single, }); } @@ -254,7 +241,7 @@ impl Client { } }, )) - .buffer_unordered(4); + .buffer_unordered(self.config().chunk_concurrency); while let Some(result) = futures::StreamExt::next(&mut upload_stream).await { result?; @@ -262,28 +249,11 @@ impl Client { } (stored, PaymentMode::Merkle) } else { - // Standard per-chunk payment path - let mut stored = 0usize; - let mut upload_stream = stream::iter(chunk_contents) - .map(|content| self.chunk_put(content)) - .buffer_unordered(4); - - while let Some(result) = futures::StreamExt::next(&mut upload_stream).await { - result?; - stored += 1; - } - (stored, PaymentMode::Single) + // Wave-based batch payment path (single EVM tx per wave). + let addresses = self.batch_upload_chunks(chunk_contents).await?; + (addresses.len(), PaymentMode::Single) }; - handle - .await - .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))? - .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?; - - let data_map = datamap_rx - .await - .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?; - info!( "File uploaded with mode {mode:?}: {chunks_stored} chunks stored ({})", path.display() diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index 6901233..c26f05a 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -3,6 +3,7 @@ //! Provides high-level APIs for storing and retrieving data //! on the Autonomi decentralized network. +pub mod batch; pub mod cache; pub mod chunk; pub mod data; @@ -25,6 +26,19 @@ use tracing::debug; /// Default timeout for network operations in seconds. const CLIENT_TIMEOUT_SECS: u64 = 10; +/// Assumed CPU thread count when `available_parallelism()` fails. +const FALLBACK_THREAD_COUNT: usize = 4; + +/// Derive a sensible default chunk concurrency from available CPU parallelism. +/// +/// Uses half the available threads (network I/O doesn't need 1:1 CPU mapping). +fn default_chunk_concurrency() -> usize { + let threads = std::thread::available_parallelism() + .map(std::num::NonZeroUsize::get) + .unwrap_or(FALLBACK_THREAD_COUNT); + (threads / 2).max(1) +} + /// Configuration for the Autonomi client. #[derive(Debug, Clone)] pub struct ClientConfig { @@ -32,6 +46,11 @@ pub struct ClientConfig { pub timeout_secs: u64, /// Number of closest peers to consider for routing. pub close_group_size: usize, + /// Maximum number of chunks processed concurrently during uploads. + /// + /// Controls parallelism for quote collection, chunk storage, and + /// merkle upload paths. Defaults to half the available CPU threads. + pub chunk_concurrency: usize, } impl Default for ClientConfig { @@ -39,6 +58,7 @@ impl Default for ClientConfig { Self { timeout_secs: CLIENT_TIMEOUT_SECS, close_group_size: CLOSE_GROUP_SIZE, + chunk_concurrency: default_chunk_concurrency(), } } } diff --git a/ant-core/src/data/client/payment.rs b/ant-core/src/data/client/payment.rs index a543a65..04c9064 100644 --- a/ant-core/src/data/client/payment.rs +++ b/ant-core/src/data/client/payment.rs @@ -154,7 +154,7 @@ impl Client { } /// Convert an ant-node `PeerId` to an `EncodedPeerId` for payment proofs. -fn peer_id_to_encoded(peer_id: &PeerId) -> Result { +pub(crate) fn peer_id_to_encoded(peer_id: &PeerId) -> Result { hex_node_id_to_encoded_peer_id(&peer_id.to_hex()) .map_err(|e| Error::Payment(format!("Failed to encode peer ID: {e}"))) } diff --git a/ant-core/tests/e2e_merkle.rs b/ant-core/tests/e2e_merkle.rs index de090ae..dc848f1 100644 --- a/ant-core/tests/e2e_merkle.rs +++ b/ant-core/tests/e2e_merkle.rs @@ -39,6 +39,7 @@ async fn setup_merkle_testnet() -> (Client, MiniTestnet) { let config = ClientConfig { timeout_secs: CLIENT_TIMEOUT_SECS, close_group_size: 20, + ..Default::default() }; let client = Client::from_node(Arc::clone(&node), config).with_wallet(testnet.wallet().clone());