diff --git a/network/src/archive/pack.rs b/network/src/archive/pack.rs index 60892711..11fe95ec 100644 --- a/network/src/archive/pack.rs +++ b/network/src/archive/pack.rs @@ -1,15 +1,15 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use solana_sdk::pubkey::Pubkey; -use brine_tree::{Leaf, Hash}; +use brine_tree::{Leaf, Hash, MerkleTree}; use tape_api::prelude::*; use tape_client::get_epoch_account; use solana_client::nonblocking::rpc_client::RpcClient; use crate::store::*; -use super::queue::Rx; +use super::queue::{Rx, SegmentJob}; -const LAYER_NUMBER: u8 = 10; +type CanopyTree = MerkleTree<{ SEGMENT_TREE_HEIGHT - SECTOR_TREE_HEIGHT }>; /// Orchestrator Task C – CPU-heavy preprocessing (packx) pub async fn run(rpc: Arc, mut rx: Rx, miner: Pubkey, store: Arc) -> Result<()> { @@ -18,20 +18,11 @@ pub async fn run(rpc: Arc, mut rx: Rx, miner: Pubkey, store: Arc anyhow::Result<()> { log::info!("packx: tape={} seg={}", job.tape, job.seg_no); - let solved = pack_segment(&miner, &job.data, packing_difficulty)?; - store.put_segment(&job.tape, job.seg_no, solved)?; - - // TODO: need a way to check if we need to update the sector root - - // let handle = tokio::runtime::Handle::current(); - // handle.block_on(update_sector_root(&store, &rpc, &job.tape, job.seg_no))?; - - Ok(()) + process_poa_job(&store, miner, packing_difficulty, job) }) .await??; } @@ -39,6 +30,73 @@ pub async fn run(rpc: Arc, mut rx: Rx, miner: Pubkey, store: Arc, + miner: Pubkey, + packing_difficulty: u64, + job: SegmentJob, +) -> anyhow::Result<()> { + let sector_number = job.seg_no / SECTOR_LEAVES as u64; + let local_idx = (job.seg_no % SECTOR_LEAVES as u64) as usize; + + // TODO: rate limit or find a way to determine if this is the last segment in a sector + // (might not be the last segment index for tapes that don't fill the entire last sector). + + let mut sector = get_or_create_sector(store.as_ref(), &job.tape, sector_number)?; + + // Pack the segment, and update the PackedTapeLayer sector root + let solved = pack_segment(&miner, &job.data, packing_difficulty)?; + sector.set_segment(local_idx, &solved); + + let empty_hashes = get_or_create_empty_hashes(store, &job.tape)?; + let empty_leaf = empty_hashes.first().unwrap().as_leaf(); + let miner_bytes = miner.to_bytes(); + + let leaves_unpacked = compute_sector_leaves_unpacked( + §or, + sector_number, + &miner_bytes, + empty_leaf, + )?; + let leaves_packed = compute_sector_leaves_packed( + §or, + sector_number, + empty_leaf, + )?; + + // Compute both roots with the same zero vector + let root_unpacked = compute_sector_root(&leaves_unpacked, &empty_hashes)?; + let root_packed = compute_sector_root(&leaves_packed, &empty_hashes)?; + + update_sector_canopy_with_key( + store.as_ref(), + sector_number, + root_unpacked, + MerkleCacheKey::UnpackedTapeLayer { + address: job.tape, + layer: SECTOR_TREE_HEIGHT as u8, + }, + )?; + + update_sector_canopy_with_key( + store.as_ref(), + sector_number, + root_packed, + MerkleCacheKey::PackedTapeLayer { + address: job.tape, + layer: SECTOR_TREE_HEIGHT as u8, + }, + )?; + + // Finally, add the newly packed segment into the sector itself. + store.put_sector(&job.tape, sector_number, §or)?; + + Ok(()) +} + +/// Packs a segment using the packx algorithm. +/// Can be quite CPU intensive if the difficulty is high. pub fn pack_segment(miner_address: &Pubkey, segment: &[u8], packing_difficulty: u64) -> Result> { let miner_address: [u8; 32] = miner_address.to_bytes(); let canonical_segment = padded_array::(segment); @@ -54,40 +112,91 @@ pub fn pack_segment(miner_address: &Pubkey, segment: &[u8], packing_difficulty: Ok(segment_bytes.to_vec()) } -/// Updates the Merkle subtree for a given sector number. -pub async fn update_sector_root( +/// Computes the Merkle root for the entire tape by using a cached canopy of sector roots. +pub fn get_tape_root( store: &Arc, tape_address: &Pubkey, - sector_number: u64, -) -> Result<()> { +) -> Result { + // canopy height = number of levels above the sector layer up to the root + const CANOPY_HEIGHT: usize = SEGMENT_TREE_HEIGHT - SECTOR_TREE_HEIGHT; - let empty_hashes = get_or_create_empty_hashes(store, tape_address).await?; - let empty_leaf = empty_hashes.first().unwrap().as_leaf(); + // All zero values for the full-height segment tree + let zeros_full = get_or_create_empty_hashes(store, tape_address)?; + if zeros_full.len() != SEGMENT_TREE_HEIGHT { + return Err(anyhow!( + "Invalid zero_values len: expected {}, got {}", + SEGMENT_TREE_HEIGHT, + zeros_full.len() + )); + } - let leaves = compute_sector_leaves(store, tape_address, sector_number, empty_leaf)?; - let root = compute_sector_root(&leaves, &empty_hashes, LAYER_NUMBER)?; + // Zero values for the canopy tree (start at the sector layer) + let canopy_zeros: [Hash; CANOPY_HEIGHT] = zeros_full + [SECTOR_TREE_HEIGHT .. SEGMENT_TREE_HEIGHT] + .try_into() + .map_err(|_| anyhow!( + "Invalid canopy zeros slice: expected {}, got {}", + CANOPY_HEIGHT, + zeros_full.len().saturating_sub(SECTOR_TREE_HEIGHT) + ))?; + + // Build canopy over sector roots + let mut canopy = CanopyTree::from_zeros(canopy_zeros); + + // Load sector roots cached at the sector layer + let sector_roots = store.get_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { + address: *tape_address, + layer: SECTOR_TREE_HEIGHT as u8 + } + )?; - update_layer(store, tape_address, LAYER_NUMBER, sector_number, root)?; + for root_bytes in sector_roots.iter() { + let leaf = Leaf::from(*root_bytes); + canopy.try_add_leaf(leaf).expect("Failed to add sector root"); + } - Ok(()) + Ok(canopy.get_root()) } -/// Computes leaves for a sector from the store, filling with empty_leaf as needed. -pub fn compute_sector_leaves( - store: &TapeStore, - tape_address: &Pubkey, +/// Computes packed leaves (stored solution bytes). +pub fn compute_sector_leaves_packed( + sector: &Sector, sector_number: u64, empty_leaf: Leaf, ) -> Result> { - let sector = store.get_sector(tape_address, sector_number)?; + let mut leaves = vec![empty_leaf; SECTOR_LEAVES]; + for i in 0..SECTOR_LEAVES { + if let Some(packed) = sector.get_segment(i) { + let segment_id = (sector_number * SECTOR_LEAVES as u64) + i as u64; + leaves[i] = Leaf::new(&[ + &segment_id.to_le_bytes(), + packed, + ]); + } + } + Ok(leaves) +} +/// Computes unpacked leaves (reconstructed 128-byte data from solutions). +pub fn compute_sector_leaves_unpacked( + sector: &Sector, + sector_number: u64, + miner_bytes: &[u8; 32], + empty_leaf: Leaf, +) -> Result> { let mut leaves = vec![empty_leaf; SECTOR_LEAVES]; - for (i, leaf) in leaves.iter_mut().enumerate() { - if let Some(segment) = sector.get_segment(i) { + for i in 0..SECTOR_LEAVES { + if let Some(packed) = sector.get_segment(i) { + let mut arr = [0u8; PACKED_SEGMENT_SIZE]; + arr.copy_from_slice(&packed[..PACKED_SEGMENT_SIZE]); + let sol = packx::Solution::from_bytes(&arr); + let data_unpacked = sol.unpack(miner_bytes); + let segment_id = (sector_number * SECTOR_LEAVES as u64) + i as u64; - *leaf = Leaf::new(&[ + leaves[i] = Leaf::new(&[ &segment_id.to_le_bytes(), - segment + &data_unpacked, ]); } } @@ -98,7 +207,6 @@ pub fn compute_sector_leaves( pub fn compute_sector_root( leaves: &[Leaf], empty_hashes: &[Hash], - layer_number: u8, ) -> Result { let mut tree = SegmentTree::from_zeros( empty_hashes @@ -113,7 +221,7 @@ pub fn compute_sector_root( tree.next_index = leaves.len() as u64; - let layer_nodes = tree.get_layer_nodes(leaves, layer_number as usize); + let layer_nodes = tree.get_layer_nodes(leaves, SECTOR_TREE_HEIGHT); if layer_nodes.len() != 1 { return Err(anyhow!( "Invalid layer nodes length: expected 1, got {}", @@ -124,15 +232,14 @@ pub fn compute_sector_root( Ok(*layer_nodes.first().unwrap()) } -/// Updates the layer in the store with the new sector root. -pub fn update_layer( +/// Helper to update a specific MerkleCacheKey layer with the new sector root. +pub fn update_sector_canopy_with_key( store: &TapeStore, - tape_address: &Pubkey, - layer_number: u8, sector_number: u64, root: Hash, + key: MerkleCacheKey, ) -> Result<()> { - let mut layer = match store.get_layer(tape_address, layer_number) { + let mut layer = match store.get_merkle_cache(&key) { Ok(layer) => layer, Err(_) => vec![[0u8; 32]; (sector_number + 1) as usize], }; @@ -142,24 +249,25 @@ pub fn update_layer( } layer[sector_number as usize] = root.to_bytes(); - store.put_layer(tape_address, layer_number, &layer)?; + store.put_merkle_cache(&key, &layer)?; Ok(()) } /// Helper to create or init the zero values for a tape -/// Note: This only needs to be done once per tape, the zero values are the height of the tree and -/// are calculated from the tape's merkle seed -pub async fn get_or_create_empty_hashes( +pub fn get_or_create_empty_hashes( store: &Arc, tape_address: &Pubkey, ) -> Result> { - const H: usize = SEGMENT_TREE_HEIGHT; - let empty_values = match store.get_zero_values(tape_address) { + let empty_values = match store.get_merkle_cache( + &MerkleCacheKey::ZeroValues { + address: *tape_address + } + ) { Ok(empty_values) => empty_values, Err(_) => { // Create an empty SegmentTree to get the zero values - let tree = SegmentTree::new(&[&tape_address.as_ref()]); + let tree = SegmentTree::new(&[tape_address.as_ref()]); let empty_values = tree.zero_values; let seeds_bytes = empty_values .into_iter() @@ -167,13 +275,20 @@ pub async fn get_or_create_empty_hashes( .collect::>(); // Throw the empty_values into the store for future use - store.put_zero_values(tape_address, &seeds_bytes)?; + store.put_merkle_cache( + &MerkleCacheKey::ZeroValues { address: *tape_address }, + &seeds_bytes + )?; seeds_bytes } }; - if empty_values.len() != H { - return Err(anyhow!("Invalid number of zero values: expected {}, got {}", H, empty_values.len())); + if empty_values.len() != SEGMENT_TREE_HEIGHT { + return Err( + anyhow!("Invalid number of zero values: expected {}, got {}", + SEGMENT_TREE_HEIGHT, + empty_values.len() + )); } // Convert empty values to Hash type @@ -185,129 +300,172 @@ pub async fn get_or_create_empty_hashes( Ok(empty_hashes) } +fn get_or_create_sector( + store: &TapeStore, + tape_address: &Pubkey, + sector_number: u64, +) -> Result { + match store.get_sector(tape_address, sector_number) { + Ok(s) => Ok(s), + Err(StoreError::SectorNotFoundForAddress(_, _)) => Ok(Sector::new()), + Err(e) => Err(e), + } +} #[cfg(test)] mod tests { use super::*; use solana_sdk::pubkey::Pubkey; use tempdir::TempDir; - use brine_tree::{Leaf, Hash}; + use brine_tree::{Leaf, get_cached_merkle_proof}; - fn setup_store() -> Result<(TapeStore, TempDir), StoreError> { + fn setup_store() -> Result<(Arc, TempDir), StoreError> { let temp_dir = TempDir::new("rocksdb_test").map_err(StoreError::IoError)?; let store = TapeStore::new(temp_dir.path())?; - Ok((store, temp_dir)) + Ok((Arc::new(store), temp_dir)) } fn create_segment_data(marker: u8, miner: &Pubkey) -> Vec { + const TEST_DIFFICULTY: u32 = 0; + let data = &[marker; SEGMENT_SIZE]; let canonical_segment = padded_array::(data); let solution = packx::solve( &miner.to_bytes(), &canonical_segment, - 0 - ).expect("Failed to pack segment"); + TEST_DIFFICULTY + ).expect("Failed to pack segment"); solution.to_bytes().to_vec() } - fn create_empty_hashes(seed: &[u8]) -> Vec { - let tree = SegmentTree::new(&[seed]); - tree.zero_values.iter().copied().collect() - } - - #[test] - fn test_subtree_update() -> Result<()> { + fn test_with_larger_stack() -> Result<()> { // Setup store and tape address let (store, _temp_dir) = setup_store()?; - let tape_address = Pubkey::new_unique(); + + // Setup our tape and miner let miner_address = Pubkey::new_unique(); + let tape_address = Pubkey::new_unique(); + let mut tape_tree = SegmentTree::new(&[tape_address.as_ref()]); + let mut leaves = vec![]; - // Create empty hashes and store them - let seed = b"test_seed"; - let empty_hashes = create_empty_hashes(seed); - let empty_hashes_bytes = empty_hashes.iter().map(|h| h.to_bytes()).collect::>(); - store.put_zero_values(&tape_address, &empty_hashes_bytes)?; - let empty_leaf = empty_hashes.first().unwrap().as_leaf(); + let empty_values = get_or_create_empty_hashes(&store, &tape_address)?; + let empty_leaf = empty_values[0].as_leaf(); + assert_eq!(empty_values.len(), SEGMENT_TREE_HEIGHT); + assert_eq!(empty_values, empty_values); - // Create a bunch of leaves (fill 2.5 sectors) + // Fill the tape with some segments (2.5 sectors worth) + let miner_bytes = miner_address.to_bytes(); let leaf_count = (SECTOR_LEAVES as f64 * 2.5) as usize; - let mut tree = SegmentTree::new(&[seed]); - let mut leaves = vec![]; - for i in 0..leaf_count { let segment_id = i as u64; - let segment_data = create_segment_data(1, &miner_address); + let segment_data_packed = create_segment_data(1, &miner_address); + + // For the in-memory expected tree, use UNPACKED bytes + let mut sol_bytes = [0u8; PACKED_SEGMENT_SIZE]; + sol_bytes.copy_from_slice(&segment_data_packed[..PACKED_SEGMENT_SIZE]); + let sol = packx::Solution::from_bytes(&sol_bytes); + let data_unpacked = sol.unpack(&miner_bytes); + let leaf = Leaf::new(&[ &segment_id.to_le_bytes(), - &segment_data + &data_unpacked ]); - // Add leaf to the tree - tree.try_add_leaf(leaf).expect("Failed to add leaf"); - - // Store the segment in a sector - let sector_number = segment_id / SECTOR_LEAVES as u64; - let local_seg_idx = (segment_id % SECTOR_LEAVES as u64) as usize; - let mut sector = store - .get_sector(&tape_address, sector_number) - .unwrap_or_else(|_| Sector::new()); - sector.set_segment(local_seg_idx, &segment_data); - store.put_sector(&tape_address, sector_number, §or)?; - - // Collect leaves so we can verify later + tape_tree.try_add_leaf(leaf).expect("Failed to add leaf"); + store.put_segment(&tape_address, segment_id, segment_data_packed)?; leaves.push(leaf); } - // Verify sector count - assert_eq!(store.get_sector_count(&tape_address)?, 3); - - // Add a new leaf and test subtree update - let segment_number = leaves.len() as u64; - let sector_number = segment_number / SECTOR_LEAVES as u64; - let segment_data = create_segment_data(42, &miner_address); - let new_leaf = Leaf::new(&[ - &segment_number.to_le_bytes(), - &segment_data - ]); - - tree.try_add_leaf(new_leaf).expect("Failed to add new leaf"); - leaves.push(new_leaf); - - // Store the leaf so that compute_sector_leaves can find it - let mut sector = store - .get_sector(&tape_address, sector_number) - .unwrap_or_else(|_| Sector::new()); - let local_seg_idx = (segment_number % SECTOR_LEAVES as u64) as usize; - sector.set_segment(local_seg_idx, &segment_data); - store.put_sector(&tape_address, sector_number, §or)?; - - // Compute leaves for the sector - let sector_leaves = compute_sector_leaves(&store, &tape_address, sector_number, empty_leaf)?; - - assert_eq!(sector_leaves.len(), SECTOR_LEAVES, "Incorrect number of leaves"); - assert_eq!(sector_leaves[local_seg_idx], new_leaf); - - // Compute the sector root - let root = compute_sector_root(§or_leaves, &empty_hashes, 10)?; - - // Verify root using the existing tree - let layer_nodes = tree.get_layer_nodes(&leaves, 10); - - assert_eq!(layer_nodes.len(), 3); - assert_eq!(root, layer_nodes[sector_number as usize]); + let expected_vals = tape_tree.get_layer_nodes(&leaves, SECTOR_TREE_HEIGHT); + let expected_canopy = expected_vals + .iter() + .map(|h| h.to_bytes()) + .collect::>(); + + // Calculate sector roots and update the stored canopy values + for sector_number in 0..=2 { + let sector = store.get_sector(&tape_address, sector_number)?; + + let leaves_unpacked = compute_sector_leaves_unpacked( + §or, + sector_number, + &miner_bytes, + empty_leaf + )?; + let root_unpacked = compute_sector_root(&leaves_unpacked, &empty_values)?; + + update_sector_canopy_with_key( + &store, + sector_number, + root_unpacked, + MerkleCacheKey::UnpackedTapeLayer { + address: tape_address, + layer: SECTOR_TREE_HEIGHT as u8, + }, + )?; + } - // Update the layer - update_layer(&store, &tape_address, 10, sector_number, root)?; + let actual_canopy = store.get_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { + address: tape_address, + layer: SECTOR_TREE_HEIGHT as u8 + } + )?; + + assert_eq!(expected_canopy.len(), actual_canopy.len()); + assert_eq!(expected_canopy, actual_canopy); + + // Lets try a Merkle proof for a segment + let segment_number : usize = 1234; + let sector_number = (segment_number as u64) / SECTOR_LEAVES as u64; + let sector = store.get_sector(&tape_address, sector_number)?; + + let expected_proof = tape_tree.get_proof(&leaves, segment_number); // <- Requires all leaves in memory (bad) + let actual_proof = get_cached_merkle_proof( // <- Only one sector in memory (good) + &tape_tree, + segment_number, + SECTOR_TREE_HEIGHT, + &expected_vals, + |i| { + let local_idx = i % SECTOR_LEAVES; + match sector.get_segment(local_idx) { // <- Look ma, no store access here! + Some(packed) => { + let mut arr = [0u8; PACKED_SEGMENT_SIZE]; + arr.copy_from_slice(&packed[..PACKED_SEGMENT_SIZE]); + let sol = packx::Solution::from_bytes(&arr); + let data_unpacked = sol.unpack(&miner_bytes); + + Some(Leaf::new(&[ + &(i as u64).to_le_bytes(), + &data_unpacked + ])) + } + None => Some(empty_leaf), + } + } + ); + + assert_eq!(expected_proof, actual_proof); + + // Verify the tape root matches our computed root + let computed_root = get_tape_root(&store, &tape_address)?; + assert_eq!(computed_root, tape_tree.get_root()); - // Verify the layer - let layer = store.get_layer(&tape_address, 10)?; - assert_eq!(layer.len(), sector_number as usize + 1); + Ok(()) + } - // The first two will be empty because we never called update_layer for them - assert_eq!(layer[0], [0u8; 32]); - assert_eq!(layer[1], [0u8; 32]); - assert_eq!(layer[sector_number as usize], root.to_bytes()); + #[test] + fn test_subtree_update() -> Result<()> { + // TODO: get to the bottom of what is eating the stack space in this test + + let _ = std::thread::Builder::new() + .name("larger_stack".into()) + .stack_size(4 * 1024 * 1024) + .spawn(|| test_with_larger_stack()) + .unwrap() + .join() + .unwrap(); Ok(()) } diff --git a/network/src/store/column/merkle.rs b/network/src/store/column/merkle.rs index 771a9894..46f3cca7 100644 --- a/network/src/store/column/merkle.rs +++ b/network/src/store/column/merkle.rs @@ -1,68 +1,246 @@ use solana_sdk::pubkey::Pubkey; use crate::store::*; +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum MerkleCacheKey { + ZeroValues { address: Pubkey }, // Merkle tree zero hashes + UnpackedTapeLayer { address: Pubkey, layer: u8 }, // Unpacked tape layer nodes + PackedTapeLayer { address: Pubkey, layer: u8 }, // Packed tape layer nodes + Raw36([u8; 36]), +} + +impl MerkleCacheKey { + #[inline(always)] + fn to_vec(&self) -> Vec { + match *self { + MerkleCacheKey::ZeroValues { address } => + build_key(&address, MERKLE_ZEROS, 0), + + MerkleCacheKey::UnpackedTapeLayer { address, layer } => + build_key(&address, TAPE_LAYER, layer), + + MerkleCacheKey::PackedTapeLayer { address, layer } => + build_key(&address, MINER_LAYER, layer), + + MerkleCacheKey::Raw36(bytes) => bytes.to_vec(), + } + } +} + pub trait MerkleOps { - fn get_hash_values(&self, key: Vec) -> Result, StoreError>; - fn put_hash_values(&self, key: Vec, values: &[[u8; 32]]) -> Result<(), StoreError>; + fn get_merkle_cache(&self, key: &MerkleCacheKey) -> Result, StoreError>; + fn put_merkle_cache(&self, key: &MerkleCacheKey, values: &[[u8; 32]]) -> Result<(), StoreError>; - fn get_zero_values(&self, address: &Pubkey) -> Result, StoreError>; - fn put_zero_values(&self, address: &Pubkey, values: &[[u8; 32]]) -> Result<(), StoreError>; + /// Optional: compile-time length check, returns a plain array. + fn get_merkle_cache_fixed(&self, key: &MerkleCacheKey) + -> Result<[[u8; 32]; N], StoreError> + { + let v = self.get_merkle_cache(key)?; + if v.len() != N { + return Err(StoreError::InvalidHashSize(v.len() * 32)); + } - fn get_layer(&self, tape_address: &Pubkey, layer:u8) -> Result, StoreError>; - fn put_layer(&self, tape_address: &Pubkey, layer:u8, values: &[[u8; 32]]) -> Result<(), StoreError>; + let arr: [[u8; 32]; N] = v + .try_into() + .map_err(|vv: Vec<[u8; 32]>| StoreError::InvalidHashSize(vv.len() * 32))?; + Ok(arr) + } } impl MerkleOps for TapeStore { - fn get_hash_values(&self, key: Vec) -> Result, StoreError> { + fn get_merkle_cache(&self, key: &MerkleCacheKey) -> Result, StoreError> { let cf = self.get_cf_handle(ColumnFamily::MerkleHashes)?; - let data = self - .db - .get_cf(&cf, &key)? - .ok_or(StoreError::HashNotFound)?; - - let mut result = vec![]; + let k = key.to_vec(); + let data = self.db.get_cf(&cf, &k)?.ok_or(StoreError::HashNotFound)?; + if data.len() % 32 != 0 { + return Err(StoreError::InvalidHashSize(data.len())); + } + let mut out = Vec::with_capacity(data.len() / 32); for chunk in data.chunks_exact(32) { let mut arr = [0u8; 32]; arr.copy_from_slice(chunk); - result.push(arr); + out.push(arr); } - Ok(result) + Ok(out) } - fn put_hash_values(&self, key: Vec, values: &[[u8; 32]]) -> Result<(), StoreError> { + fn put_merkle_cache(&self, key: &MerkleCacheKey, values: &[[u8; 32]]) -> Result<(), StoreError> { let cf = self.get_cf_handle(ColumnFamily::MerkleHashes)?; + let k = key.to_vec(); + let data: Vec = values.iter().flatten().copied().collect(); + self.db.put_cf(&cf, &k, &data)?; + Ok(()) + } +} + +#[inline(always)] +fn build_key(address: &Pubkey, layer_type: u8, layer_id: u8) -> Vec { + let mut key = Vec::with_capacity(36); + key.extend_from_slice(&address.to_bytes()); // 32 + key.push(layer_id); // 1 + key.extend_from_slice(&[layer_type, 0, 0]); // 3 (type + padding) + key +} + + +#[cfg(test)] +mod tests { + use super::*; + use tempdir::TempDir; + + fn setup_store() -> Result<(TapeStore, TempDir), StoreError> { + let tmp = TempDir::new("rocksdb_test").map_err(StoreError::IoError)?; + let store = TapeStore::new(tmp.path())?; + Ok((store, tmp)) + } - let data = values.iter().flatten().copied().collect::>(); - self.db.put_cf(&cf, &key, &data)?; + fn h(i: u8) -> [u8; 32] { + [i; 32] + } + + #[test] + fn merkle_zero_values_roundtrip_fixed() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + + // Prepare exactly SECTOR_TREE_HEIGHT hashes + let mut vals = Vec::<[u8; 32]>::with_capacity(SECTOR_TREE_HEIGHT); + for i in 0..SECTOR_TREE_HEIGHT { + vals.push(h(i as u8)); + } + + // Put + store.put_merkle_cache( + &MerkleCacheKey::ZeroValues { address: addr }, + &vals, + )?; + + // Get + let got: [[u8; 32]; SECTOR_TREE_HEIGHT] = store.get_merkle_cache_fixed( + &MerkleCacheKey::ZeroValues { address: addr }, + )?; + + assert_eq!(got.as_slice(), vals.as_slice()); Ok(()) } - fn get_zero_values(&self, address: &Pubkey) -> Result, StoreError> { - let key = build_key(address, MERKLE_ZEROS, 0); - self.get_hash_values(key) + #[test] + fn merkle_unpacked_layer_roundtrip_vec() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + let layer = SECTOR_TREE_HEIGHT as u8; + + // Arbitrary set of hashes for the layer + let vals = vec![h(1), h(2), h(3), h(4)]; + + store.put_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { address: addr, layer }, + &vals, + )?; + + let got = store.get_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { address: addr, layer }, + )?; + + assert_eq!(got, vals); + Ok(()) } - fn put_zero_values(&self, address: &Pubkey, values: &[[u8; 32]]) -> Result<(), StoreError> { - let key = build_key(address, MERKLE_ZEROS, 0); - self.put_hash_values(key, values) + #[test] + fn merkle_packed_layer_roundtrip_vec() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + let layer = (SECTOR_TREE_HEIGHT as u8).saturating_add(1); + + let vals = vec![h(9), h(8)]; + + store.put_merkle_cache( + &MerkleCacheKey::PackedTapeLayer { address: addr, layer }, + &vals, + )?; + + let got = store.get_merkle_cache( + &MerkleCacheKey::PackedTapeLayer { address: addr, layer }, + )?; + + assert_eq!(got, vals); + Ok(()) } - fn get_layer(&self, address: &Pubkey, layer: u8) -> Result, StoreError> { - let key = build_key(address, TAPE_LAYER, layer); - self.get_hash_values(key) + #[test] + fn merkle_raw36_key_roundtrip() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + let raw_key: [u8; 36] = { + let v = build_key(&addr, TAPE_LAYER, 7); + let mut arr = [0u8; 36]; + arr.copy_from_slice(&v); + arr + }; + + let vals = vec![h(42), h(43), h(44)]; + + store.put_merkle_cache(&MerkleCacheKey::Raw36(raw_key), &vals)?; + let got = store.get_merkle_cache(&MerkleCacheKey::Raw36(raw_key))?; + + assert_eq!(got, vals); + Ok(()) } - fn put_layer(&self, address: &Pubkey, layer: u8, values: &[[u8; 32]]) -> Result<(), StoreError> { - let key = build_key(address, TAPE_LAYER, layer); - self.put_hash_values(key, values) + #[test] + fn merkle_not_found_returns_error() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + + let res = store.get_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { address: addr, layer: 0 }, + ); + + assert!(matches!(res, Err(StoreError::HashNotFound))); + Ok(()) } -} -#[inline(always)] -fn build_key(address: &Pubkey, layer_type: u8, layer_id: u8) -> Vec { - let mut key = Vec::with_capacity(36); - key.extend_from_slice(&address.to_bytes()); - key.push(layer_id); - key.extend_from_slice(&[layer_type, 0, 0]); - key + #[test] + fn merkle_invalid_stored_length_is_rejected() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + let key_vec = build_key(&addr, TAPE_LAYER, 3); + + // Manually write an invalid-length value (not a multiple of 32) + let cf = store.get_cf_handle(ColumnFamily::MerkleHashes)?; + store.db.put_cf(&cf, &key_vec, &[0u8; 5])?; + + let res = store.get_merkle_cache( + &MerkleCacheKey::UnpackedTapeLayer { address: addr, layer: 3 }, + ); + + match res { + Err(StoreError::InvalidHashSize(len)) => { + assert_eq!(len, 5); + } + other => panic!("expected InvalidHashSize, got {:?}", other), + } + Ok(()) + } + + #[test] + fn merkle_fixed_len_mismatch_is_rejected() -> Result<(), StoreError> { + let (store, _tmp) = setup_store()?; + let addr = Pubkey::new_unique(); + + // Store 3 hashes + let vals = vec![h(1), h(2), h(3)]; + store.put_merkle_cache( + &MerkleCacheKey::PackedTapeLayer { address: addr, layer: 1 }, + &vals, + )?; + + // Ask for a different fixed size (e.g., 4) => should error + let res = store.get_merkle_cache_fixed::<4>( + &MerkleCacheKey::PackedTapeLayer { address: addr, layer: 1 }, + ); + + assert!(matches!(res, Err(StoreError::InvalidHashSize(96)))); + Ok(()) + } } diff --git a/network/src/store/column/mod.rs b/network/src/store/column/mod.rs index 96fd70fb..9a124e24 100644 --- a/network/src/store/column/mod.rs +++ b/network/src/store/column/mod.rs @@ -9,5 +9,5 @@ pub use health::{StoreStaticKeys, HealthOps}; pub use tape::TapeOps; pub use segment::SegmentOps; pub use sector::{SectorOps, Sector}; -pub use merkle::MerkleOps; +pub use merkle::{MerkleOps, MerkleCacheKey}; pub use stats::{LocalStats, StatsOps}; diff --git a/network/src/store/consts.rs b/network/src/store/consts.rs index fb9c1100..ef46750c 100644 --- a/network/src/store/consts.rs +++ b/network/src/store/consts.rs @@ -1,4 +1,5 @@ -pub const SECTOR_LEAVES: usize = 1 << 10; +pub const SECTOR_TREE_HEIGHT: usize = 10; +pub const SECTOR_LEAVES: usize = 1 << SECTOR_TREE_HEIGHT; // 1024 leaves pub const SECTOR_BITMAP_BYTES: usize = SECTOR_LEAVES / 8; pub const SECTOR_HEADER_BYTES: usize = SECTOR_BITMAP_BYTES + 32;